PSI进销存系统分布式事务与数据一致性
分布式事务概述
PSI 进销存系统涉及采购、销售、库存、财务等多个业务模块,业务操作需要保证数据的一致性。随着系统架构向微服务演进,分布式事务成为必须面对的问题。本文介绍 PSI 系统中分布式事务的处理方案,包括 Seata 框架、消息队列最终一致性等实现。
CAP 理论与事务选型
| 特性 | CP (强一致) | AP (可用性) |
|---|---|---|
| 一致性 | 强一致 | 最终一致 |
| 可用性 | 牺牲 | 保证 |
| 案例 | Seata AT/TCC | 消息队列 |
| 适用场景 | 资金、库存 | 日志、统计 |
Seata 分布式事务方案
使用 Seata AT 模式实现分布式事务:
# Seata configuration
# application.yml
seata:
  application-id: psi-system
  tx-service-group: psi_tx_group
  # Configuration center: Seata settings are loaded from Nacos.
  config:
    type: nacos
    nacos:
      server-addr: nacos.psi-system:8848
      namespace: seata
      group: SEATA_GROUP
  # Service registry: Nacos is also used for service discovery.
  registry:
    type: nacos
    nacos:
      server-addr: nacos.psi-system:8848
      namespace: seata
      group: SEATA_GROUP
---
# 业务服务代码:采购订单创建(Seata 全局事务)
/**
 * Creates a purchase order under the {@code psi-purchase-order} global
 * transaction (timeout setting: 30000).
 *
 * <p>Steps, in order: insert the order locally, deduct the supplier quota via
 * a remote call, reserve stock for every item, and create a payment request in
 * the finance service.
 *
 * @param orderDTO incoming purchase order data (supplier, total amount, items)
 * @return the newly inserted purchase order
 */
@GlobalTransactional(name = "psi-purchase-order", timeoutMills = 30000)
public PurchaseOrder createOrder(PurchaseOrderDTO orderDTO) {
    // Step 1: build and persist the purchase order in PENDING state.
    PurchaseOrder purchaseOrder = new PurchaseOrder();
    purchaseOrder.setOrderNo(generateOrderNo());
    purchaseOrder.setSupplierId(orderDTO.getSupplierId());
    purchaseOrder.setTotalAmount(orderDTO.getTotalAmount());
    purchaseOrder.setStatus(OrderStatus.PENDING);
    orderMapper.insert(purchaseOrder);

    // Step 2: deduct the supplier's available quota (remote call).
    supplierFeignClient.deductQuota(orderDTO.getSupplierId(), orderDTO.getTotalAmount());

    // Step 3: reserve stock for each purchased item (inventory service, TCC reserve).
    for (PurchaseItemDTO line : orderDTO.getItems()) {
        inventoryService.reserveStock(line.getProductId(), line.getQuantity());
    }

    // Step 4: create the payment request in the finance service.
    PaymentRequest paymentRequest = PaymentRequest.builder()
            .orderId(purchaseOrder.getId())
            .amount(purchaseOrder.getTotalAmount())
            .paymentType(PaymentType.PURCHASE)
            .build();
    paymentFeignClient.createPayment(paymentRequest);

    return purchaseOrder;
}
---
# 库存服务 TCC 实现
/**
 * TCC contract of the inventory service: a stock reservation (try) paired
 * with its commit and rollback callbacks.
 */
@LocalTCC
public interface InventoryTccService {

    /**
     * Try phase: reserves {@code quantity} units of a product.
     *
     * <p>Both arguments are stored in the action context under the keys
     * {@code productId} and {@code quantity} so the second-phase methods can
     * read them back.
     *
     * @param productId product whose stock is reserved
     * @param quantity  amount to reserve
     */
    @TwoPhaseBusinessAction(name = "reserveStock", commitMethod = "confirm", rollbackMethod = "cancel")
    void reserveStock(
            @BusinessActionContextParameter(paramName = "productId") Long productId,
            @BusinessActionContextParameter(paramName = "quantity") BigDecimal quantity);

