YingXinche / 用Cloudflare Worker搭建Webhook转发服务:无公网IP的完美解决方案

Created Wed, 11 Jun 2025 11:24:23 +0800 Modified Sat, 21 Jun 2025 00:06:35 +0800
2318 Words

用Cloudflare Worker搭建Webhook转发服务:无公网IP的完美解决方案

分享一个实用的技术方案——使用Cloudflare Worker搭建Webhook转发服务。这个方案特别适合没有公网IP的开发者,可以轻松实现Webhook的接收和转发。

为什么需要这个方案?

很多第三方服务(比如GitHub、云湖消息推送等)都支持Webhook功能,但开发调试时最大的痛点就是:

  1. 本地开发环境没有公网IP
  2. 内网穿透工具不稳定
  3. 需要实时查看Webhook推送内容

Cloudflare Worker正好能完美解决这些问题,而且完全免费!

五分钟快速搭建

第一步:创建Worker

  1. 登录Cloudflare后台
  2. 进入Workers & Pages页面
  3. 点击"创建应用程序"新建一个Worker 然后把以下代码部署到worker里(可以使用 CRTL+F 搜索 env.DB 将它替换成你的D1绑定的变量):
async function handleWebhook(request, env) {
    if (request.method !== "POST") {
        return new Response("Method Not Allowed", { status: 405 });
    }

    try {
        const contentType = request.headers.get("content-type");
        if (!contentType?.includes("application/json")) {
            return new Response(JSON.stringify({ error: "Invalid content type" }), {
                status: 400,
                headers: { "Content-Type": "application/json" }
            });
        }

        const payload = await request.json();
        const eventId = `event_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`;
        
        // 将 payload 转换为纯 JSON 可序列化的对象
        const safePayload = JSON.parse(JSON.stringify(payload));
        const payloadString = JSON.stringify(safePayload);

        let eventType = 'unknown';
        if (payload.event && typeof payload.event === 'string') {
            eventType = payload.event;
        } else if (payload.type) {
            eventType = String(payload.type);
        }

        await env.DB.prepare(
            "INSERT INTO events (id, data, timestamp, source_ip, event_type) VALUES (?, ?, ?, ?, ?)"
        ).bind(
            eventId,
            payloadString,
            Math.floor(Date.now() / 1000),
            request.headers.get('CF-Connecting-IP'),
            eventType
        ).run();

        return new Response(JSON.stringify({ 
            success: true,
            id: eventId,
            timestamp: Math.floor(Date.now() / 1000)
        }), { 
            headers: { "Content-Type": "application/json" } 
        });

    } catch (error) {
        console.error('Webhook处理错误:', error);
        return new Response(JSON.stringify({ 
            success: false,
            error: error.message,
            timestamp: Math.floor(Date.now() / 1000)
        }), { 
            status: 500,
            headers: { "Content-Type": "application/json" } 
        });
    }
}

