
我活着吗...喔,我还活着~!
Popular posts
-
用Cloudflare Worker搭建Webhook转发服务:无公网IP的完美解决方案
分享一个实用的技术方案——使用Cloudflare Worker搭建Webhook转发服务。这个方案特别适合没有公网IP的开发者,可以轻松实现Webhook的接收和转发。
为什么需要这个方案?
很多第三方服务(比如GitHub、云湖消息推送等)都支持Webhook功能,但开发调试时最大的痛点就是:
- 本地开发环境没有公网IP
- 内网穿透工具不稳定
- 需要实时查看Webhook推送内容
Cloudflare Worker正好能完美解决这些问题,而且完全免费!
五分钟快速搭建
第一步:创建Worker
- 登录Cloudflare后台
- 进入Workers & Pages页面
- 点击"创建应用程序"新建一个Worker
然后把以下代码部署到worker里(可以使用 CRTL+F 搜索
env.DB
将它替换成你的D1绑定的变量):
async function handleWebhook(request, env) { if (request.method !== "POST") { return new Response("Method Not Allowed", { status: 405 }); } try { const contentType = request.headers.get("content-type"); if (!contentType?.includes("application/json")) { return new Response(JSON.stringify({ error: "Invalid content type" }), { status: 400, headers: { "Content-Type": "application/json" } }); } const payload = await request.json(); const eventId = `event_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`; // 将 payload 转换为纯 JSON 可序列化的对象 const safePayload = JSON.parse(JSON.stringify(payload)); const payloadString = JSON.stringify(safePayload); let eventType = 'unknown'; if (payload.event && typeof payload.event === 'string') { eventType = payload.event; } else if (payload.type) { eventType = String(payload.type); } await env.DB.prepare( "INSERT INTO events (id, data, timestamp, source_ip, event_type) VALUES (?, ?, ?, ?, ?)" ).bind( eventId, payloadString, Math.floor(Date.now() / 1000), request.headers.get('CF-Connecting-IP'), eventType ).run(); return new Response(JSON.stringify({ success: true, id: eventId, timestamp: Math.floor(Date.now() / 1000) }), { headers: { "Content-Type": "application/json" } }); } catch (error) { console.error('Webhook处理错误:', error); return new Response(JSON.stringify({ success: false, error: error.message, timestamp: Math.floor(Date.now() / 1000) }), { status: 500, headers: { "Content-Type": "application/json" } }); } } // SSE处理器 async function handleSSE(request, env) { if (request.method !== "GET") { return new Response("Method Not Allowed", { status: 405 }); } // 生成客户端ID const clientId = request.headers.get('CF-Connecting-IP') || crypto.randomUUID(); const logId = `log_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`; const connectTime = Math.floor(Date.now() / 1000); // 记录连接日志 try { await env.DB.prepare( "INSERT INTO sse_logs (id, client_id, ip_address, user_agent, connect_time, status) VALUES (?, ?, ?, ?, ?, ?)" ).bind( logId, clientId, request.headers.get('CF-Connecting-IP'), request.headers.get('User-Agent'), connectTime, 'connected' ).run(); } catch (error) { console.error("SSE连接日志记录失败:", error); } const { readable, writable } = new TransformStream(); const writer = writable.getWriter(); const encoder = new TextEncoder(); const markEventAsDelivered = async (eventId) => { try { // 标记事件为已推送 await env.DB.prepare( "UPDATE events SET delivered = TRUE WHERE id = ?" ).bind(eventId).run(); // 记录事件推送日志 await env.DB.prepare( "INSERT INTO event_delivery_logs (id, event_id, client_id, delivery_time, status) VALUES (?, ?, ?, ?, ?)" ).bind( `delivery_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`, eventId, clientId, Math.floor(Date.now() / 1000), 'delivered' ).run(); } catch (error) { console.error("事件标记和日志记录失败:", error); } }; const sendEvent = async (eventType, eventData) => { const eventPayload = { event: eventType, id: eventData.id, timestamp: Date.now(), data: typeof eventData.data === 'string' ? JSON.parse(eventData.data) : eventData.data }; await writer.write(encoder.encode( `event: ${eventType}\n` + `id: ${eventData.id}\n` + `data: ${JSON.stringify(eventPayload)}\n\n` )); }; // 发送初始历史事件 const sendInitialEvents = async () => { try { const { results } = await env.DB.prepare( "SELECT id, data, timestamp FROM events WHERE delivered = FALSE ORDER BY timestamp ASC LIMIT 50" ).all(); for (const event of results) { await sendEvent('message', { id: event.id, data: event.data, timestamp: event.timestamp }); await markEventAsDelivered(event.id); } await writer.write(encoder.encode( `event: system\n` + `data: ${JSON.stringify({ type: "init", status: 'ready', count: results.length, timestamp: Date.now() })}\n\n` )); } catch (error) { console.error("初始事件发送失败:", error); } }; const keepAlive = setInterval(() => { writer.write(encoder.encode( `event: system\n` + `data: ${JSON.stringify({ type: "heartbeat", timestamp: Date.now() })}\n\n` )).catch(() => { clearInterval(keepAlive); clearInterval(eventPoller); }); }, 15000); // 事件轮询 let lastTimestamp = Math.floor(Date.now() / 1000); const eventPoller = setInterval(async () => { try { const { results } = await env.DB.prepare( "SELECT id, data, timestamp FROM events WHERE timestamp > ? AND delivered = FALSE ORDER BY timestamp ASC" ).bind(lastTimestamp).all(); if (results.length > 0) { for (const event of results) { await sendEvent('message', { id: event.id, data: event.data, timestamp: event.timestamp }); await markEventAsDelivered(event.id); lastTimestamp = event.timestamp; } } } catch (error) { console.error("事件轮询错误:", error); } }, 1000); // 清理函数 const cleanup = async () => { clearInterval(keepAlive); clearInterval(eventPoller); // 记录断开连接日志 try { await env.DB.prepare( "UPDATE sse_logs SET disconnect_time = ?, status = 'disconnected' WHERE id = ?" ).bind( Math.floor(Date.now() / 1000), logId ).run(); } catch (error) { console.error("SSE断开连接日志记录失败:", error); } writer.close(); }; // 设置中断监听 request.signal.addEventListener("abort", cleanup); // 发送初始事件 sendInitialEvents(); return new Response(readable, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*" } }); } // 清理旧事件 async function cleanupOldEvents(env) { try { const sevenDaysAgo = Math.floor(Date.now() / 1000) - 604800; // 清理已推送的旧事件 const { meta: eventsMeta } = await env.DB.prepare( "DELETE FROM events WHERE timestamp < ? AND delivered = TRUE" ).bind(sevenDaysAgo).run(); // 清理旧的SSE连接日志(保留最近30天的) const thirtyDaysAgo = Math.floor(Date.now() / 1000) - 2592000; const { meta: logsMeta } = await env.DB.prepare( "DELETE FROM sse_logs WHERE connect_time < ?" ).bind(thirtyDaysAgo).run(); // 清理旧的事件推送日志 const { meta: deliveryMeta } = await env.DB.prepare( "DELETE FROM event_delivery_logs WHERE delivery_time < ?" ).bind(thirtyDaysAgo).run(); console.log(`清理完成: - 删除${eventsMeta.changes}条旧事件 - 删除${logsMeta.changes}条SSE连接日志 - 删除${deliveryMeta.changes}条事件推送日志`); return { events: eventsMeta.changes, logs: logsMeta.changes, deliveries: deliveryMeta.changes }; } catch (error) { console.error('清理错误:', error); return { events: 0, logs: 0, deliveries: 0, error: error.message }; } } // SSE日志查询接口 async function handleSSELogs(request, env) { if (request.method !== "GET") { return new Response("Method Not Allowed", { status: 405 }); } try { const url = new URL(request.url); const clientId = url.searchParams.get('client_id'); const limit = parseInt(url.searchParams.get('limit')) || 50; let query = "SELECT * FROM sse_logs ORDER BY connect_time DESC LIMIT ?"; let params = [limit]; if (clientId) { query = "SELECT * FROM sse_logs WHERE client_id = ? ORDER BY connect_time DESC LIMIT ?"; params = [clientId, limit]; } const { results } = await env.DB.prepare(query).bind(...params).all(); return new Response(JSON.stringify({ success: true, logs: results }), { headers: { "Content-Type": "application/json" } }); } catch (error) { console.error('SSE日志查询错误:', error); return new Response(JSON.stringify({ success: false, error: error.message }), { status: 500, headers: { "Content-Type": "application/json" } }); } } export default { async fetch(request, env) { const url = new URL(request.url); switch (url.pathname) { case "/webhook": return handleWebhook(request, env); case "/sse": return handleSSE(request, env); case "/sse/logs": return handleSSELogs(request, env); default: return new Response("Not Found", { status: 404 }); } }, scheduled: async (event, env, ctx) => { ctx.waitUntil(cleanupOldEvents(env)); } };
第二步:配置数据库 在D1数据库页面新建一个数据库,然后执行以下SQL创建表:
-
Less