Skip to content

🎯 活动领域(Activity Domain)的 Saga 场景识别

核心 Saga 场景矩阵

场景复杂度跨聚合补偿需求推荐优先级
1. 取消活动(含退款)⭐⭐⭐⭐⭐3+P0 - 必须
2. 报名付费活动⭐⭐⭐⭐2-3P0 - 必须
3. 活动成员限制与审核⭐⭐⭐2⚠️P1 - 重要
4. 活动完成与积分发放⭐⭐⭐⭐3⚠️P1 - 重要
5. 活动超时自动取消⭐⭐⭐2P2 - 可选

📋 详细场景分析

Saga 1: 取消活动(含通知与退款) ⭐⭐⭐⭐⭐

业务需求

创建者取消活动后,需要:

  1. 更新活动状态为"已取消"

  2. 通知所有参与者

  3. 退款(如果活动收费)

  4. 扣除活动发布者信用分(如频繁取消)

  5. 更新用户统计数据

涉及聚合根

  • ActivityAggregate - 活动聚合

  • PaymentAggregate - 支付聚合(需新建)

  • UserAggregate - 用户聚合

  • NotificationAggregate - 通知聚合(可选)

Saga 步骤与补偿

java
/**
 * 活动取消 Saga
 * 
 * 正向流程:
 * 1. 锁定活动状态(乐观锁)
 * 2. 创建退款记录
 * 3. 执行批量退款
 * 4. 发送通知给参与者
 * 5. 更新用户信用分
 * 6. 更新统计数据
 * 
 * 补偿流程(任一步失败):
 * - Step 1 失败 → 无需补偿
 * - Step 2 失败 → 恢复活动状态
 * - Step 3 失败 → 删除退款记录 + 恢复活动状态
 * - Step 4 失败 → 记录通知失败,定时重试(非关键)
 * - Step 5 失败 → 记录补偿任务,异步重试
 * - Step 6 失败 → 记录补偿任务,异步重试
 */

代码实现建议

java
// ============================================
// Domain 层 - Saga 定义
// ============================================

package com.alisunxin.api.domain.activity.saga;

/**
 * 活动取消 Saga
 * 
 * 负责编排跨聚合的活动取消业务流程
 * 包含事务补偿机制
 */
@Slf4j
@Component
public class ActivityCancellationSaga {
    
    private final IActivityAggregateRepository activityRepository;
    private final IPaymentPort paymentPort;  // Output Port - 支付端口
    private final IUserCreditPort userCreditPort;  // Output Port - 用户信用端口
    private final IActivityNotificationPort notificationPort;
    private final ISagaStateRepository sagaStateRepository;  // Saga 状态持久化
    
    /**
     * Saga ID(用于追踪和幂等)
     */
    private final String sagaId;
    
    /**
     * Saga 状态
     */
    private SagaState state;
    
    /**
     * 执行 Saga
     */
    public SagaResult execute(ActivityCancellationCommand command) {
        sagaId = UUID.randomUUID().toString();
        state = SagaState.builder()
            .sagaId(sagaId)
            .sagaType("ACTIVITY_CANCELLATION")
            .aggregateId(command.getActivityId())
            .status(SagaStatus.STARTED)
            .build();
        
        sagaStateRepository.save(state);
        
        try {
            // Step 1: 取消活动(本地事务)
            ActivityAggregate activity = executeStep1_CancelActivity(command);
            
            // Step 2: 创建退款记录(远程调用)
            List<RefundRecord> refundRecords = executeStep2_CreateRefundRecords(activity);
            
            // Step 3: 执行批量退款(远程调用 + 幂等)
            RefundResult refundResult = executeStep3_ProcessRefunds(refundRecords);
            
            // Step 4: 发送通知(异步,非阻塞)
            executeStep4_NotifyParticipants(activity);
            
            // Step 5: 更新用户信用分(远程调用,允许失败)
            executeStep5_UpdateCreatorCredit(activity, command.getReason());
            
            // Step 6: 更新统计数据(异步,最终一致)
            executeStep6_UpdateStatistics(activity);
            
            // 标记 Saga 完成
            state.setStatus(SagaStatus.COMPLETED);
            sagaStateRepository.update(state);
            
            return SagaResult.success(sagaId);
            
        } catch (Exception e) {
            log.error("Saga 执行失败: {}, 开始补偿", sagaId, e);
            compensate();
            return SagaResult.failure(sagaId, e.getMessage());
        }
    }
    
