用Cloudflare Worker搭建Webhook转发服务:无公网IP的完美解决方案
分享一个实用的技术方案——使用Cloudflare Worker搭建Webhook转发服务。这个方案特别适合没有公网IP的开发者,可以轻松实现Webhook的接收和转发。
为什么需要这个方案?
很多第三方服务(比如GitHub、云湖消息推送等)都支持Webhook功能,但开发调试时最大的痛点就是:
- 本地开发环境没有公网IP
- 内网穿透工具不稳定
- 需要实时查看Webhook推送内容
Cloudflare Worker正好能完美解决这些问题,而且完全免费!
五分钟快速搭建
第一步:创建Worker
- 登录Cloudflare后台
- 进入Workers & Pages页面
- 点击"创建应用程序"新建一个Worker
然后把以下代码部署到worker里(可以使用 CRTL+F 搜索
env.DB
将它替换成你的D1绑定的变量):
async function handleWebhook(request, env) {
if (request.method !== "POST") {
return new Response("Method Not Allowed", { status: 405 });
}
try {
const contentType = request.headers.get("content-type");
if (!contentType?.includes("application/json")) {
return new Response(JSON.stringify({ error: "Invalid content type" }), {
status: 400,
headers: { "Content-Type": "application/json" }
});
}
const payload = await request.json();
const eventId = `event_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`;
// 将 payload 转换为纯 JSON 可序列化的对象
const safePayload = JSON.parse(JSON.stringify(payload));
const payloadString = JSON.stringify(safePayload);
let eventType = 'unknown';
if (payload.event && typeof payload.event === 'string') {
eventType = payload.event;
} else if (payload.type) {
eventType = String(payload.type);
}
await env.DB.prepare(
"INSERT INTO events (id, data, timestamp, source_ip, event_type) VALUES (?, ?, ?, ?, ?)"
).bind(
eventId,
payloadString,
Math.floor(Date.now() / 1000),
request.headers.get('CF-Connecting-IP'),
eventType
).run();
return new Response(JSON.stringify({
success: true,
id: eventId,
timestamp: Math.floor(Date.now() / 1000)
}), {
headers: { "Content-Type": "application/json" }
});
} catch (error) {
console.error('Webhook处理错误:', error);
return new Response(JSON.stringify({
success: false,
error: error.message,
timestamp: Math.floor(Date.now() / 1000)
}), {
status: 500,
headers: { "Content-Type": "application/json" }
});
}
}
// SSE处理器
async function handleSSE(request, env) {
if (request.method !== "GET") {
return new Response("Method Not Allowed", { status: 405 });
}
// 生成客户端ID
const clientId = request.headers.get('CF-Connecting-IP') || crypto.randomUUID();
const logId = `log_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`;
const connectTime = Math.floor(Date.now() / 1000);
// 记录连接日志
try {
await env.DB.prepare(
"INSERT INTO sse_logs (id, client_id, ip_address, user_agent, connect_time, status) VALUES (?, ?, ?, ?, ?, ?)"
).bind(
logId,
clientId,
request.headers.get('CF-Connecting-IP'),
request.headers.get('User-Agent'),
connectTime,
'connected'
).run();
} catch (error) {
console.error("SSE连接日志记录失败:", error);
}
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const encoder = new TextEncoder();
const markEventAsDelivered = async (eventId) => {
try {
// 标记事件为已推送
await env.DB.prepare(
"UPDATE events SET delivered = TRUE WHERE id = ?"
).bind(eventId).run();
// 记录事件推送日志
await env.DB.prepare(
"INSERT INTO event_delivery_logs (id, event_id, client_id, delivery_time, status) VALUES (?, ?, ?, ?, ?)"
).bind(
`delivery_${Math.floor(Date.now()/1000)}_${crypto.randomUUID()}`,
eventId,
clientId,
Math.floor(Date.now() / 1000),
'delivered'
).run();
} catch (error) {
console.error("事件标记和日志记录失败:", error);
}
};
const sendEvent = async (eventType, eventData) => {
const eventPayload = {
event: eventType,
id: eventData.id,
timestamp: Date.now(),
data: typeof eventData.data === 'string' ? JSON.parse(eventData.data) : eventData.data
};
await writer.write(encoder.encode(
`event: ${eventType}\n` +
`id: ${eventData.id}\n` +
`data: ${JSON.stringify(eventPayload)}\n\n`
));
};
// 发送初始历史事件
const sendInitialEvents = async () => {
try {
const { results } = await env.DB.prepare(
"SELECT id, data, timestamp FROM events WHERE delivered = FALSE ORDER BY timestamp ASC LIMIT 50"
).all();
for (const event of results) {
await sendEvent('message', {
id: event.id,
data: event.data,
timestamp: event.timestamp
});
await markEventAsDelivered(event.id);
}
await writer.write(encoder.encode(
`event: system\n` +
`data: ${JSON.stringify({
type: "init",
status: 'ready',
count: results.length,
timestamp: Date.now()
})}\n\n`
));
} catch (error) {
console.error("初始事件发送失败:", error);
}
};
const keepAlive = setInterval(() => {
writer.write(encoder.encode(
`event: system\n` +
`data: ${JSON.stringify({
type: "heartbeat",
timestamp: Date.now()
})}\n\n`
)).catch(() => {
clearInterval(keepAlive);
clearInterval(eventPoller);
});
}, 15000);
// 事件轮询
let lastTimestamp = Math.floor(Date.now() / 1000);
const eventPoller = setInterval(async () => {
try {
const { results } = await env.DB.prepare(
"SELECT id, data, timestamp FROM events WHERE timestamp > ? AND delivered = FALSE ORDER BY timestamp ASC"
).bind(lastTimestamp).all();
if (results.length > 0) {
for (const event of results) {
await sendEvent('message', {
id: event.id,
data: event.data,
timestamp: event.timestamp
});
await markEventAsDelivered(event.id);
lastTimestamp = event.timestamp;
}
}
} catch (error) {
console.error("事件轮询错误:", error);
}
}, 1000);
// 清理函数
const cleanup = async () => {
clearInterval(keepAlive);
clearInterval(eventPoller);
// 记录断开连接日志
try {
await env.DB.prepare(
"UPDATE sse_logs SET disconnect_time = ?, status = 'disconnected' WHERE id = ?"
).bind(
Math.floor(Date.now() / 1000),
logId
).run();
} catch (error) {
console.error("SSE断开连接日志记录失败:", error);
}
writer.close();
};
// 设置中断监听
request.signal.addEventListener("abort", cleanup);
// 发送初始事件
sendInitialEvents();
return new Response(readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
});
}
// 清理旧事件
async function cleanupOldEvents(env) {
try {
const sevenDaysAgo = Math.floor(Date.now() / 1000) - 604800;
// 清理已推送的旧事件
const { meta: eventsMeta } = await env.DB.prepare(
"DELETE FROM events WHERE timestamp < ? AND delivered = TRUE"
).bind(sevenDaysAgo).run();
// 清理旧的SSE连接日志(保留最近30天的)
const thirtyDaysAgo = Math.floor(Date.now() / 1000) - 2592000;
const { meta: logsMeta } = await env.DB.prepare(
"DELETE FROM sse_logs WHERE connect_time < ?"
).bind(thirtyDaysAgo).run();
// 清理旧的事件推送日志
const { meta: deliveryMeta } = await env.DB.prepare(
"DELETE FROM event_delivery_logs WHERE delivery_time < ?"
).bind(thirtyDaysAgo).run();
console.log(`清理完成:
- 删除${eventsMeta.changes}条旧事件
- 删除${logsMeta.changes}条SSE连接日志
- 删除${deliveryMeta.changes}条事件推送日志`);
return {
events: eventsMeta.changes,
logs: logsMeta.changes,
deliveries: deliveryMeta.changes
};
} catch (error) {
console.error('清理错误:', error);
return {
events: 0,
logs: 0,
deliveries: 0,
error: error.message
};
}
}
// SSE日志查询接口
async function handleSSELogs(request, env) {
if (request.method !== "GET") {
return new Response("Method Not Allowed", { status: 405 });
}
try {
const url = new URL(request.url);
const clientId = url.searchParams.get('client_id');
const limit = parseInt(url.searchParams.get('limit')) || 50;
let query = "SELECT * FROM sse_logs ORDER BY connect_time DESC LIMIT ?";
let params = [limit];
if (clientId) {
query = "SELECT * FROM sse_logs WHERE client_id = ? ORDER BY connect_time DESC LIMIT ?";
params = [clientId, limit];
}
const { results } = await env.DB.prepare(query).bind(...params).all();
return new Response(JSON.stringify({
success: true,
logs: results
}), {
headers: { "Content-Type": "application/json" }
});
} catch (error) {
console.error('SSE日志查询错误:', error);
return new Response(JSON.stringify({
success: false,
error: error.message
}), {
status: 500,
headers: { "Content-Type": "application/json" }
});
}
}
export default {
async fetch(request, env) {
const url = new URL(request.url);
switch (url.pathname) {
case "/webhook":
return handleWebhook(request, env);
case "/sse":
return handleSSE(request, env);
case "/sse/logs":
return handleSSELogs(request, env);
default:
return new Response("Not Found", { status: 404 });
}
},
scheduled: async (event, env, ctx) => {
ctx.waitUntil(cleanupOldEvents(env));
}
};
第二步:配置数据库 在D1数据库页面新建一个数据库,然后执行以下SQL创建表:
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
timestamp INTEGER NOT NULL,
source_ip TEXT,
event_type TEXT,
delivered BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS sse_logs (
id TEXT PRIMARY KEY,
client_id TEXT NOT NULL,
ip_address TEXT NOT NULL,
user_agent TEXT,
connect_time INTEGER NOT NULL,
disconnect_time INTEGER,
status TEXT
);
CREATE TABLE IF NOT EXISTS event_delivery_logs (
id TEXT PRIMARY KEY,
event_id TEXT NOT NULL,
client_id TEXT NOT NULL,
delivery_time INTEGER NOT NULL,
status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_sse_logs_client ON sse_logs(client_id);
CREATE INDEX IF NOT EXISTS idx_delivery_event ON event_delivery_logs(event_id);
第三步:部署代码 把完整代码复制到Worker编辑器中,绑定数据库后点击部署就完成了!
核心功能体验
接收Webhook
- 访问地址:/webhook
- 支持POST请求
- 自动记录所有请求数据
实时事件推送
- 访问地址:/sse
- 支持浏览器直接访问
- 实时显示最新事件
curl -N https://你的worker地址/sse
查看日志
- 访问地址:/sse/logs
- 可查询历史连接记录
- 支持按客户端筛选
事件数据示例
1. 原始 Webhook 转发格式
我们的 Worker 会原样转发收到的 Webhook 数据,保持原始格式不变:
event: message
id: event_1718112345_550e8400
data: {
// 原始Webhook的全部数据...
}
2. 系统事件示例
event: system
id: system_1718112350_550e8401
data: {
"type": "init",
"status": "ready",
"message": "Connected. Last event ID: event_1718112345_550e8400"
}
3. 心跳事件
event: heartbeat
id: hb_1718112450_550e8402
data: {
"timestamp": 1718112450000,
"active_connections": 5
}
实际使用技巧
- 调试时可以先用Postman发送测试请求
- 本地开发时配合ngrok使用效果更佳
- 数据库会自动清理7天前的数据,不用担心存储问题
如果你也经常需要调试Webhook,不妨试试这个方案,相信会大大提升你的开发效率!