Skip to main content

Durable Function

// 订单流程 / 编排
// 本身是确定性的
// 实际这个流程可能运行 几天
export async function orderProcessingWorkflow(order: OrderDetails): Promise<string> {
let transactionId: string | null = null;

try {
// 检查并预留库存
await workflow.callActivity(reserveInventory, order.items);

// 支付处理
// 幂等操作,自动重试
const paymentResponse = await workflow.callActivityWithRetry(
processPayment,
{
initialInterval: '10s',
maximumAttempts: 5,
},
order.orderId,
order.amount,
);

transactionId = paymentResponse.transactionId;

// timer,人工交互
if (paymentResponse.status === 'REQUIRES_REVIEW') {
// side effect
await workflow.callActivity(notifyFraudTeam, order.orderId);

const approvalEvent = workflow.waitForExternalEvent<boolean>('fraudReviewApproved');
const timeout = workflow.createTimer('2h');

// 等待其中一个完成
const winner = await Promise.race([approvalEvent, timeout]);

if (winner === timeout) {
// 如果超时,则认为审核失败,抛出异常以触发补偿逻辑。
throw new Error('欺诈审核超时');
}

const isApproved = await approvalEvent;
if (!isApproved) {
// 如果审核被拒绝,也抛出异常。
throw new Error('欺诈审核被拒绝');
}
}

// side effect
// 从库存中扣除商品、发送订单确认邮件
await workflow.callActivity(deductInventory, order.items);
await workflow.callActivity(sendOrderConfirmationEmail, order.customerId, order.orderId);

// timer
// 等到 24 小时后发送跟进邮件
await workflow.sleep('24h');
await workflow.callActivity(sendFollowUpEmail, order.customerId, order.orderId);

return `订单 ${order.orderId} 处理成功。`;
} catch (error) {
// 补偿
if (transactionId) {
await workflow.callActivity(refundPayment, transactionId);
}
console.error(`订单 ${order.orderId} 处理失败:`, error);
return `订单 ${order.orderId} 处理失败。`;
}
}

原理

  • 事件溯源 (Event Sourcing)
    • 以 AOL/Append Only Log 的方式记录所有操作
  • 检查点 (Checkpointing)
    • 等待进入检查点,记录当前状态
    • 状态记录后,实际的逻辑可以 offload
    • 例如 await sleep('5m') 本质是 throw 一个 Error, 外部检测然后 checkpoint
  • 重放 (Replay)
    • 通过事件日志重放状态
    • 恢复 逻辑状态,确保与原始执行路径一致
  • 确定性的 (deterministic)
    • Workflow 主体是无 side effect 的
    • side effect 抽离到 activity
    • 保障 replay 能进入同样的状态

场景

  • 任务队列 (Task Queues)
  • Saga 模式
  • Cron 作业与调度器
  • 事件驱动架构 (Event-Driven Architecture, EDA)

应用模式

  • 函数链接 (Function Chaining) - 用于顺序流程
  • 扇出/扇入 (Fan-out/Fan-in) - 用于并行处理
  • 异步 HTTP API - 用于长时操作
  • 监控 (Monitoring) - 用于状态驱动的轮询
  • 人工交互 (Human Interaction) - 用于审批工作流
  • 聚合器 (Aggregator) - 用于状态化数据收集

Awesome

Cadence

  • Workflow
    • 确定性
  • Activity
  • Task List
  • Signal
    • 异步消息
  • Synchronous Query
    • 直接接收同步查询
  • Archival
    • 完成的 workflow 归档到 s3,减少主数据库压力

Temporal

  • fork by Cadence 的两位核心创始人 Maxim Fateev 、 Samar Abbas

Azure Durable Functions

  • Orchestrator Functions / Workflow
  • Activity Functions / Side effect
  • Entity Functions
    • Durable Functions 2.0 引入
    • 对 Actor 进行建模