Skip to content

🎯 标准 DDD 架构(活动取消 Saga 完整流程)

让我以活动取消退款为例,完整梳理:

1️⃣ 正常流程(用户主动取消)

用户点击取消

Controller (Trigger Trigger↓ 调用
ActivityApplicationService.cancelActivity(activityId, userId)
    ↓ 执行
ActivityCancellationSaga.execute(command)
    ├─ Step 1: 取消活动(更新状态)
    │   └─ sagaStateRepository.update(state) ✅ 记录到数据库
    ├─ Step 2: 创建退款记录
    │   ├─   ├─isStepCompleted("CREATE_REFUND_RECORDS") ?
    │   ├─ 标记: markStepStarting("CREATE_REFUND_RECORDS")
    │   ├─ 持久化: sagaStateRepository.update(state) ✅

    │用: paymentPort.createRefundRecords()

    │ markStepCompleted("CREATE_REFUND_RECORDS")
    │   └─ 持久化: sagaStateRepository.update(state) ✅
    ├─ Step 3: 执行批量退款
    │   ├─ 检查: isStepCompleted("PROCESS_REFUNDS") ?
    │   ├─ 标记: markStepStarting("PROCESS_REFUNDS")
    │   REFUNDS")
    │teRepository.update(state) ✅
    │   ├─ 远程调用: paymentPort.batchRefund()
    │   ├─ 标记: markStepCompleted("PROCESS_REFUNDS")
    │   └─ 持久化: sagaStateRepository.update(state) ✅
    └─ Step 4: 发送通知(异步)
        └─ domainEventPublisher.publish(ActivityCancelledEvent)

2️⃣ 异常恢复流程(网络中断后)

定时任务触发(每 1 分钟)
    SagaRecoveryTask (Trigger 层)
    ↓ 调用 

SagaRecoveryTask (Trigger 层)
    ↓ 调用ecoverPendingSagas()
    ↓ 实现
SagaRecoveryApplicationService (App 层)
    ├─ 查询: sagaStateRepository.findPendingSagas()
    │   └─ 返回: 所有非终态的 Saga (STARTED, PROCESSING, COMPENSATING)

    └─ 逐个恢复:
        ├─ 加载 Saga 实例: ActivityCancellationSaga
        ancellationSagaetState(sagaState)
        └─ 重新执行: saga.execute(command): 已完成 完成跳过
            ├─ Step 2: 检查 isStepCompleted("CREATE_REFUND_RECORDS")
            │   ├─ 如果已完成: 调用 paymentPort.findRefundRecordsByIdempotencyKey(sagaId)
            Key(sagaId)成: 重新执行(幂等)
            └─ Step 3: 检查 isStepCompleted("PROCESS_REFUNDS")
                ├─ 如果已完成: 已完成:               └─ 如果未完成: 重新执行(幂等)

3️⃣ 补偿流程(Saga 失败)

Saga 执行失败

saga.compensate()
    ├─ Step 3 补偿: 撤销退款(如果已退款)
    ├─ Step 2 补偿: 删除退款记录
    └─ Step 1 补偿: 偿:活动状态

✅ 标准 DDD 六边形架构

完整依赖关系

┌─────────────────────────────────────────────────────────┐
│ Trigger                                 
│ - ActivityController (用户请求) 
│ - SagaRecoveryTask (定时任务)                           
│ - RefundRetryTask (定时任务)                            
└────────────────┬────────────────────────────────────────┘
               
用 Input Port────────────────────────────────────────────────┐
│ Domain 层 (Input Port - 业务契约)                       
│ - IActivityApplicationService                    
│ - ISagaRecoveryApplicationService                
│ - SagaRecoveryApplicationServiceicationService                        
└────────────────┬────────────────────────────────────────┘
                 │ 实现

┌─────────────────────────────────────────────────────────┐
│ App                                      
│ - ActivityApplicationService                           
│ - ApplicationServiceApplicationService  
│ - RecoveryApplicationService
│ - RefundRetryApplicationService                         │
└────────────────┬────────────────────────────────────────┘
                 │ 调用

┌─────────────────────────────────────────────────────────┐
│ Domain 层 (核心业务逻辑)
│ - ActivityCancellationSaga (取消活动)
│ - ActivityRegistrationSaga (报名活动              
│ - ActivityRegistrationSaga (报名活动类)
│ - SagaState (Saga 状态)                                 
└────────────────┬────────────────────────────────────────┘
                 │ 调用 Output Port

┌─────────────────────────────────────────────────────────┐
│ Domain 层 (Output Port -             │
│ - ISagaStateRepository (Saga 状态仓储接口)              
│ - IActivityAggregateRepository (活动仓储接口)           
│ - SagaStateRepository (Saga 状态仓储接口)              

┌─────────────────────────────────────────────────────────┐
│ Infrastructure 层 (技术实现)- PaymentAdapter (微信支付适配器)                       
│ - SagaStateRepository (Saga 状态仓储实现)               
│ - ActivityAggregateRepository (活动仓储实现)            
└─────────────────────────────────────────────────────────┘

问题 2:职责清晰度

类名职责层次
SagaRecoveryTask定时触发 Saga 恢复Trigger 层
SagaRecoveryApplicationService编排 Saga 恢复逻辑App 层
ActivityCancellationSaga活动取消业务流程Domain 层
SagaStateRepositorySaga 状态持久化Infrastructure 层

📝 总结:活动取消 Saga 的完整链路

用户视角

用户点击"取消活动" 
  → 后台执行 行aga(4 个步骤)
  → 每个步骤都记录到数据库
  → 如果中断,定时任务自动恢复
  → 最终退款成功,用户收到通知

技术视角

1. 幂等设计:每个步骤都可重复执行
2. 状态机:记录已完成的步骤,避免重复
3. 持久化:每个关键步骤都保存到数据库
4. 自动恢复:定时任务扫描未完成的 Saga
5. 补偿机制:失败时逆向回滚

为什么需要这么复杂?

场景 1:网络中断

  • Step 2 执行到一半,微信支付接口调用成功

  • 但 sagaStateRepository.update() 失败(网络断开)

  • 定时任务恢复时,通过 isStepCompleted() 检查

  • 调用 paymentPort.findRefundRecordsByIdempotencyKey() 查询结果

  • 跳过 Step 2,继续执行 Step 3 ✅

场景 2:服务重启

  • Saga 执行到 Step 3,服务突然重启

  • 重启后,定时任务扫描数据库

  • 发现未完成的 Saga,重新加载

  • 从 Step 3 继续执行 ✅

场景 3:微信支付失败

  • Step 3 退款失败(余额不足)

  • Saga 标记为 FAILED

  • RefundRetryTask 定时重试

  • 或者触发补偿流程,恢复活动状态 ✅


Powered by VitePress