    /**
     * Commit callback for {@link #reserveStock}.
     *
     * @param context action context carrying {@code productId} and {@code quantity}
     * @return presumably {@code true} when the commit succeeded — confirm against Seata docs
     */
    boolean confirm(BusinessActionContext context);

    /**
     * Rollback callback for {@link #reserveStock}.
     *
     * @param context action context carrying {@code productId} and {@code quantity}
     * @return presumably {@code true} when the rollback succeeded — confirm against Seata docs
     */
    boolean cancel(BusinessActionContext context);
}
/**
 * TCC implementation of stock reservation.
 *
 * <ul>
 *   <li>Try ({@link #reserveStock}): checks availability and increases the reserved quantity.</li>
 *   <li>Confirm: decreases both the physical quantity and the reserved quantity.</li>
 *   <li>Cancel: decreases the reserved quantity only.</li>
 * </ul>
 *
 * <p>NOTE(review): confirm/cancel are not idempotent and do not detect an
 * "empty rollback" (cancel arriving for a try that never ran) when the
 * inventory row exists. That needs a per-transaction record (or Seata's TCC
 * fence, if the Seata version in use supports it) — confirm before relying on
 * this in production. The read-modify-write on the inventory row is also not
 * protected against concurrent updates within this class.
 *
 * <p>{@code inventoryMapper} and {@code logMapper} are used but not declared
 * in this snippet.
 */
@Service
public class InventoryTccServiceImpl implements InventoryTccService {

    /**
     * Try phase: reserves (freezes) stock and writes a reservation log entry.
     *
     * @param productId product whose stock is reserved
     * @param quantity  amount to reserve; must be positive
     * @throws IllegalArgumentException if {@code quantity} is null or not positive
     * @throws IllegalStateException    if no inventory row exists or the available
     *                                  stock (quantity minus reserved) is insufficient
     */
    @Override
    public void reserveStock(Long productId, BigDecimal quantity) {
        if (quantity == null || quantity.signum() <= 0) {
            throw new IllegalArgumentException("quantity must be positive: " + quantity);
        }
        Inventory inventory = requireInventory(productId);

        // Fail the try phase instead of over-reserving.
        BigDecimal available = inventory.getQuantity().subtract(inventory.getReservedQuantity());
        if (available.compareTo(quantity) < 0) {
            throw new IllegalStateException(
                    "insufficient stock for product " + productId
                            + ": available=" + available + ", requested=" + quantity);
        }

        // Reserve (freeze) the stock.
        inventory.setReservedQuantity(inventory.getReservedQuantity().add(quantity));
        inventoryMapper.update(inventory);

        // Record the reservation.
        ReserveLog reserveLog = new ReserveLog();
        reserveLog.setProductId(productId);
        reserveLog.setQuantity(quantity);
        reserveLog.setAction("reserve");
        logMapper.insert(reserveLog);
    }

    /**
     * Confirm phase: turns the reservation into an actual deduction.
     *
     * @param context action context carrying {@code productId} and {@code quantity}
     * @return always {@code true} when no exception is thrown
     * @throws IllegalStateException if a context value or the inventory row is missing
     */
    @Override
    public boolean confirm(BusinessActionContext context) {
        Long productId = productIdFrom(context);
        BigDecimal quantity = quantityFrom(context);

        Inventory inventory = requireInventory(productId);
        inventory.setQuantity(inventory.getQuantity().subtract(quantity));
        inventory.setReservedQuantity(inventory.getReservedQuantity().subtract(quantity));
        inventoryMapper.update(inventory);
        return true;
    }

