🎯 活动领域(Activity Domain)的 Saga 场景识别
核心 Saga 场景矩阵
| 场景 | 复杂度 | 跨聚合 | 补偿需求 | 推荐优先级 |
|---|---|---|---|---|
| 1. 取消活动(含退款) | ⭐⭐⭐⭐⭐ | 3+ | ✅ | P0 - 必须 |
| 2. 报名付费活动 | ⭐⭐⭐⭐ | 2-3 | ✅ | P0 - 必须 |
| 3. 活动成员限制与审核 | ⭐⭐⭐ | 2 | ⚠️ | P1 - 重要 |
| 4. 活动完成与积分发放 | ⭐⭐⭐⭐ | 3 | ⚠️ | P1 - 重要 |
| 5. 活动超时自动取消 | ⭐⭐⭐ | 2 | ✅ | P2 - 可选 |
📋 详细场景分析
Saga 1: 取消活动(含通知与退款) ⭐⭐⭐⭐⭐
业务需求
创建者取消活动后,需要:
更新活动状态为"已取消"
通知所有参与者
退款(如果活动收费)
扣除活动发布者信用分(如频繁取消)
更新用户统计数据
涉及聚合根
ActivityAggregate - 活动聚合
PaymentAggregate - 支付聚合(需新建)
UserAggregate - 用户聚合
NotificationAggregate - 通知聚合(可选)
Saga 步骤与补偿
java
/**
* 活动取消 Saga
*
* 正向流程:
* 1. 锁定活动状态(乐观锁)
* 2. 创建退款记录
* 3. 执行批量退款
* 4. 发送通知给参与者
* 5. 更新用户信用分
* 6. 更新统计数据
*
* 补偿流程(任一步失败):
* - Step 1 失败 → 无需补偿
* - Step 2 失败 → 恢复活动状态
* - Step 3 失败 → 删除退款记录 + 恢复活动状态
* - Step 4 失败 → 记录通知失败,定时重试(非关键)
* - Step 5 失败 → 记录补偿任务,异步重试
* - Step 6 失败 → 记录补偿任务,异步重试
*/代码实现建议
java
// ============================================
// Domain 层 - Saga 定义
// ============================================
package com.alisunxin.api.domain.activity.saga;
/**
* 活动取消 Saga
*
* 负责编排跨聚合的活动取消业务流程
* 包含事务补偿机制
*/
@Slf4j
@Component
public class ActivityCancellationSaga {
private final IActivityAggregateRepository activityRepository;
private final IPaymentPort paymentPort; // Output Port - 支付端口
private final IUserCreditPort userCreditPort; // Output Port - 用户信用端口
private final IActivityNotificationPort notificationPort;
private final ISagaStateRepository sagaStateRepository; // Saga 状态持久化
/**
* Saga ID(用于追踪和幂等)
*/
private final String sagaId;
/**
* Saga 状态
*/
private SagaState state;
/**
* 执行 Saga
*/
public SagaResult execute(ActivityCancellationCommand command) {
sagaId = UUID.randomUUID().toString();
state = SagaState.builder()
.sagaId(sagaId)
.sagaType("ACTIVITY_CANCELLATION")
.aggregateId(command.getActivityId())
.status(SagaStatus.STARTED)
.build();
sagaStateRepository.save(state);
try {
// Step 1: 取消活动(本地事务)
ActivityAggregate activity = executeStep1_CancelActivity(command);
// Step 2: 创建退款记录(远程调用)
List<RefundRecord> refundRecords = executeStep2_CreateRefundRecords(activity);
// Step 3: 执行批量退款(远程调用 + 幂等)
RefundResult refundResult = executeStep3_ProcessRefunds(refundRecords);
// Step 4: 发送通知(异步,非阻塞)
executeStep4_NotifyParticipants(activity);
// Step 5: 更新用户信用分(远程调用,允许失败)
executeStep5_UpdateCreatorCredit(activity, command.getReason());
// Step 6: 更新统计数据(异步,最终一致)
executeStep6_UpdateStatistics(activity);
// 标记 Saga 完成
state.setStatus(SagaStatus.COMPLETED);
sagaStateRepository.update(state);
return SagaResult.success(sagaId);
} catch (Exception e) {
log.error("Saga 执行失败: {}, 开始补偿", sagaId, e);
compensate();
return SagaResult.failure(sagaId, e.getMessage());
}
}
/**
* Step 1: 取消活动
*/
@Transactional
private ActivityAggregate executeStep1_CancelActivity(ActivityCancellationCommand command) {
log.info("[Saga-{}] Step 1: 取消活动 - {}", sagaId, command.getActivityId());
ActivityAggregate activity = activityRepository
.findById(ActivityId.of(command.getActivityId()))
.orElseThrow(() -> new IllegalArgumentException("活动不存在"));
// 领域逻辑:取消活动
activity.cancelActivity(command.getReason());
// 持久化
activityRepository.update(activity);
// 记录 Saga 步骤
state.addCompletedStep(new SagaStep("CANCEL_ACTIVITY", activity.getActivityId().getValue()));
sagaStateRepository.update(state);
return activity;
}
/**
* Step 2: 创建退款记录
*/
private List<RefundRecord> executeStep2_CreateRefundRecords(ActivityAggregate activity) {
log.info("[Saga-{}] Step 2: 创建退款记录", sagaId);
if (activity.getParticipantIds().isEmpty()) {
log.info("[Saga-{}] 无参与者,跳过退款", sagaId);
return List.of();
}
// 调用支付端口(幂等)
List<RefundRecord> refundRecords = paymentPort.createRefundRecords(
activity.getActivityId().getValue(),
activity.getParticipantIds().stream()
.map(UserId::getValue)
.collect(Collectors.toList()),
RefundReason.ACTIVITY_CANCELLED,
sagaId // 传入 Saga ID 保证幂等
);
state.addCompletedStep(new SagaStep("CREATE_REFUND_RECORDS", refundRecords.size()));
sagaStateRepository.update(state);
return refundRecords;
}
/**
* Step 3: 执行批量退款(关键步骤)
*/
private RefundResult executeStep3_ProcessRefunds(List<RefundRecord> refundRecords) {
log.info("[Saga-{}] Step 3: 执行退款 - {} 笔", sagaId, refundRecords.size());
if (refundRecords.isEmpty()) {
return RefundResult.empty();
}
try {
// 调用支付网关(幂等 + 重试)
RefundResult result = paymentPort.batchRefund(refundRecords, sagaId);
if (result.hasFailures()) {
log.warn("[Saga-{}] 部分退款失败: {}", sagaId, result.getFailedCount());
// 记录失败的退款,稍后重试
paymentPort.scheduleRetry(result.getFailedRefunds());
}
state.addCompletedStep(new SagaStep("PROCESS_REFUNDS", result.getSuccessCount()));
sagaStateRepository.update(state);
return result;
} catch (PaymentGatewayException e) {
log.error("[Saga-{}] 退款失败,触发补偿", sagaId, e);
throw new SagaException("退款失败", e);
}
}
/**
* Step 4: 发送通知(非关键步骤,允许异步重试)
*/
@Async
private void executeStep4_NotifyParticipants(ActivityAggregate activity) {
log.info("[Saga-{}] Step 4: 发送通知", sagaId);
try {
// 批量通知(异步)
notificationPort.notifyActivityCancelled(
activity.getActivityId().getValue(),
activity.getParticipantIds().stream()
.map(UserId::getValue)
.collect(Collectors.toList()),
activity.getTitle(),
activity.getCancellationReason()
);
state.addCompletedStep(new SagaStep("NOTIFY_PARTICIPANTS", activity.getParticipantIds().size()));
} catch (Exception e) {
log.error("[Saga-{}] 通知发送失败,将异步重试", sagaId, e);
// 不阻塞主流程,记录失败日志即可
notificationPort.scheduleRetry(activity.getActivityId().getValue());
}
}
/**
* Step 5: 更新创建者信用分(非关键步骤)
*/
private void executeStep5_UpdateCreatorCredit(ActivityAggregate activity, String reason) {
log.info("[Saga-{}] Step 5: 更新信用分", sagaId);
try {
// 频繁取消活动扣信用分
userCreditPort.deductCredit(
activity.getCreatorId().getValue(),
CreditDeductionReason.FREQUENT_CANCELLATION,
10, // 扣除10分
sagaId
);
state.addCompletedStep(new SagaStep("UPDATE_CREATOR_CREDIT", 10));
} catch (Exception e) {
log.warn("[Saga-{}] 信用分更新失败,记录补偿任务", sagaId, e);
// 记录到补偿队列,异步重试
userCreditPort.scheduleCompensation(activity.getCreatorId().getValue(), sagaId);
}
}
/**
* Step 6: 更新统计数据(最终一致性)
*/
@Async
private void executeStep6_UpdateStatistics(ActivityAggregate activity) {
log.info("[Saga-{}] Step 6: 更新统计数据", sagaId);
// 发送统计事件,由统计服务异步处理
domainEventPublisher.publish(new ActivityCancelledStatisticsEvent(
activity.getActivityId(),
activity.getCreatorId(),
activity.getCurrentParticipants(),
LocalDateTime.now()
));
}
/**
* 补偿流程
*/
private void compensate() {
log.warn("[Saga-{}] 开始执行补偿流程", sagaId);
state.setStatus(SagaStatus.COMPENSATING);
sagaStateRepository.update(state);
// 按逆序执行补偿
List<SagaStep> completedSteps = state.getCompletedSteps();
Collections.reverse(completedSteps);
for (SagaStep step : completedSteps) {
try {
switch (step.getStepName()) {
case "CANCEL_ACTIVITY":
compensateStep1_RestoreActivity(step);
break;
case "CREATE_REFUND_RECORDS":
compensateStep2_DeleteRefundRecords(step);
break;
case "PROCESS_REFUNDS":
compensateStep3_ReverseRefunds(step);
break;
// 通知和信用分无需补偿(允许最终一致)
}
} catch (Exception e) {
log.error("[Saga-{}] 补偿步骤失败: {}", sagaId, step.getStepName(), e);
// 记录补偿失败,人工介入
state.setStatus(SagaStatus.COMPENSATION_FAILED);
sagaStateRepository.update(state);
alertService.sendCriticalAlert("Saga补偿失败", sagaId, e);
return;
}
}
state.setStatus(SagaStatus.COMPENSATED);
sagaStateRepository.update(state);
log.info("[Saga-{}] 补偿流程完成", sagaId);
}
/**
* 补偿: 恢复活动状态
*/
@Transactional
private void compensateStep1_RestoreActivity(SagaStep step) {
log.info("[Saga-{}] 补偿: 恢复活动状态", sagaId);
ActivityAggregate activity = activityRepository
.findById(ActivityId.of((String) step.getStepData()))
.orElseThrow();
activity.restoreFromCancellation(); // 新增领域方法
activityRepository.update(activity);
}
/**
* 补偿: 删除退款记录
*/
private void compensateStep2_DeleteRefundRecords(SagaStep step) {
log.info("[Saga-{}] 补偿: 删除退款记录", sagaId);
paymentPort.cancelRefundRecords(sagaId);
}
/**
* 补偿: 撤销退款(重新扣款)
*/
private void compensateStep3_ReverseRefunds(SagaStep step) {
log.info("[Saga-{}] 补偿: 撤销退款", sagaId);
// 这是最复杂的补偿:需要重新扣款
// 实际业务可能不允许,需要记录为"待人工处理"
paymentPort.reverseRefunds(sagaId);
}
}
// ============================================
// Domain 层 - Saga 状态持久化
// ============================================
/**
* Saga 状态聚合
*/
@Data
@Builder
public class SagaState {
private String sagaId;
private String sagaType;
private String aggregateId;
private SagaStatus status;
private List<SagaStep> completedSteps;
private LocalDateTime createdTime;
private LocalDateTime updatedTime;
public void addCompletedStep(SagaStep step) {
if (completedSteps == null) {
completedSteps = new ArrayList<>();
}
completedSteps.add(step);
this.updatedTime = LocalDateTime.now();
}
}
/**
* Saga 状态枚举
*/
public enum SagaStatus {
STARTED, // 已开始
COMPLETED, // 已完成
COMPENSATING, // 补偿中
COMPENSATED, // 已补偿
COMPENSATION_FAILED, // 补偿失败(需人工介入)
FAILED // 失败
}
/**
* Saga 步骤
*/
@Data
@AllArgsConstructor
public class SagaStep {
private String stepName;
private Object stepData;
}
// ============================================
// Domain 层 - Output Port 定义
// ============================================
/**
* 支付端口(六边形架构 - Output Port)
*/
public interface IPaymentPort {
/**
* 创建退款记录(幂等)
*/
List<RefundRecord> createRefundRecords(
String activityId,
List<String> userIds,
RefundReason reason,
String idempotencyKey
);
/**
* 批量退款(幂等 + 部分失败容忍)
*/
RefundResult batchRefund(List<RefundRecord> records, String idempotencyKey);
/**
* 撤销退款记录
*/
void cancelRefundRecords(String idempotencyKey);
/**
* 撤销退款(重新扣款)
*/
void reverseRefunds(String idempotencyKey);
/**
* 调度重试
*/
void scheduleRetry(List<RefundRecord> failedRefunds);
}
/**
* 用户信用端口
*/
public interface IUserCreditPort {
/**
* 扣除信用分
*/
void deductCredit(String userId, CreditDeductionReason reason, int points, String idempotencyKey);
/**
* 调度补偿任务
*/
void scheduleCompensation(String userId, String sagaId);
}
// ============================================
// ApplicationService 调用 Saga
// ============================================
@Service
public class ActivityApplicationService {
@Resource
private ActivityCancellationSaga cancellationSaga;
/**
* 取消活动(通过 Saga 编排)
*/
public void cancelActivity(String activityId, String userId, String reason) {
log.info("取消活动 - 活动ID: {}, 用户ID: {}, 原因: {}", activityId, userId, reason);
// 构建 Saga 命令
ActivityCancellationCommand command = ActivityCancellationCommand.builder()
.activityId(activityId)
.userId(userId)
.reason(reason)
.build();
// 执行 Saga
SagaResult result = cancellationSaga.execute(command);
if (result.isSuccess()) {
log.info("活动取消成功 - Saga ID: {}", result.getSagaId());
} else {
log.error("活动取消失败 - Saga ID: {}, 错误: {}", result.getSagaId(), result.getErrorMessage());
throw new AppException("活动取消失败: " + result.getErrorMessage());
}
}
}其他 Saga 场景(精简版)
Saga 2: 报名付费活动
正向流程:
- 锁定活动名额(乐观锁)
- 创建订单
- 调用支付网关
- 确认支付成功
- 添加用户到参与者列表
- 发送报名成功通知
补偿流程:
- 支付失败 → 释放名额
- 添加参与者失败 → 退款 + 释放名额
Saga 3: 活动完成与积分奖励
正向流程:
- 标记活动为"已完成"
- 计算参与者积分奖励
- 批量发放积分
- 更新用户等级
- 生成活动报告
- 发送完成通知
补偿流程:
- 积分发放部分失败 → 记录补偿任务,异步重试
Saga 4: 活动成员审核通过
正向流程:
- 更新成员状态为"已通过"
- 检查是否需要扣费
- 扣费成功后添加到参与者列表
- 发送审核通过通知
- 通知活动创建者
补偿流程:
- 扣费失败 → 恢复审核状态
- 添加失败 → 退款 + 恢复审核状态
🎖 最佳实践建议
- 优先实现 Saga 1 和 Saga 2(涉及金钱,必须可靠)
- Saga 状态持久化到数据库,支持故障恢复
- 每个步骤支持幂等(传入 Saga ID 作为幂等键)
- 关键步骤同步,非关键步骤异步(通知、统计)
- 补偿失败人工介入(发送告警,记录到管理后台)