// SSE处理器
async function handleSSE(request, env) {
    if (request.method !== "GET") {
        return new Response("Method Not Allowed", { status: 405 });
    }

    // 生成客户端ID
    const clientId = request.headers.get('CF-Connecting-IP') || crypto.randomUUID();
    const logId = `log_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`;
    const connectTime = Math.floor(Date.now() / 1000);

    // 记录连接日志
    try {
        await env.DB.prepare(
            "INSERT INTO sse_logs (id, client_id, ip_address, user_agent, connect_time, status) VALUES (?, ?, ?, ?, ?, ?)"
        ).bind(
            logId,
            clientId,
            request.headers.get('CF-Connecting-IP'),
            request.headers.get('User-Agent'),
            connectTime,
            'connected'
        ).run();
    } catch (error) {
        console.error("SSE连接日志记录失败:", error);
    }

    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();
    const encoder = new TextEncoder();

    const markEventAsDelivered = async (eventId) => {
        try {
            // 标记事件为已推送
            await env.DB.prepare(
                "UPDATE events SET delivered = TRUE WHERE id = ?"
            ).bind(eventId).run();

            // 记录事件推送日志
            await env.DB.prepare(
                "INSERT INTO event_delivery_logs (id, event_id, client_id, delivery_time, status) VALUES (?, ?, ?, ?, ?)"
            ).bind(
                `delivery_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`,
                eventId,
                clientId,
                Math.floor(Date.now() / 1000),
                'delivered'
            ).run();
        } catch (error) {
            console.error("事件标记和日志记录失败:", error);
        }
    };

    const sendEvent = async (eventType, eventData) => {
        const eventPayload = {
            event: eventType,
            id: eventData.id,
            timestamp: Date.now(),
            data: typeof eventData.data === 'string' ? JSON.parse(eventData.data) : eventData.data
        };
        
        await writer.write(encoder.encode(
            `event: ${eventType}\n` +
            `id: ${eventData.id}\n` +
            `data: ${JSON.stringify(eventPayload)}\n\n`
        ));
    };

    // 发送初始历史事件
    const sendInitialEvents = async () => {
        try {
            const { results } = await env.DB.prepare(
                "SELECT id, data, timestamp FROM events WHERE delivered = FALSE ORDER BY timestamp ASC LIMIT 50"
            ).all();
    
            for (const event of results) {
                await sendEvent('message', {
                    id: event.id,
                    data: event.data,
                    timestamp: event.timestamp
                });
                await markEventAsDelivered(event.id);
            }
    
            await writer.write(encoder.encode(
                `event: system\n` +
                `data: ${JSON.stringify({
                    type: "init",
                    status: 'ready',
                    count: results.length,
                    timestamp: Date.now()
                })}\n\n`
            ));
        } catch (error) {
            console.error("初始事件发送失败:", error);
        }
    };

    const keepAlive = setInterval(() => {
        writer.write(encoder.encode(
            `event: system\n` +
            `data: ${JSON.stringify({
                type: "heartbeat",
                timestamp: Date.now()
            })}\n\n`
        )).catch(() => {
            clearInterval(keepAlive);
            clearInterval(eventPoller);
        });
    }, 15000);

    // 事件轮询
    let lastTimestamp = Math.floor(Date.now() / 1000);
    const eventPoller = setInterval(async () => {
        try {
            const { results } = await env.DB.prepare(
                "SELECT id, data, timestamp FROM events WHERE timestamp > ? AND delivered = FALSE ORDER BY timestamp ASC"
            ).bind(lastTimestamp).all();
    
            if (results.length > 0) {
                for (const event of results) {
                    await sendEvent('message', {
                        id: event.id,
                        data: event.data,
                        timestamp: event.timestamp
                    });
                    await markEventAsDelivered(event.id);
                    lastTimestamp = event.timestamp;
                }
            }
        } catch (error) {
            console.error("事件轮询错误:", error);
        }
    }, 1000);

    // 清理函数
    const cleanup = async () => {
        clearInterval(keepAlive);
        clearInterval(eventPoller);
        
        // 记录断开连接日志
        try {
            await env.DB.prepare(
                "UPDATE sse_logs SET disconnect_time = ?, status = 'disconnected' WHERE id = ?"
            ).bind(
                Math.floor(Date.now() / 1000),
                logId
            ).run();
        } catch (error) {
            console.error("SSE断开连接日志记录失败:", error);
        }
        
        writer.close();
    };

    // 设置中断监听
    request.signal.addEventListener("abort", cleanup);

    // 发送初始事件
    sendInitialEvents();

    return new Response(readable, {
        headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*"
        }
    });
}

// 清理旧事件
async function cleanupOldEvents(env) {
    try {
        const sevenDaysAgo = Math.floor(Date.now() / 1000) - 604800;
        
        // 清理已推送的旧事件
        const { meta: eventsMeta } = await env.DB.prepare(
            "DELETE FROM events WHERE timestamp < ? AND delivered = TRUE"
        ).bind(sevenDaysAgo).run();
        
        // 清理旧的SSE连接日志(保留最近30天的)
        const thirtyDaysAgo = Math.floor(Date.now() / 1000) - 2592000;
        const { meta: logsMeta } = await env.DB.prepare(
            "DELETE FROM sse_logs WHERE connect_time < ?"
        ).bind(thirtyDaysAgo).run();
        
        // 清理旧的事件推送日志
        const { meta: deliveryMeta } = await env.DB.prepare(
            "DELETE FROM event_delivery_logs WHERE delivery_time < ?"
        ).bind(thirtyDaysAgo).run();
        
        console.log(`清理完成: 
            - 删除${eventsMeta.changes}条旧事件
            - 删除${logsMeta.changes}条SSE连接日志
            - 删除${deliveryMeta.changes}条事件推送日志`);
            
        return {
            events: eventsMeta.changes,
            logs: logsMeta.changes,
            deliveries: deliveryMeta.changes
        };
    } catch (error) {
        console.error('清理错误:', error);
        return {
            events: 0,
            logs: 0,
            deliveries: 0,
            error: error.message
        };
    }
}