    /**
     * Cancel phase: releases the reservation.
     *
     * @param context action context carrying {@code productId} and {@code quantity}
     * @return {@code true} when the reservation was released or nothing had to be released
     * @throws IllegalStateException if a context value is missing
     */
    @Override
    public boolean cancel(BusinessActionContext context) {
        Long productId = productIdFrom(context);
        BigDecimal quantity = quantityFrom(context);

        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory == null) {
            // The try phase rejects unknown products, so nothing was reserved.
            return true;
        }
        inventory.setReservedQuantity(inventory.getReservedQuantity().subtract(quantity));
        inventoryMapper.update(inventory);
        return true;
    }

    /** Loads the inventory row or fails with a descriptive error when it is absent. */
    private Inventory requireInventory(Long productId) {
        // Presumably the mapper returns null when no row matches — verify.
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory == null) {
            throw new IllegalStateException("no inventory record for product " + productId);
        }
        return inventory;
    }

    /** Reads the product id stored in the action context by the try phase. */
    private static Long productIdFrom(BusinessActionContext context) {
        return Long.parseLong(requireContextValue(context, "productId"));
    }

    /** Reads the reserved quantity stored in the action context by the try phase. */
    private static BigDecimal quantityFrom(BusinessActionContext context) {
        return new BigDecimal(requireContextValue(context, "quantity"));
    }

    /** Returns the string form of a context entry, failing clearly when it is missing. */
    private static String requireContextValue(BusinessActionContext context, String key) {
        Object value = context.getActionContext(key);
        if (value == null) {
            throw new IllegalStateException("missing '" + key + "' in TCC action context");
        }
        return value.toString();
    }
}
消息队列最终一致性
使用消息事务保证数据一致性:
# 消息事务配置
/**
 * Kafka producer configuration with idempotence and transactions enabled.
 */
@Configuration
public class KafkaTransactionConfig {

    /**
     * Builds the producer factory used for transactional sends.
     *
     * @return a String/String producer factory pointing at {@code kafka:9092}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // The ProducerConfig constants carry a _CONFIG suffix.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Enable transactions.
        // NOTE(review): Spring Kafka is expected to use this value as the
        // transaction-id prefix — verify against the version in use.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "psi-transaction");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Exposes a template backed by {@link #producerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
---
# 订单创建与消息发送
/**
 * Creates orders and publishes an {@code order-created} event using a local
 * message table (outbox) as the source of truth for delivery.
 *
 * <p>NOTE(review): the order insert and the outbox insert should commit in one
 * database transaction; no transaction boundary is declared in this snippet —
 * confirm that the caller or a class-level annotation provides it.
 * {@code saveMessageToLocalTable} and {@code updateMessageStatus} are defined
 * outside this snippet.
 */
@Service
public class OrderService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private OrderMapper orderMapper;

    /**
     * Persists the order and its outbox record, then tries to publish the event.
     *
     * <p>A failure to persist either record propagates to the caller. A failure
     * to publish is not propagated; the outbox record is marked
     * {@code FAILED} instead so it can be retried later.
     *
     * @param order the order to create
     */
    public void createOrder(Order order) {
        // 1. Local persistence: the order itself.
        orderMapper.insert(order);

        // 2. Outbox record. Kept outside the try block so a failure here is not
        //    swallowed (there would be no record to mark as FAILED).
        saveMessageToLocalTable(order);

        // 3. Publish. The producer is transactional, so the send must run inside
        //    a Kafka transaction; the commit surfaces send failures as exceptions.
        try {
            String key = order.getId().toString();
            String payload = JSON.toJSONString(order);
            kafkaTemplate.executeInTransaction(ops -> ops.send("order-created", key, payload));
        } catch (Exception e) {
            // Publishing failed: flag the outbox record for retry.
            updateMessageStatus(order.getId(), MessageStatus.FAILED);
        }
    }
}
---
# 消息消费者 - 库存扣减
/**
 * Consumes {@code order-created} events and deducts stock for each order item.
 *
 * <p>{@code checkMessageProcessed}, {@code markMessageProcessed} and
 * {@code log} are defined outside this snippet.
 */
@Component
public class OrderConsumer {
    @Autowired
    private InventoryService inventoryService;