    /**
     * Step 1: 取消活动
     */
    @Transactional
    private ActivityAggregate executeStep1_CancelActivity(ActivityCancellationCommand command) {
        log.info("[Saga-{}] Step 1: 取消活动 - {}", sagaId, command.getActivityId());
        
        ActivityAggregate activity = activityRepository
            .findById(ActivityId.of(command.getActivityId()))
            .orElseThrow(() -> new IllegalArgumentException("活动不存在"));
        
        // 领域逻辑:取消活动
        activity.cancelActivity(command.getReason());
        
        // 持久化
        activityRepository.update(activity);
        
        // 记录 Saga 步骤
        state.addCompletedStep(new SagaStep("CANCEL_ACTIVITY", activity.getActivityId().getValue()));
        sagaStateRepository.update(state);
        
        return activity;
    }
    
    /**
     * Step 2: 创建退款记录
     */
    private List<RefundRecord> executeStep2_CreateRefundRecords(ActivityAggregate activity) {
        log.info("[Saga-{}] Step 2: 创建退款记录", sagaId);
        
        if (activity.getParticipantIds().isEmpty()) {
            log.info("[Saga-{}] 无参与者,跳过退款", sagaId);
            return List.of();
        }
        
        // 调用支付端口(幂等)
        List<RefundRecord> refundRecords = paymentPort.createRefundRecords(
            activity.getActivityId().getValue(),
            activity.getParticipantIds().stream()
                .map(UserId::getValue)
                .collect(Collectors.toList()),
            RefundReason.ACTIVITY_CANCELLED,
            sagaId  // 传入 Saga ID 保证幂等
        );
        
        state.addCompletedStep(new SagaStep("CREATE_REFUND_RECORDS", refundRecords.size()));
        sagaStateRepository.update(state);
        
        return refundRecords;
    }
    
    /**
     * Step 3: 执行批量退款(关键步骤)
     */
    private RefundResult executeStep3_ProcessRefunds(List<RefundRecord> refundRecords) {
        log.info("[Saga-{}] Step 3: 执行退款 - {} 笔", sagaId, refundRecords.size());
        
        if (refundRecords.isEmpty()) {
            return RefundResult.empty();
        }
        
        try {
            // 调用支付网关(幂等 + 重试)
            RefundResult result = paymentPort.batchRefund(refundRecords, sagaId);
            
            if (result.hasFailures()) {
                log.warn("[Saga-{}] 部分退款失败: {}", sagaId, result.getFailedCount());
                // 记录失败的退款,稍后重试
                paymentPort.scheduleRetry(result.getFailedRefunds());
            }
            
            state.addCompletedStep(new SagaStep("PROCESS_REFUNDS", result.getSuccessCount()));
            sagaStateRepository.update(state);
            
            return result;
            
        } catch (PaymentGatewayException e) {
            log.error("[Saga-{}] 退款失败,触发补偿", sagaId, e);
            throw new SagaException("退款失败", e);
        }
    }
    
    /**
     * Step 4: 发送通知(非关键步骤,允许异步重试)
     */
    @Async
    private void executeStep4_NotifyParticipants(ActivityAggregate activity) {
        log.info("[Saga-{}] Step 4: 发送通知", sagaId);
        
        try {
            // 批量通知(异步)
            notificationPort.notifyActivityCancelled(
                activity.getActivityId().getValue(),
                activity.getParticipantIds().stream()
                    .map(UserId::getValue)
                    .collect(Collectors.toList()),
                activity.getTitle(),
                activity.getCancellationReason()
            );
            
            state.addCompletedStep(new SagaStep("NOTIFY_PARTICIPANTS", activity.getParticipantIds().size()));
            
        } catch (Exception e) {
            log.error("[Saga-{}] 通知发送失败,将异步重试", sagaId, e);
            // 不阻塞主流程,记录失败日志即可
            notificationPort.scheduleRetry(activity.getActivityId().getValue());
        }
    }
    
