个人编制软件展示

PSI - Purchase Sale Inventory 进销存软件

PSI进销存系统分布式事务与数据一致性

分布式事务概述

PSI 进销存系统涉及采购、销售、库存、财务等多个业务模块,业务操作需要保证数据的一致性。随着系统架构向微服务演进,分布式事务成为必须面对的问题。本文介绍 PSI 系统中分布式事务的处理方案,包括 Seata 框架、消息队列最终一致性等实现。

CAP 理论与事务选型

| 特性 | CP (强一致) | AP (可用性) |
| --- | --- | --- |
| 一致性 | 强一致 | 最终一致 |
| 可用性 | 牺牲 | 保证 |
| 案例 | Seata AT/TCC | 消息队列 |
| 适用场景 | 资金、库存 | 日志、统计 |

Seata 分布式事务方案

使用 Seata AT 模式(全局事务)结合 TCC 模式(库存预留)实现分布式事务:

# Seata configuration
# application.yml
seata:
  # Application id and transaction service group used by this service's Seata client
  application-id: psi-system
  tx-service-group: psi_tx_group
  # Configuration center: Nacos
  config:
    type: nacos
    nacos:
      server-addr: nacos.psi-system:8848
      namespace: seata
      group: SEATA_GROUP
  # Service registry: Nacos, same server address, namespace and group as the config center
  registry:
    type: nacos
    nacos:
      server-addr: nacos.psi-system:8848
      namespace: seata
      group: SEATA_GROUP

---
# 业务服务配置
/**
 * Creates a purchase order inside the Seata global transaction "psi-purchase-order"
 * (timeout 30000 ms).
 *
 * <p>Steps, in order: insert the order locally, deduct the supplier quota through the
 * supplier Feign client, reserve stock for every item, then create a payment request
 * through the payment Feign client.
 *
 * @param orderDTO incoming order data (supplier id, total amount, items)
 * @return the order that was inserted
 */
@GlobalTransactional(timeoutMills = 30000, name = "psi-purchase-order")
public PurchaseOrder createOrder(PurchaseOrderDTO orderDTO) {
    // Step 1: build and persist the purchase order with PENDING status.
    PurchaseOrder purchaseOrder = new PurchaseOrder();
    purchaseOrder.setOrderNo(generateOrderNo());
    purchaseOrder.setSupplierId(orderDTO.getSupplierId());
    purchaseOrder.setTotalAmount(orderDTO.getTotalAmount());
    purchaseOrder.setStatus(OrderStatus.PENDING);
    orderMapper.insert(purchaseOrder);

    // Step 2: remote call that deducts the supplier's available quota.
    supplierFeignClient.deductQuota(orderDTO.getSupplierId(), orderDTO.getTotalAmount());

    // Step 3: reserve stock for each order line
    // (presumably the TCC try phase of the inventory service - confirm against its wiring).
    for (PurchaseItemDTO line : orderDTO.getItems()) {
        inventoryService.reserveStock(line.getProductId(), line.getQuantity());
    }

    // Step 4: ask the finance service to create the payment request for this order.
    PaymentRequest paymentRequest = PaymentRequest.builder()
        .orderId(purchaseOrder.getId())
        .amount(purchaseOrder.getTotalAmount())
        .paymentType(PaymentType.PURCHASE)
        .build();
    paymentFeignClient.createPayment(paymentRequest);

    return purchaseOrder;
}

---
# 库存服务 TCC 实现
/**
 * TCC contract for reserving stock.
 *
 * <p>{@link #reserveStock} is the first-phase action; it is registered under the name
 * "reserveStock" with {@code confirm} as its commit method and {@code cancel} as its
 * rollback method.
 */
@LocalTCC
public interface InventoryTccService {

    /**
     * First phase: reserves stock for a product.
     *
     * @param productId product to reserve stock for; stored in the action context as "productId"
     * @param quantity  amount to reserve; stored in the action context as "quantity"
     */
    @TwoPhaseBusinessAction(
        name = "reserveStock",
        commitMethod = "confirm",
        rollbackMethod = "cancel"
    )
    void reserveStock(
        @BusinessActionContextParameter(paramName = "productId") Long productId,
        @BusinessActionContextParameter(paramName = "quantity") BigDecimal quantity
    );

    /**
     * Second-phase commit for {@link #reserveStock}.
     *
     * @param context action context carrying the "productId" and "quantity" parameters
     * @return result flag of the commit (presumably {@code true} means success - confirm against Seata docs)
     */
    boolean confirm(BusinessActionContext context);

    /**
     * Second-phase rollback for {@link #reserveStock}.
     *
     * @param context action context carrying the "productId" and "quantity" parameters
     * @return result flag of the rollback (presumably {@code true} means success - confirm against Seata docs)
     */
    boolean cancel(BusinessActionContext context);
}

/**
 * TCC implementation of stock reservation.
 *
 * <p>Try ({@link #reserveStock}) increases the reserved quantity, confirm turns the
 * reservation into a real deduction, cancel releases the reservation.
 *
 * <p>NOTE(review): confirm/cancel are not idempotent and there is no protection against
 * an empty rollback (cancel without a prior try) or a late try after cancel. Seata may
 * invoke the second phase more than once; a per-transaction record is needed to make
 * this safe, which is outside what this class can see.
 *
 * <p>NOTE(review): all three phases do read-modify-write on the inventory row without a
 * visible lock or version check - confirm how concurrent updates are serialized.
 */
@Service
public class InventoryTccServiceImpl implements InventoryTccService {

    /**
     * Try phase: freezes {@code quantity} of the product's stock.
     *
     * @param productId product whose stock is reserved
     * @param quantity  amount to reserve; must not be null or negative
     * @throws IllegalArgumentException if {@code quantity} is null or negative
     * @throws IllegalStateException    if the product has no inventory row or the
     *                                  unreserved stock is smaller than {@code quantity}
     */
    @Override
    public void reserveStock(Long productId, BigDecimal quantity) {
        if (quantity == null || quantity.signum() < 0) {
            throw new IllegalArgumentException("预留数量非法: " + quantity);
        }

        Inventory inventory = requireInventory(productId);

        // Same availability rule as the direct deduction path: on-hand minus already reserved.
        BigDecimal available = inventory.getQuantity()
            .subtract(inventory.getReservedQuantity());
        if (available.compareTo(quantity) < 0) {
            throw new IllegalStateException("库存不足: productId=" + productId);
        }

        // Reserve (freeze) the stock.
        inventory.setReservedQuantity(
            inventory.getReservedQuantity().add(quantity)
        );
        inventoryMapper.update(inventory);

        // Record the reservation.
        ReserveLog log = new ReserveLog();
        log.setProductId(productId);
        log.setQuantity(quantity);
        log.setAction("reserve");
        logMapper.insert(log);
    }

    /**
     * Confirm phase: deducts the reserved amount from both the on-hand and the reserved quantity.
     *
     * @param context action context holding "productId" and "quantity"
     * @return always {@code true} when the update completes
     * @throws IllegalStateException if the product has no inventory row
     */
    @Override
    public boolean confirm(BusinessActionContext context) {
        Long productId = productIdOf(context);
        BigDecimal quantity = quantityOf(context);

        Inventory inventory = requireInventory(productId);
        inventory.setQuantity(inventory.getQuantity().subtract(quantity));
        inventory.setReservedQuantity(
            inventory.getReservedQuantity().subtract(quantity)
        );
        inventoryMapper.update(inventory);

        return true;
    }

    /**
     * Cancel phase: releases the reservation made in the try phase.
     *
     * @param context action context holding "productId" and "quantity"
     * @return always {@code true} when the update completes
     * @throws IllegalStateException if the product has no inventory row
     */
    @Override
    public boolean cancel(BusinessActionContext context) {
        Long productId = productIdOf(context);
        BigDecimal quantity = quantityOf(context);

        Inventory inventory = requireInventory(productId);
        inventory.setReservedQuantity(
            inventory.getReservedQuantity().subtract(quantity)
        );
        inventoryMapper.update(inventory);

        return true;
    }

    /** Loads the inventory row for a product, failing with a clear message instead of a later NPE. */
    private Inventory requireInventory(Long productId) {
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory == null) {
            throw new IllegalStateException("库存记录不存在: productId=" + productId);
        }
        return inventory;
    }

    /** Reads the "productId" parameter stored in the action context by the try phase. */
    private static Long productIdOf(BusinessActionContext context) {
        return Long.parseLong(context.getActionContext("productId").toString());
    }

    /** Reads the "quantity" parameter stored in the action context by the try phase. */
    private static BigDecimal quantityOf(BusinessActionContext context) {
        return new BigDecimal(context.getActionContext("quantity").toString());
    }
}

消息队列最终一致性

使用 Kafka 事务消息结合本地消息表保证数据的最终一致性:

# 消息事务配置
/**
 * Kafka producer configuration with transactions and idempotence enabled.
 */
@Configuration
public class KafkaTransactionConfig {

    /**
     * Builds the producer factory: String key/value serializers, broker at kafka:9092,
     * transactional id "psi-transaction", idempotence on.
     *
     * @return producer factory used by {@link #kafkaTemplate()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // The ProducerConfig key constants all carry the _CONFIG suffix.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Enable transactions; idempotence is a prerequisite for a transactional producer.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "psi-transaction");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * @return template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

---
# 订单创建与消息发送
/**
 * Creates orders and publishes an "order-created" event, using a local message table
 * so that a failed publish can be retried later.
 */
@Service
public class OrderService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private OrderMapper orderMapper;

    /**
     * Persists the order and its outgoing message, then publishes the event.
     *
     * <p>A publish failure does not fail the call: the message row is marked
     * {@code FAILED} for a later retry. A failure to persist the order or the message
     * row propagates to the caller.
     *
     * <p>NOTE(review): the order insert and the message-table insert should run in one
     * database transaction; no transaction boundary is visible here - confirm one exists
     * at the caller or add it.
     *
     * @param order order to create
     */
    public void createOrder(Order order) {
        // 1. Local persistence: the order and the message row that tracks its event.
        //    Kept outside the try so a persistence failure is not mistaken for a send failure
        //    (which would then try to update a message row that was never written).
        orderMapper.insert(order);
        saveMessageToLocalTable(order);

        // 2. Publish the event. The producer is transactional, so the send must run
        //    inside a Kafka transaction; the commit also surfaces send failures here.
        try {
            kafkaTemplate.executeInTransaction(operations ->
                operations.send("order-created", order.getId().toString(), JSON.toJSONString(order)));
        } catch (Exception e) {
            // Deliberate best-effort: mark the message for retry instead of failing the order.
            updateMessageStatus(order.getId(), MessageStatus.FAILED);
        }
    }
}

---
# 消息消费者 - 库存扣减
/**
 * Consumes "order-created" events and deducts stock for every order item.
 */
@Component
public class OrderConsumer {

    @Autowired
    private InventoryService inventoryService;

    /**
     * Handles one "order-created" record (up to 3 attempts, 1000 ms initial delay, multiplier 2).
     *
     * <p>NOTE(review): if deduction fails part-way through the items, a retry deducts the
     * earlier items again because the order is only marked processed at the end. Confirm
     * whether {@code deductStock} is itself idempotent per order line.
     *
     * @param record Kafka record whose value is the JSON-serialized order
     * @throws RuntimeException if any stock deduction fails, so that the retry policy applies
     */
    @KafkaListener(topics = "order-created", groupId = "psi-inventory")
    @RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void consumeOrderCreated(ConsumerRecord<String, String> record) {
        Order order = JSON.parseObject(record.value(), Order.class);

        // Idempotence check: skip only when this order HAS already been processed.
        if (checkMessageProcessed(order.getId())) {
            log.warn("消息已处理: {}", order.getId());
            return;
        }

        try {
            // Deduct stock for each item.
            for (OrderItem item : order.getItems()) {
                inventoryService.deductStock(
                    item.getProductId(),
                    item.getQuantity()
                );
            }

            // Mark the message as processed only after every deduction succeeded.
            markMessageProcessed(order.getId());

        } catch (Exception e) {
            log.error("库存扣减失败: {}", order.getId(), e);
            throw new RuntimeException("库存扣减失败", e);
        }
    }
}

---
# 库存回滚消息处理
/**
 * Consumes "order-cancelled" events and rolls back the stock reserved for the order.
 */
@Component
public class InventoryRollbackConsumer {

    @Autowired
    private InventoryService inventoryService;

    /**
     * Rolls back every inventory reservation linked to the cancelled order.
     *
     * <p>NOTE(review): there is no idempotence guard here; a redelivered record would
     * roll the same reservations back again unless {@code rollbackStock} or
     * {@code getReservesByOrderId} already accounts for that - confirm.
     *
     * @param record Kafka record whose key is the order id in decimal form
     * @throws NumberFormatException if the record key is not a valid long
     */
    @KafkaListener(topics = "order-cancelled", groupId = "psi-inventory")
    public void consumeOrderCancelled(ConsumerRecord<String, String> record) {
        Long orderId = Long.parseLong(record.key());

        // Look up the reservations that belong to this order.
        List<InventoryReserve> reserves = inventoryService.getReservesByOrderId(orderId);

        // Roll back each reservation.
        for (InventoryReserve reserve : reserves) {
            inventoryService.rollbackStock(
                reserve.getProductId(),
                reserve.getQuantity()
            );
        }
    }
}

分布式锁实现

使用 Redis 分布式锁保证并发安全:

# 分布式锁工具类
/**
 * Redis-based distributed lock (SET-if-absent with expiry, owner-checked release via Lua),
 * plus a stock-deduction example that uses it.
 */
@Component
public class DistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "lock:";

    // Deletes the key only when it still holds the caller's value, so one owner
    // cannot release a lock that has since been taken by another.
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
        "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
        Long.class
    );

    /**
     * Tries to acquire the lock, retrying until {@code waitTime} has elapsed.
     * At least one attempt is always made, even when {@code waitTime} is zero or negative.
     *
     * @param key        lock name (prefixed with "lock:" in Redis)
     * @param value      owner token; must be passed unchanged to {@link #unlock}
     * @param expireTime lock expiry in milliseconds
     * @param waitTime   maximum time to keep retrying, in milliseconds
     * @param interval   pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} on timeout or interruption
     */
    public boolean tryLock(String key, String value, long expireTime, long waitTime, long interval) {
        String lockKey = LOCK_PREFIX + key;
        long deadline = System.currentTimeMillis() + waitTime;

        while (true) {
            Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, value, Duration.ofMillis(expireTime));
            if (Boolean.TRUE.equals(acquired)) {
                return true;
            }

            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }

            // Wait before retrying, but never sleep past the deadline.
            try {
                Thread.sleep(Math.min(interval, remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Releases the lock if it is still held with the given owner token.
     *
     * @param key   lock name used in {@link #tryLock}
     * @param value owner token used in {@link #tryLock}
     * @return {@code true} if the key was deleted by this call
     */
    public boolean unlock(String key, String value) {
        String lockKey = LOCK_PREFIX + key;
        Long result = redisTemplate.execute(
            UNLOCK_SCRIPT,
            Collections.singletonList(lockKey),
            value
        );
        return result != null && result > 0;
    }

    /**
     * Usage example: deducts stock for a product under a per-product lock
     * (5000 ms expiry, up to 3000 ms wait, 100 ms retry interval).
     *
     * <p>NOTE(review): the lock expires after 5000 ms regardless of whether the work
     * below has finished; there is no renewal.
     *
     * @param productId product whose stock is deducted
     * @param quantity  amount to deduct
     * @return {@code true} when the deduction and the stock-flow record were written
     * @throws BusinessException if the lock cannot be acquired, the product has no
     *                           inventory row, or the available stock is insufficient
     */
    public boolean deductStock(Long productId, BigDecimal quantity) {
        String lockKey = "stock:" + productId;
        String lockValue = UUID.randomUUID().toString();

        // Acquire outside the try block so the finally clause only releases a lock we hold.
        if (!tryLock(lockKey, lockValue, 5000, 3000, 100)) {
            throw new BusinessException("库存操作繁忙,请稍后重试");
        }

        try {
            // Load the current stock.
            Inventory inventory = inventoryMapper.selectByProductId(productId);
            if (inventory == null) {
                throw new BusinessException("库存记录不存在");
            }

            // Available = on-hand minus reserved.
            BigDecimal available = inventory.getQuantity()
                .subtract(inventory.getReservedQuantity());

            if (available.compareTo(quantity) < 0) {
                throw new BusinessException("库存不足");
            }

            // Deduct the stock.
            inventory.setQuantity(inventory.getQuantity().subtract(quantity));
            inventoryMapper.update(inventory);

            // Record the stock movement as a negative change.
            StockFlow flow = new StockFlow();
            flow.setProductId(productId);
            flow.setChangeQty(quantity.negate());
            flow.setType(StockChangeType.DEDUCT);
            stockFlowMapper.insert(flow);

            return true;

        } finally {
            unlock(lockKey, lockValue);
        }
    }
}

---
# 防重放机制
/**
 * Redis-backed duplicate-request guard: the first caller for a given business key
 * wins, later callers within the expiry window are rejected.
 */
@Component
public class IdempotenceChecker {

    private static final String IDEMPOTENCE_PREFIX = "idemp:";
    private static final Duration EXPIRE_TIME = Duration.ofHours(24);

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Atomically checks whether the business key was seen before and marks it if not.
     * The marker is stored under "idemp:{businessType}:{bizId}" and expires after 24 hours.
     *
     * @param businessType business category used to namespace the key
     * @param bizId        business identifier within that category
     * @return {@code true} if this call created the marker; {@code false} if the marker
     *         already existed or Redis returned no result
     */
    public boolean checkAndMark(String businessType, String bizId) {
        final String markerKey = IDEMPOTENCE_PREFIX + businessType + ":" + bizId;
        final Boolean created = redisTemplate.opsForValue().setIfAbsent(markerKey, "1", EXPIRE_TIME);
        return created != null && created;
    }
}

数据一致性监控

分布式事务监控和告警:

# 事务监控服务
/**
 * Scheduled monitoring of distributed transactions: alerts on and tries to resolve
 * long-pending transactions, and produces a daily summary.
 */
@Service
public class TransactionMonitor {

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    /**
     * Runs 60 seconds after the previous run finished: finds transactions pending for
     * more than 30 minutes, sends an alert for each and attempts automatic handling.
     *
     * <p>NOTE(review): a transaction that stays pending is alerted on again at every
     * run; confirm whether {@code sendAlert} de-duplicates.
     */
    @Scheduled(fixedDelay = 60000)
    public void checkPendingTransactions() {
        // Transactions that have been pending longer than the threshold (30 minutes, in ms).
        List<TransactionLog> pendingTransactions = transactionLogMapper
            .findPendingTransactions(30 * 60 * 1000);

        for (TransactionLog transaction : pendingTransactions) {
            sendAlert(transaction);
            handleTransaction(transaction);
        }
    }

    /**
     * Attempts to resolve an abnormal transaction according to its status:
     * TIMEOUT is committed or rolled back depending on its branches,
     * ROLLBACK_FAIL is escalated for manual intervention, other statuses are left alone.
     */
    private void handleTransaction(TransactionLog transaction) {
        switch (transaction.getStatus()) {
            case TIMEOUT:
                // Inspect the branch transactions to decide between commit and rollback.
                if (checkBranchesCanCommit(transaction.getBranchTransactions())) {
                    seataClient.commit(transaction.getXid());
                } else {
                    seataClient.rollback(transaction.getXid());
                }
                break;

            case ROLLBACK_FAIL:
                // Needs a human: automatic rollback already failed.
                notifyManualIntervention(transaction);
                break;

            default:
                // No automatic handling for other statuses.
                break;
        }
    }

    /**
     * Runs daily at 02:00 and sends the total / committed / failed transaction counts
     * for the logs returned by {@code findByDate(LocalDate.now())}.
     *
     * <p>NOTE(review): at 02:00 "today" covers only the first two hours of the day;
     * a daily report presumably wants the previous day - confirm the intended date.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void generateDailyReport() {
        List<TransactionLog> todayLogs = transactionLogMapper
            .findByDate(LocalDate.now());

        int total = todayLogs.size();
        int success = countByStatus(todayLogs, "committed");
        int failed = countByStatus(todayLogs, "failed");

        sendDailyReport(total, success, failed);
    }

    /**
     * Counts logs whose status matches {@code statusName}, ignoring case.
     * The status is compared through its string form so this works whether the status
     * is a String or an enum constant (presumably COMMITTED / FAILED - verify the names).
     */
    private static int countByStatus(List<TransactionLog> logs, String statusName) {
        return (int) logs.stream()
            .filter(t -> statusName.equalsIgnoreCase(String.valueOf(t.getStatus())))
            .count();
    }
}

总结

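PSI 进销存系统通过多种技术手段保证分布式数据一致性:

- Seata AT/TCC 模式:用于资金、库存等需要强一致的核心业务
- 消息队列最终一致性:通过 Kafka 事务消息与本地消息表实现异步解耦
- Redis 分布式锁与幂等校验:保证并发安全,防止重复处理
- 事务监控与告警:及时发现并处理超时或回滚失败的事务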

合理的事务方案选择需要根据业务场景权衡一致性和性能。

← 下一篇:PSI进销存系统云原生架构与容器化部署