    /**
     * Handles one {@code order-created} record.
     *
     * <p>Already-processed orders are skipped. On failure the exception is
     * rethrown so the retry configuration (3 attempts, backoff starting at
     * 1000 with multiplier 2) applies.
     *
     * <p>NOTE(review): if a later item fails, deductions already made for
     * earlier items are not undone here, and a retry re-runs the whole loop.
     * Confirm that {@code deductStock} or a surrounding transaction makes this
     * safe.
     *
     * @param record Kafka record whose value is the JSON-serialized order
     */
    @KafkaListener(topics = "order-created", groupId = "psi-inventory")
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void consumeOrderCreated(ConsumerRecord<String, String> record) {
        Order order = JSON.parseObject(record.value(), Order.class);

        // Idempotency check: skip when this order's message was already handled.
        if (checkMessageProcessed(order.getId())) {
            log.warn("消息已处理: {}", order.getId());
            return;
        }

        try {
            // Deduct stock for every item.
            for (OrderItem item : order.getItems()) {
                inventoryService.deductStock(item.getProductId(), item.getQuantity());
            }
            // Mark the message as processed only after all deductions succeeded.
            markMessageProcessed(order.getId());
        } catch (Exception e) {
            log.error("库存扣减失败: {}", order.getId(), e);
            throw new RuntimeException("库存扣减失败", e);
        }
    }
}
---
# 库存回滚消息处理
/**
 * Consumes {@code order-cancelled} events and rolls back the stock reserved
 * for the cancelled order.
 *
 * <p>NOTE(review): there is no idempotency guard here; a redelivered message
 * would roll back the same reservations again unless
 * {@code getReservesByOrderId} / {@code rollbackStock} prevent it — confirm.
 */
@Component
public class InventoryRollbackConsumer {
    @Autowired
    private InventoryService inventoryService;

    /**
     * Handles one {@code order-cancelled} record.
     *
     * @param record Kafka record whose key is the order id as a decimal string
     * @throws IllegalArgumentException if the record has no key
     * @throws NumberFormatException    if the key is not a valid long
     */
    @KafkaListener(topics = "order-cancelled", groupId = "psi-inventory")
    public void consumeOrderCancelled(ConsumerRecord<String, String> record) {
        String key = record.key();
        if (key == null) {
            throw new IllegalArgumentException("order-cancelled record has no key (order id)");
        }
        Long orderId = Long.parseLong(key);

        // Look up the stock reservations tied to this order.
        List<InventoryReserve> reserves = inventoryService.getReservesByOrderId(orderId);

        // Roll back each reservation.
        for (InventoryReserve reserve : reserves) {
            inventoryService.rollbackStock(reserve.getProductId(), reserve.getQuantity());
        }
    }
}
分布式锁实现
使用 Redis 分布式锁保证并发安全:
# 分布式锁工具类
/**
 * Redis-backed distributed lock plus a stock-deduction example that uses it.
 *
 * <p>The lock is a key set only if absent, with an expiry. It is released by a
 * Lua script that deletes the key only if it still holds the caller's value,
 * so one holder cannot release another holder's lock.
 *
 * <p>NOTE(review): the lock is not renewed; work done under the lock must
 * finish before the expiry elapses. {@code inventoryMapper} and
 * {@code stockFlowMapper} are used but not declared in this snippet.
 */
@Component
public class DistributedLock {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "lock:";