// SSE日志查询接口
async function handleSSELogs(request, env) {
    if (request.method !== "GET") {
        return new Response("Method Not Allowed", { status: 405 });
    }

    try {
        const url = new URL(request.url);
        const clientId = url.searchParams.get('client_id');
        const limit = parseInt(url.searchParams.get('limit')) || 50;
        
        let query = "SELECT * FROM sse_logs ORDER BY connect_time DESC LIMIT ?";
        let params = [limit];
        
        if (clientId) {
            query = "SELECT * FROM sse_logs WHERE client_id = ? ORDER BY connect_time DESC LIMIT ?";
            params = [clientId, limit];
        }
        
        const { results } = await env.DB.prepare(query).bind(...params).all();
        
        return new Response(JSON.stringify({ 
            success: true,
            logs: results
        }), { 
            headers: { "Content-Type": "application/json" } 
        });
    } catch (error) {
        console.error('SSE日志查询错误:', error);
        return new Response(JSON.stringify({ 
            success: false,
            error: error.message
        }), { 
            status: 500,
            headers: { "Content-Type": "application/json" } 
        });
    }
}

export default {
    async fetch(request, env) {
        const url = new URL(request.url);
        switch (url.pathname) {
            case "/webhook":
                return handleWebhook(request, env);
            case "/sse":
                return handleSSE(request, env);
            case "/sse/logs":
                return handleSSELogs(request, env);
            default:
                return new Response("Not Found", { status: 404 });
        }
    },

    scheduled: async (event, env, ctx) => {
        ctx.waitUntil(cleanupOldEvents(env));
    }
};

第二步:配置数据库 在D1数据库页面新建一个数据库,然后执行以下SQL创建表:

CREATE TABLE IF NOT EXISTS events (
  id TEXT PRIMARY KEY,
  data TEXT NOT NULL,
  timestamp INTEGER NOT NULL,
  source_ip TEXT,
  event_type TEXT,
  delivered BOOLEAN DEFAULT FALSE
);

CREATE TABLE IF NOT EXISTS sse_logs (
  id TEXT PRIMARY KEY,
  client_id TEXT NOT NULL,
  ip_address TEXT NOT NULL,
  user_agent TEXT,
  connect_time INTEGER NOT NULL,
  disconnect_time INTEGER,
  status TEXT
);

CREATE TABLE IF NOT EXISTS event_delivery_logs (
  id TEXT PRIMARY KEY,
  event_id TEXT NOT NULL,
  client_id TEXT NOT NULL,
  delivery_time INTEGER NOT NULL,
  status TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_sse_logs_client ON sse_logs(client_id);
CREATE INDEX IF NOT EXISTS idx_delivery_event ON event_delivery_logs(event_id);

第三步:部署代码 把完整代码复制到Worker编辑器中,绑定数据库后点击部署就完成了!

核心功能体验

接收Webhook

  • 访问地址:/webhook
  • 支持POST请求
  • 自动记录所有请求数据

实时事件推送

  • 访问地址:/sse
  • 支持浏览器直接访问
  • 实时显示最新事件
curl -N https://你的worker地址/sse

查看日志

  • 访问地址:/sse/logs
  • 可查询历史连接记录
  • 支持按客户端筛选

事件数据示例

1. 原始 Webhook 转发格式

我们的 Worker 会原样转发收到的 Webhook 数据,保持原始格式不变:

event: message
id: event_1718112345_550e8400
data: {
  // 原始Webhook的全部数据...
}

2. 系统事件示例

event: system
id: system_1718112350_550e8401
data: {
  "type": "init",
  "status": "ready",
  "message": "Connected. Last event ID: event_1718112345_550e8400"
}

3. 心跳事件

event: heartbeat
id: hb_1718112450_550e8402
data: {
  "timestamp": 1718112450000,
  "active_connections": 5
}

实际使用技巧

  1. 调试时可以先用Postman发送测试请求
  2. 本地开发时配合ngrok使用效果更佳
  3. 数据库会自动清理7天前的数据,不用担心存储问题

如果你也经常需要调试Webhook,不妨试试这个方案,相信会大大提升你的开发效率!


公安备案图标 冀公网安备13063002000071号 冀ICP备2024094988号