    /**
     * Step 5: 更新创建者信用分(非关键步骤)
     */
    private void executeStep5_UpdateCreatorCredit(ActivityAggregate activity, String reason) {
        log.info("[Saga-{}] Step 5: 更新信用分", sagaId);
        
        try {
            // 频繁取消活动扣信用分
            userCreditPort.deductCredit(
                activity.getCreatorId().getValue(),
                CreditDeductionReason.FREQUENT_CANCELLATION,
                10,  // 扣除10分
                sagaId
            );
            
            state.addCompletedStep(new SagaStep("UPDATE_CREATOR_CREDIT", 10));
            
        } catch (Exception e) {
            log.warn("[Saga-{}] 信用分更新失败,记录补偿任务", sagaId, e);
            // 记录到补偿队列,异步重试
            userCreditPort.scheduleCompensation(activity.getCreatorId().getValue(), sagaId);
        }
    }
    
    /**
     * Step 6: 更新统计数据(最终一致性)
     */
    @Async
    private void executeStep6_UpdateStatistics(ActivityAggregate activity) {
        log.info("[Saga-{}] Step 6: 更新统计数据", sagaId);
        
        // 发送统计事件,由统计服务异步处理
        domainEventPublisher.publish(new ActivityCancelledStatisticsEvent(
            activity.getActivityId(),
            activity.getCreatorId(),
            activity.getCurrentParticipants(),
            LocalDateTime.now()
        ));
    }
    
    /**
     * 补偿流程
     */
    private void compensate() {
        log.warn("[Saga-{}] 开始执行补偿流程", sagaId);
        
        state.setStatus(SagaStatus.COMPENSATING);
        sagaStateRepository.update(state);
        
        // 按逆序执行补偿
        List<SagaStep> completedSteps = state.getCompletedSteps();
        Collections.reverse(completedSteps);
        
        for (SagaStep step : completedSteps) {
            try {
                switch (step.getStepName()) {
                    case "CANCEL_ACTIVITY":
                        compensateStep1_RestoreActivity(step);
                        break;
                    case "CREATE_REFUND_RECORDS":
                        compensateStep2_DeleteRefundRecords(step);
                        break;
                    case "PROCESS_REFUNDS":
                        compensateStep3_ReverseRefunds(step);
                        break;
                    // 通知和信用分无需补偿(允许最终一致)
                }
            } catch (Exception e) {
                log.error("[Saga-{}] 补偿步骤失败: {}", sagaId, step.getStepName(), e);
                // 记录补偿失败,人工介入
                state.setStatus(SagaStatus.COMPENSATION_FAILED);
                sagaStateRepository.update(state);
                alertService.sendCriticalAlert("Saga补偿失败", sagaId, e);
                return;
            }
        }
        
        state.setStatus(SagaStatus.COMPENSATED);
        sagaStateRepository.update(state);
        log.info("[Saga-{}] 补偿流程完成", sagaId);
    }
    
    /**
     * 补偿: 恢复活动状态
     */
    @Transactional
    private void compensateStep1_RestoreActivity(SagaStep step) {
        log.info("[Saga-{}] 补偿: 恢复活动状态", sagaId);
        
        ActivityAggregate activity = activityRepository
            .findById(ActivityId.of((String) step.getStepData()))
            .orElseThrow();
        
        activity.restoreFromCancellation();  // 新增领域方法
        activityRepository.update(activity);
    }
    
    /**
     * 补偿: 删除退款记录
     */
    private void compensateStep2_DeleteRefundRecords(SagaStep step) {
        log.info("[Saga-{}] 补偿: 删除退款记录", sagaId);
        
        paymentPort.cancelRefundRecords(sagaId);
    }
    
    /**
     * 补偿: 撤销退款(重新扣款)
     */
    private void compensateStep3_ReverseRefunds(SagaStep step) {
        log.info("[Saga-{}] 补偿: 撤销退款", sagaId);
        
        // 这是最复杂的补偿:需要重新扣款
        // 实际业务可能不允许,需要记录为"待人工处理"
        paymentPort.reverseRefunds(sagaId);
    }
}

// ============================================
// Domain 层 - Saga 状态持久化
// ============================================

/**
 * Saga 状态聚合
 */
@Data
@Builder
public class SagaState {
    private String sagaId;
    private String sagaType;
    private String aggregateId;
    private SagaStatus status;
    private List<SagaStep> completedSteps;
    private LocalDateTime createdTime;
    private LocalDateTime updatedTime;
    
    public void addCompletedStep(SagaStep step) {
        if (completedSteps == null) {
            completedSteps = new ArrayList<>();
        }
        completedSteps.add(step);
        this.updatedTime = LocalDateTime.now();
    }
}