    /** Compare-and-delete: removes the key only when its value matches ARGV[1]. */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Long.class
    );

    /** Lock expiry used by {@link #deductStock}, in milliseconds. */
    private static final long STOCK_LOCK_EXPIRE_MS = 5000;
    /** Maximum time {@link #deductStock} waits for the lock, in milliseconds. */
    private static final long STOCK_LOCK_WAIT_MS = 3000;
    /** Pause between lock attempts in {@link #deductStock}, in milliseconds. */
    private static final long STOCK_LOCK_RETRY_INTERVAL_MS = 100;

    /**
     * Tries to acquire the lock, retrying until {@code waitTime} has elapsed.
     *
     * <p>At least one attempt is always made, even when {@code waitTime} is
     * zero or negative.
     *
     * @param key        lock name (prefixed with {@code lock:} internally)
     * @param value      token identifying the holder; required again for {@link #unlock}
     * @param expireTime lock expiry in milliseconds
     * @param waitTime   maximum time to keep retrying, in milliseconds
     * @param interval   pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} on timeout or interruption
     */
    public boolean tryLock(String key, String value, long expireTime, long waitTime, long interval) {
        String lockKey = LOCK_PREFIX + key;
        long deadline = System.currentTimeMillis() + waitTime;
        while (true) {
            Boolean acquired = redisTemplate.opsForValue()
                    .setIfAbsent(lockKey, value, Duration.ofMillis(expireTime));
            if (Boolean.TRUE.equals(acquired)) {
                return true;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline; never spin with a non-positive interval.
                Thread.sleep(Math.min(Math.max(interval, 1L), remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Releases the lock if it is still held with {@code value}.
     *
     * @param key   lock name as passed to {@link #tryLock}
     * @param value holder token as passed to {@link #tryLock}
     * @return {@code true} if the key was deleted; {@code false} otherwise
     */
    public boolean unlock(String key, String value) {
        String lockKey = LOCK_PREFIX + key;
        Long result = redisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(lockKey),
                value
        );
        return result != null && result > 0;
    }

    /**
     * Usage example: deducts stock for a product while holding the product's lock.
     *
     * @param productId product whose stock is deducted
     * @param quantity  amount to deduct; must be positive
     * @return {@code true} when the deduction was applied
     * @throws IllegalArgumentException if {@code quantity} is null or not positive
     * @throws BusinessException        if the lock cannot be acquired, the inventory
     *                                  record is missing, or stock is insufficient
     */
    public boolean deductStock(Long productId, BigDecimal quantity) {
        if (quantity == null || quantity.signum() <= 0) {
            throw new IllegalArgumentException("quantity must be positive: " + quantity);
        }
        String lockKey = "stock:" + productId;
        String lockValue = UUID.randomUUID().toString();

        // Acquire before entering try/finally so a failed acquisition does not unlock.
        if (!tryLock(lockKey, lockValue, STOCK_LOCK_EXPIRE_MS, STOCK_LOCK_WAIT_MS,
                STOCK_LOCK_RETRY_INTERVAL_MS)) {
            throw new BusinessException("库存操作繁忙,请稍后重试");
        }
        try {
            // Load the current stock.
            Inventory inventory = inventoryMapper.selectByProductId(productId);
            if (inventory == null) {
                throw new BusinessException("库存记录不存在: " + productId);
            }

            // Available = physical quantity minus reserved quantity.
            BigDecimal available = inventory.getQuantity()
                    .subtract(inventory.getReservedQuantity());
            if (available.compareTo(quantity) < 0) {
                throw new BusinessException("库存不足");
            }

            // Deduct the stock.
            inventory.setQuantity(inventory.getQuantity().subtract(quantity));
            inventoryMapper.update(inventory);

            // Record the stock movement as a negative change.
            StockFlow flow = new StockFlow();
            flow.setProductId(productId);
            flow.setChangeQty(quantity.negate());
            flow.setType(StockChangeType.DEDUCT);
            stockFlowMapper.insert(flow);
            return true;
        } finally {
            unlock(lockKey, lockValue);
        }
    }
}
---
# 幂等性检查(防重复处理)机制
/**
 * Redis-based duplicate guard: each (business type, business id) pair can be
 * marked once per expiry window.
 */
@Component
public class IdempotenceChecker {
    private static final String IDEMPOTENCE_PREFIX = "idemp:";
    private static final Duration EXPIRE_TIME = Duration.ofHours(24);

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Checks and marks in a single set-if-absent call.
     *
     * @param businessType category of the operation, used in the key
     * @param bizId        identifier of the operation within its category
     * @return {@code true} if the marker was newly set; {@code false} if it
     *         already existed or the call returned no result
     */
    public boolean checkAndMark(String businessType, String bizId) {
        String markerKey = new StringBuilder(IDEMPOTENCE_PREFIX)
                .append(businessType)
                .append(":")
                .append(bizId)
                .toString();
        return Boolean.TRUE.equals(
                redisTemplate.opsForValue().setIfAbsent(markerKey, "1", EXPIRE_TIME));
    }
}
数据一致性监控
分布式事务监控和告警:
# 事务监控服务
/**
 * Scheduled monitoring of distributed transactions: alerts on and tries to
 * resolve long-pending transactions, and sends a daily summary.
 *
 * <p>{@code seataClient}, {@code sendAlert}, {@code checkBranchesCanCommit},
 * {@code notifyManualIntervention} and {@code sendDailyReport} are defined
 * outside this snippet.
 */
@Service
public class TransactionMonitor {
    /** Pending-transaction threshold passed to the mapper: 30 minutes (presumably in milliseconds). */
    private static final int PENDING_THRESHOLD = 30 * 60 * 1000;

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    /**
     * Runs 60000 ms after the previous run finished: alerts on every
     * transaction pending longer than the threshold and attempts to handle it.
     */
    @Scheduled(fixedDelay = 60000)
    public void checkPendingTransactions() {
        List<TransactionLog> pendingTransactions =
                transactionLogMapper.findPendingTransactions(PENDING_THRESHOLD);
        for (TransactionLog transaction : pendingTransactions) {
            // Alert first, then try automatic handling.
            sendAlert(transaction);
            handleTransaction(transaction);
        }
    }

    /**
     * Resolves an abnormal transaction according to its status.
     *
     * @param transaction the transaction log entry to handle
     */
    private void handleTransaction(TransactionLog transaction) {
        if (transaction.getStatus() == null) {
            // Nothing to decide on; the alert has already been sent.
            return;
        }
        switch (transaction.getStatus()) {
            case TIMEOUT:
                // Inspect the branch states to decide between commit and rollback.
                if (checkBranchesCanCommit(transaction.getBranchTransactions())) {
                    seataClient.commit(transaction.getXid());
                } else {
                    seataClient.rollback(transaction.getXid());
                }
                break;
            case ROLLBACK_FAIL:
                // Requires manual intervention.
                notifyManualIntervention(transaction);
                break;
            default:
                // Other statuses are only alerted on, not handled automatically.
                break;
        }
    }

    /**
     * Daily transaction report, triggered at 02:00.
     *
     * <p>Reports on the previous calendar day: at 02:00 the current day
     * contains only two hours of data.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void generateDailyReport() {
        List<TransactionLog> reportLogs =
                transactionLogMapper.findByDate(LocalDate.now().minusDays(1));
        int total = reportLogs.size();
        int success = countWithStatus(reportLogs, "committed");
        int failed = countWithStatus(reportLogs, "failed");
        sendDailyReport(total, success, failed);
    }

    /**
     * Counts the logs whose status matches {@code status}, ignoring case.
     *
     * <p>The status is compared through its string form, so this works whether
     * {@code getStatus()} returns a string or an enum constant.
     */
    private static int countWithStatus(List<TransactionLog> logs, String status) {
        return (int) logs.stream()
                .filter(t -> status.equalsIgnoreCase(String.valueOf(t.getStatus())))
                .count();
    }
}
总结
PSI 进销存系统通过多种技术手段保证分布式数据一致性:
- Seata AT 模式:强一致性,适用于库存扣减等核心业务
- TCC 模式:灵活控制,适用于库存预留/确认场景
- 消息队列:最终一致性,适用于状态同步、日志记录
- 分布式锁:跨节点(多实例)并发控制,防止库存超卖
- 幂等设计:防止重复扣减,保证数据正确
- 监控告警:及时发现和处理异常事务
合理的事务方案选择需要根据业务场景权衡一致性和性能。