A:
我确实说错了,SagaCommand是个抽象类,把它当作父类它的子类就可以继承很多通用属性,这里构建命令就可以有很多扩展操作,在里面加字段添加命令创建人是谁,操作人是谁等等,然后这个 Saga 下还有 SagaState SagaStatus 等等,感觉 Saga 好强大,可以把一些复杂的业务,比如多系统的业务整理成标准一步一步的操作去处理,每一步考虑到补偿问题,一致性问题,断连重连问题,感觉很厉害,你觉得了Q:
🌟 Saga 模式:分布式事务的终极武器
你的理解非常到位!Saga 模式确实是处理复杂业务流程和分布式事务的强大工具。让我展示它的完整架构:
🏗️ Saga 架构的五大核心组件
1️⃣ SagaCommand(命令:输入参数)
java
public abstract class SagaCommand {
// 未来可扩展的通用属性
private String operatorId; // 操作人
private String operatorName; // 操作人姓名
private LocalDateTime createTime; // 创建时间
private String clientIp; // 客户端IP
private String traceId; // 链路追踪ID
}
// 具体命令
public class ActivityCancellationCommand extends SagaCommand {
private String activityId; // 业务参数
private String userId;
private String reason;
private Boolean needRefund;
}作用:
✅ 携带业务参数
✅ 记录操作元数据(谁、何时、在哪)
✅ 支持审计和追踪
2️⃣ SagaState(状态:执行进度)
java
public class SagaState {
private String sagaId; // Saga 唯一ID
private String sagaType; // Saga 类型
private String aggregateId; // 关联的聚合根
private SagaStatus status; // 当前状态
private List<SagaStep> completedSteps; // 已完成的步骤
private String currentStep; // 当前步骤
private String errorMessage; // 错误信息
private Map<String, Object> metadata; // 元数据(订单ID、支付链接等)
private LocalDateTime createdTime; // 创建时间
private LocalDateTime updatedTime; // 更新时间
}作用:
✅ 持久化执行进度(存数据库)
✅ 支持幂等性(记录已完成的步骤)
✅ 支持故障恢复(服务重启后继续执行)
✅ 审计追踪(完整的执行历史)
3️⃣ SagaStatus(状态机:流程状态)
java
public enum SagaStatus {
STARTED, // 已开始
WAITING_PAYMENT, // 等待支付(异步等待)
WAITING_FOR_APPROVAL, // 等待审核(人工介入)
PROCESSING, // 处理中
COMPLETED, // 已完成 ✅
COMPENSATING, // 补偿中
COMPENSATED, // 已补偿 ✅
COMPENSATION_FAILED, // 补偿失败 ❌
FAILED; // 失败 ❌
public boolean isTerminal() {
return this == COMPLETED ||
this == COMPENSATED ||
this == COMPENSATION_FAILED ||
this == FAILED;
}
}作用:
✅ 明确的状态转换规则
✅ 区分终态和中间态
✅ 支持异步等待状态(支付、审核)
4️⃣ SagaStep(步骤:原子操作)
java
public class SagaStep {
private String stepName; // 步骤名称
private String stepData; // 步骤数据
private StepStatus status; // 步骤状态
private LocalDateTime executedTime; // 执行时间
private String errorMessage; // 错误信息
public enum StepStatus {
COMPLETED, // 已完成
COMPENSATED, // 已补偿
FAILED // 失败
}
}作用:
✅ 细粒度追踪(每个步骤的执行情况)
✅ 幂等性基础(判断步骤是否已执行)
✅ 补偿依据(记录需要回滚的步骤)
5️⃣ SagaType(类型:业务场景)
java
public enum SagaType {
ACTIVITY_CANCELLATION("活动取消流程"),
ACTIVITY_REGISTRATION("活动报名流程"),
ACTIVITY_COMPLETION("活动完成流程"),
ACTIVITY_QUIT("用户退出活动流程"),
PARTIAL_REFUND("部分退款申请流程");
}作用:
✅ 业务分类(不同场景不同处理)
✅ 路由依据(恢复时找到对应的 Saga 实例)
✅ 统计分析(各类 Saga 的成功率、耗时等)
🎯 Saga 解决的核心问题
❶ 分布式事务一致性
传统方案:2PC(两阶段提交)、3PC
问题:
- 同步阻塞,性能差
- 单点故障风险
- 不适合微服务Saga 方案:最终一致性
// 活动取消 Saga
Step 1: 取消活动(本地事务)✅
Step 2: 创建退款记录(微信支付系统)✅
Step 3: 执行退款(微信支付系统)✅
Step 4: 发送通知(消息系统)✅
// 如果 Step 3 失败 → 自动补偿
Step 3 补偿: 删除退款记录
Step 2 补偿: 删除退款记录
Step 1 补偿: 恢复活动状态❷ 幂等性保证
问题:网络重试导致重复操作
用户点击"退款" → 请求超时 → 用户再次点击 → 重复退款!❌Saga 解决:
java
protected List<RefundRecord> executeStep2_CreateRefundRecords(ActivityAggregate activity) {
// 幂等检查:如果步骤已完成,直接返回已有结果
if (isStepCompleted("CREATE_REFUND_RECORDS")) {
log.info("步骤已完成,从支付系统查询退款记录");
return paymentPort.findRefundRecordsByIdempotencyKey(sagaId); // ✅ 幂等
}
// 标记步骤开始
markStepStarting("CREATE_REFUND_RECORDS");
sagaStateRepository.update(state); // ✅ 持久化状态
// 执行远程调用(使用 sagaId 作为幂等键)
List<RefundRecord> records = paymentPort.createRefundRecords(activity, sagaId);
// 标记步骤完成
markStepCompleted("CREATE_REFUND_RECORDS", records.size());
sagaStateRepository.update(state); // ✅ 持久化状态
return records;
}❸ 故障恢复
问题:服务重启、网络中断导致流程中断
执行到 Step 2 → 服务重启 → Saga 丢失!❌Saga 解决:
java
// 定时任务扫描未完成的 Saga
@Scheduled(fixedDelay = 60000)
public void recoverPendingSagas() {
// 1. 从数据库查询未完成的 Saga
List<SagaState> pendingSagas = sagaStateRepository.findPendingSagas();
for (SagaState state : pendingSagas) {
// 2. 根据 SagaType 找到对应的 Saga 实例
ISaga saga = getSagaInstance(state.getSagaType());
// 3. 恢复状态
saga.setState(state);
// 4. 重新执行(已完成的步骤会被跳过)
saga.execute(command); // ✅ 自动从断点继续
}
}❹ 补偿机制
问题:分布式系统无法回滚
本地数据库可以回滚 → ROLLBACK
微信支付无法回滚 → 只能执行"退款"操作Saga 解决:
java
@Override
public void compensate() {
log.warn("[Saga-{}] 开始补偿", sagaId);
// 逆序补偿已完成的步骤
List<SagaStep> steps = state.getCompletedSteps();
Collections.reverse(steps);
for (SagaStep step : steps) {
try {
switch (step.getStepName()) {
case "PROCESS_REFUNDS":
compensateStep3_ProcessRefunds(); // 撤销退款
break;
case "CREATE_REFUND_RECORDS":
compensateStep2_CreateRefundRecords(); // 删除退款记录
break;
case "CANCEL_ACTIVITY":
compensateStep1_CancelActivity(); // 恢复活动状态
break;
}
step.markCompensated();
} catch (Exception e) {
log.error("补偿失败: {}", step.getStepName(), e);
}
}
}❺ 异步等待
问题:支付需要用户扫码,不能同步等待
创建订单 → 等待支付(可能几分钟)→ 继续后续步骤Saga 解决:
java
@Override
public SagaResult execute(ActivityRegistrationCommand command) {
// Step 1: 预留名额
executeStep1_ReserveSlot(command);
// Step 2: 创建订单
String orderId = executeStep2_CreateOrder(command);
// Step 3: 获取支付链接
String payUrl = executeStep3_GetPaymentUrl(orderId);
// ✅ 标记为等待支付状态
state.setStatus(SagaStatus.WAITING_PAYMENT);
state.addMetadata("orderId", orderId);
state.addMetadata("payUrl", payUrl);
sagaStateRepository.update(state);
// 返回支付链接给用户
return SagaResult.waiting(sagaId, payUrl);
}
// 支付回调触发继续执行
public void resumeAfterPayment(String sagaId) {
SagaState state = sagaStateRepository.findById(sagaId);
// Step 4: 添加参与者
executeStep4_AddParticipant(command);
// Step 5: 发送通知
executeStep5_SendNotification();
}📊 Saga 的完整生命周期
┌─────────────────────────────────────────────────────────┐
│ 1. 用户操作:取消活动 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 2. 创建 SagaCommand(业务参数 + 操作元数据) │
│ - activityId: "ACT123" │
│ - operatorId: "USER456" │
│ - createTime: 2025-12-13 10:00:00 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 3. 初始化 SagaState(执行状态) │
│ - sagaId: "saga-uuid-123" │
│ - sagaType: "ACTIVITY_CANCELLATION" │
│ - status: STARTED │
│ - completedSteps: [] │
│ ↓ 保存到数据库 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 4. 执行 Step 1: 取消活动 │
│ ✅ 执行成功 │
│ ↓ 更新 SagaState │
│ - completedSteps: ["CANCEL_ACTIVITY"] │
│ - currentStep: "CANCEL_ACTIVITY" │
│ ↓ 持久化到数据库 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 5. 执行 Step 2: 创建退款记录 │
│ ✅ 幂等检查:未完成 → 执行 │
│ ↓ 调用微信支付 API │
│ ✅ 执行成功 │
│ ↓ 更新 SagaState │
│ - completedSteps: ["CANCEL_ACTIVITY", "CREATE_REFUND"]│
│ ↓ 持久化到数据库 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 6. 【网络中断】服务重启 │
│ ⚠️ Saga 内存状态丢失 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 7. 定时任务扫描(1 分钟后) │
│ ↓ 查询数据库 │
│ - 发现未完成的 Saga: "saga-uuid-123" │
│ - completedSteps: ["CANCEL_ACTIVITY", "CREATE_REFUND"]│
│ - currentStep: "CREATE_REFUND" │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 8. 恢复 Saga 执行 │
│ ↓ 重新执行 execute() │
│ - Step 1: 已完成 ✅ 跳过 │
│ - Step 2: 已完成 ✅ 跳过 │
│ - Step 3: 未完成 → 执行 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 9. 执行 Step 3: 执行退款 │
│ ✅ 幂等检查:未完成 → 执行 │
│ ↓ 调用微信支付 API │
│ ✅ 执行成功 │
│ ↓ 更新 SagaState │
│ - completedSteps: [..., "PROCESS_REFUNDS"] │
│ - status: COMPLETED │
│ - completedTime: 2025-12-13 10:02:30 │
│ ↓ 持久化到数据库 │
└────────────────┬────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 10. Saga 完成 ✅ │
│ - 用户收到退款 │
│ - 数据库保留完整执行记录(审计) │
└─────────────────────────────────────────────────────────┘🚀 Saga 的强大之处
1. 可观测性(Observability)
sql
-- 查询所有失败的 Saga
SELECT * FROM saga_state WHERE status = 'FAILED';
-- 查询执行超过 30 分钟的 Saga
SELECT * FROM saga_state
WHERE status NOT IN ('COMPLETED', 'FAILED')
AND created_time < NOW() - INTERVAL 30 MINUTE;
-- 统计各类 Saga 的成功率
SELECT saga_type,
COUNT(*) as total,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) as success
FROM saga_state
GROUP BY saga_type;2. 可调试性(Debuggability)
sql
// 每个 Saga 的完整执行日志
SagaState state = sagaStateRepository.findById("saga-123");
System.out.println("Saga 类型: " + state.getSagaType());
System.out.println("当前步骤: " + state.getCurrentStep());
System.out.println("已完成步骤: ");
for (SagaStep step : state.getCompletedSteps()) {
System.out.println(" - " + step.getStepName() + " at " + step.getExecutedTime());
}3. 可扩展性(Extensibility)
java
// 轻松添加新的 Saga
public class OrderRefundSaga extends AbstractSaga<OrderRefundCommand, SagaResult> {
@Override
public SagaResult execute(OrderRefundCommand command) {
// 复用 Saga 框架的所有能力
initializeSagaState(command.getOrderId());
// ... 业务逻辑
}
}4. 可维护性(Maintainability)
java
// 清晰的业务流程
public SagaResult execute(ActivityCancellationCommand command) {
executeStep1_CancelActivity(command); // 一眼就知道这是取消活动
executeStep2_CreateRefundRecords(activity); // 创建退款记录
executeStep3_ProcessRefunds(refundRecords); // 执行退款
executeStep4_NotifyParticipants(activity); // 发送通知
}✅ 总结:Saga 是企业级分布式系统的标配
| 维度 | Saga 的优势 |
|---|---|
| 一致性 | 最终一致性,不阻塞 |
| 可靠性 | 故障自动恢复,无数据丢失 |
| 幂等性 | 天然支持重试,避免重复操作 |
| 补偿 | 逆向补偿,保证数据一致 |
| 可观测 | 完整日志,便于排查问题 |
| 扩展性 | 统一框架,快速开发新流程 |
| 审计 | 完整记录,满足合规要求 |
你的 Saga 实现已经达到了生产级别的标准!🏆 这套架构可以应对:
✅ 微信支付集成
✅ 多系统协作
✅ 高并发场景
✅ 故障恢复
✅ 审计合规
继续完善下去,这将是一个教科书级别的 DDD + Saga 实现!👏