/**
 * Saga 状态枚举
 */
public enum SagaStatus {
    STARTED,           // 已开始
    COMPLETED,         // 已完成
    COMPENSATING,      // 补偿中
    COMPENSATED,       // 已补偿
    COMPENSATION_FAILED, // 补偿失败(需人工介入)
    FAILED             // 失败
}

/**
 * Saga 步骤
 */
@Data
@AllArgsConstructor
public class SagaStep {
    private String stepName;
    private Object stepData;
}

// ============================================
// Domain 层 - Output Port 定义
// ============================================

/**
 * 支付端口(六边形架构 - Output Port)
 */
public interface IPaymentPort {
    
    /**
     * 创建退款记录(幂等)
     */
    List<RefundRecord> createRefundRecords(
        String activityId, 
        List<String> userIds,
        RefundReason reason,
        String idempotencyKey
    );
    
    /**
     * 批量退款(幂等 + 部分失败容忍)
     */
    RefundResult batchRefund(List<RefundRecord> records, String idempotencyKey);
    
    /**
     * 撤销退款记录
     */
    void cancelRefundRecords(String idempotencyKey);
    
    /**
     * 撤销退款(重新扣款)
     */
    void reverseRefunds(String idempotencyKey);
    
    /**
     * 调度重试
     */
    void scheduleRetry(List<RefundRecord> failedRefunds);
}

/**
 * 用户信用端口
 */
public interface IUserCreditPort {
    
    /**
     * 扣除信用分
     */
    void deductCredit(String userId, CreditDeductionReason reason, int points, String idempotencyKey);
    
    /**
     * 调度补偿任务
     */
    void scheduleCompensation(String userId, String sagaId);
}

// ============================================
// ApplicationService 调用 Saga
// ============================================

@Service
public class ActivityApplicationService {
    
    @Resource
    private ActivityCancellationSaga cancellationSaga;
    
    /**
     * 取消活动(通过 Saga 编排)
     */
    public void cancelActivity(String activityId, String userId, String reason) {
        log.info("取消活动 - 活动ID: {}, 用户ID: {}, 原因: {}", activityId, userId, reason);
        
        // 构建 Saga 命令
        ActivityCancellationCommand command = ActivityCancellationCommand.builder()
            .activityId(activityId)
            .userId(userId)
            .reason(reason)
            .build();
        
        // 执行 Saga
        SagaResult result = cancellationSaga.execute(command);
        
        if (result.isSuccess()) {
            log.info("活动取消成功 - Saga ID: {}", result.getSagaId());
        } else {
            log.error("活动取消失败 - Saga ID: {}, 错误: {}", result.getSagaId(), result.getErrorMessage());
            throw new AppException("活动取消失败: " + result.getErrorMessage());
        }
    }
}

其他 Saga 场景(精简版)

Saga 2: 报名付费活动

正向流程:

  1. 锁定活动名额(乐观锁)
  2. 创建订单
  3. 调用支付网关
  4. 确认支付成功
  5. 添加用户到参与者列表
  6. 发送报名成功通知

补偿流程:

- 支付失败 → 释放名额

- 添加参与者失败 → 退款 + 释放名额

Saga 3: 活动完成与积分奖励

正向流程:

  1. 标记活动为"已完成"
  2. 计算参与者积分奖励
  3. 批量发放积分
  4. 更新用户等级
  5. 生成活动报告
  6. 发送完成通知

补偿流程:

- 积分发放部分失败 → 记录补偿任务,异步重试

Saga 4: 活动成员审核通过

正向流程:

  1. 更新成员状态为"已通过"
  2. 检查是否需要扣费
  3. 扣费成功后添加到参与者列表
  4. 发送审核通过通知
  5. 通知活动创建者

补偿流程:

- 扣费失败 → 恢复审核状态

- 添加失败 → 退款 + 恢复审核状态


🎖 最佳实践建议

  1. 优先实现 Saga 1 和 Saga 2(涉及金钱,必须可靠)
  2. Saga 状态持久化到数据库,支持故障恢复
  3. 每个步骤支持幂等(传入 Saga ID 作为幂等键)
  4. 关键步骤同步,非关键步骤异步(通知、统计)
  5. 补偿失败人工介入(发送告警,记录到管理后台)

Powered by VitePress