PSI 进销存系统区块链溯源与数据安全
区块链溯源概述
随着消费者对商品来源和质量安全的关注度不断提升,商品溯源已成为供应链管理的核心需求。通过区块链技术,可以实现商品从生产、流通到消费的全流程追溯,确保数据的真实性和不可篡改性。本文介绍 PSI 系统中区块链溯源方案的实现。
系统架构设计
区块链溯源系统架构:
| 层次 | 功能描述 | 技术选型 |
|---|---|---|
| 感知层 | 扫码设备、RFID、IoT传感器 | 二维码、RFID |
| 接入层 | 数据采集、协议转换 | Koa.js API |
| 业务层 | 溯源业务、区块链交互 | Node.js |
| 区块链层 | 数据存证、分布式账本 | Hyperledger Fabric |
| 展示层 | 溯源查询、数据可视化 | Web、小程序 |
核心功能实现
1. 溯源数据上链
将商品流转数据写入区块链:
// 溯源数据上链服务
class TraceabilityService {
constructor(fabricClient, db) {
this.client = fabricClient;
this.db = db;
this.chaincodeName = 'traceability';
}
// 记录商品流转信息
async recordProductFlow(flowData) {
const { productId, action, operator, location, data } = flowData;
// 1. 准备上链数据
const flowRecord = {
productId,
action,
operator,
location,
timestamp: Date.now(),
dataHash: this.calculateHash(data),
previousHash: await this.getLastHash(productId)
};
// 2. 调用智能合约上链
const result = await this.invokeChaincode('recordFlow', {
productId,
flow: JSON.stringify(flowRecord)
});
// 3. 本地存储原始数据(链下)
await this.db.collection('product_flows').insertOne({
...flowData,
txId: result.txId,
blockNumber: result.blockNumber,
chaincode: this.chaincodeName
});
return result;
}
// 采购入库上链
async recordPurchaseInbound(purchaseData) {
return await this.recordProductFlow({
productId: purchaseData.productId,
action: 'PURCHASE_INBOUND',
operator: purchaseData.operator,
location: purchaseData.warehouse,
data: {
supplier: purchaseData.supplier,
quantity: purchaseData.quantity,
price: purchaseData.price,
batchNo: purchaseData.batchNo,
productionDate: purchaseData.productionDate,
expiryDate: purchaseData.expiryDate,
qualityCert: purchaseData.qualityCert
}
});
}
// 销售出库上链
async recordSalesOutbound(salesData) {
return await this.recordProductFlow({
productId: salesData.productId,
action: 'SALES_OUTBOUND',
operator: salesData.operator,
location: salesData.warehouse,
data: {
customer: salesData.customer,
quantity: salesData.quantity,
orderNo: salesData.orderNo,
destination: salesData.destination
}
});
}
// 库存调拨上链
async recordTransfer(transferData) {
return await this.recordProductFlow({
productId: transferData.productId,
action: 'WAREHOUSE_TRANSFER',
operator: transferData.operator,
location: `${transferData.fromWarehouse} -> ${transferData.toWarehouse}`,
data: {
quantity: transferData.quantity,
fromWarehouse: transferData.fromWarehouse,
toWarehouse: transferData.toWarehouse,
reason: transferData.reason
}
});
}
// 质量检验上链
async recordQualityCheck(checkData) {
return await this.recordProductFlow({
productId: checkData.productId,
action: 'QUALITY_CHECK',
operator: checkData.operator,
location: checkData.labLocation,
data: {
batchNo: checkData.batchNo,
checkResult: checkData.result,
checker: checkData.checker,
checkDate: checkData.checkDate,
certificates: checkData.certificates,
notes: checkData.notes
}
});
}
// 计算数据哈希
calculateHash(data) {
const crypto = require('crypto');
return crypto.createHash('sha256')
.update(JSON.stringify(data))
.digest('hex');
}
// 获取商品最后一条记录的哈希
async getLastHash(productId) {
const lastRecord = await this.db.collection('product_flows')
.find({ productId })
.sort({ timestamp: -1 })
.limit(1)
.toArray();
return lastRecord.length > 0 ? this.calculateHash(lastRecord[0].data) : '0';
}
}
2. 溯源查询验证
查询商品流转记录和验证数据完整性:
// 溯源查询服务
class TraceabilityQueryService {
constructor(fabricClient, db) {
this.client = fabricClient;
this.db = db;
}
// 查询商品完整流转记录
async queryProductTrace(productId) {
// 1. 从区块链获取上链记录
const chainRecords = await this.queryChaincode('queryProductHistory', {
productId
});
// 2. 获取链下原始数据
const localRecords = await this.db.collection('product_flows')
.find({ productId })
.sort({ timestamp: 1 })
.toArray();
// 3. 整合展示
const trace = localRecords.map((record, index) => ({
step: index + 1,
action: record.action,
actionName: this.getActionName(record.action),
operator: record.operator,
location: record.location,
timestamp: record.timestamp,
date: new Date(record.timestamp).toLocaleString(),
data: record.data,
txId: record.txId,
// 验证数据是否被篡改
isValid: this.verifyRecord(record, chainRecords[index])
}));
// 4. 完整性统计
const integrity = {
totalSteps: trace.length,
validSteps: trace.filter(t => t.isValid).length,
isComplete: trace.every(t => t.isValid)
};
return { trace, integrity };
}
// 验证单条记录
verifyRecord(localRecord, chainRecord) {
if (!chainRecord) return false;
const expectedHash = this.calculateHash(localRecord.data);
return expectedHash === chainRecord.dataHash;
}
// 简化版查询(用于消费者扫码)
async querySimpleTrace(productId) {
const records = await this.db.collection('product_flows')
.find({ productId })
.sort({ timestamp: 1 })
.toArray();
return records.map(r => ({
action: this.getActionName(r.action),
location: r.location,
date: new Date(r.timestamp).toLocaleDateString(),
operator: r.operator
}));
}
// 验证商品真伪
async verifyProduct(productId, expectedBatchNo) {
const firstRecord = await this.db.collection('product_flows')
.find({ productId, action: 'PURCHASE_INBOUND' })
.sort({ timestamp: 1 })
.limit(1)
.toArray();
if (firstRecord.length === 0) {
return { valid: false, message: '未找到入库记录' };
}
const batchNo = firstRecord[0].data.batchNo;
if (batchNo !== expectedBatchNo) {
return { valid: false, message: '批次号不匹配' };
}
// 验证所有记录完整性
const chainCount = await this.queryChaincode('getRecordCount', { productId });
const localCount = await this.db.collection('product_flows')
.countDocuments({ productId });
return {
valid: chainCount === localCount,
message: chainCount === localCount ? '商品正品' : '数据不完整',
productionInfo: firstRecord[0].data
};
}
// 获取操作名称
getActionName(action) {
const names = {
'PURCHASE_INBOUND': '采购入库',
'SALES_OUTBOUND': '销售出库',
'WAREHOUSE_TRANSFER': '仓库调拨',
'QUALITY_CHECK': '质量检验',
'PRODUCTION': '生产加工',
'STORAGE': '仓储保管'
};
return names[action] || action;
}
calculateHash(data) {
const crypto = require('crypto');
return crypto.createHash('sha256')
.update(JSON.stringify(data))
.digest('hex');
}
}
3. 数据安全与权限控制
保障溯源数据的安全性和隐私性:
// 数据安全服务
class TraceabilitySecurity {
constructor() {
this.encryptionKey = process.env.ENCRYPTION_KEY;
}
// 数据加密存储
encrypt(data) {
const crypto = require('crypto');
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(
'aes-256-cbc',
Buffer.from(this.encryptionKey, 'hex'),
iv
);
let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex');
encrypted += cipher.final('hex');
return {
iv: iv.toString('hex'),
data: encrypted
};
}
// 数据解密
decrypt(encryptedData) {
const crypto = require('crypto');
const decipher = crypto.createDecipheriv(
'aes-256-cbc',
Buffer.from(this.encryptionKey, 'hex'),
Buffer.from(encryptedData.iv, 'hex')
);
let decrypted = decipher.update(encryptedData.data, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return JSON.parse(decrypted);
}
// 敏感字段脱敏
sanitize(record, role) {
const sensitiveFields = ['price', 'cost', 'customerPhone', 'supplierContact'];
const publicFields = ['price', 'customer', 'supplier'];
const adminFields = ['price', 'cost', 'customerPhone', 'supplierContact', 'operator'];
let allowedFields;
switch (role) {
case 'public':
allowedFields = publicFields;
break;
case 'admin':
allowedFields = adminFields;
break;
default:
allowedFields = publicFields;
}
const sanitized = { ...record };
for (const field of sensitiveFields) {
if (!allowedFields.includes(field) && field in sanitized) {
sanitized[field] = this.mask(sanitized[field]);
}
}
return sanitized;
}
// 字段脱敏
mask(value) {
if (!value) return '';
const str = String(value);
if (str.length <= 4) return '****';
return str.substring(0, 2) + '****' + str.substring(str.length - 2);
}
// 数字签名
sign(data, privateKey) {
const crypto = require('crypto');
const sign = crypto.createSign('RSA-SHA256');
sign.update(JSON.stringify(data));
return sign.sign(privateKey, 'base64');
}
// 签名验证
verify(data, signature, publicKey) {
const crypto = require('crypto');
const verify = crypto.createVerify('RSA-SHA256');
verify.update(JSON.stringify(data));
return verify.verify(publicKey, signature, 'base64');
}
// 访问控制
async checkAccess(userId, productId, action) {
// 查询用户权限
const user = await this.getUser(userId);
// 公开查询
if (action === 'query') {
return true;
}
// 写入操作需要权限
const permissions = {
'PURCHASE_INBOUND': ['admin', 'purchaser'],
'SALES_OUTBOUND': ['admin', 'sales'],
'WAREHOUSE_TRANSFER': ['admin', 'warehouse'],
'QUALITY_CHECK': ['admin', 'qc']
};
const allowedRoles = permissions[action] || ['admin'];
return allowedRoles.includes(user.role);
}
}
4. 消费者溯源界面
扫码溯源的前端展示:
// 溯源查询 API
router.get('/api/traceability/:productId', async (ctx) => {
const { productId } = ctx.params;
const { batchNo } = ctx.query;
const service = new TraceabilityQueryService();
// 验证商品真伪
const verification = await service.verifyProduct(productId, batchNo);
if (!verification.valid) {
ctx.status = 400;
ctx.body = {
success: false,
message: verification.message
};
return;
}
// 获取溯源信息
const trace = await service.querySimpleTrace(productId);
ctx.body = {
success: true,
data: {
productId,
isAuthentic: true,
productionInfo: verification.productionInfo,
trace: trace
}
};
});
// 管理员完整溯源查询
router.get('/api/admin/traceability/:productId', async (ctx) => {
const { productId } = ctx.params;
// 权限检查
const user = ctx.state.user;
if (!['admin', 'manager'].includes(user.role)) {
ctx.status = 403;
ctx.body = { success: false, message: '权限不足' };
return;
}
const service = new TraceabilityQueryService();
const result = await service.queryProductTrace(productId);
ctx.body = {
success: true,
data: result
};
});
溯源应用场景
区块链溯源的典型应用场景:
- 食品安全溯源:记录食品从原料、生产、加工、仓储、物流到销售的全程信息
- 药品追溯:实现药品全流程可追溯,满足药品监管要求
- 奢侈品防伪:为奢侈品提供唯一的数字身份证明,防止假冒
- 农产品溯源:记录农产品产地、种植、施肥、收割等信息
- 工业品质量追溯:记录工业产品的生产批次、质量检测结果
实施建议
区块链溯源系统实施要点:
| 阶段 | 关键任务 | 注意事项 |
|---|---|---|
| 规划阶段 | 确定溯源范围、上链数据粒度 | 平衡成本与效果 |
| 技术选型 | 选择联盟链或私有链方案 | 考虑性能与去中心化程度 |
| 系统集成 | 与现有进销存系统对接 | 最小化业务影响 |
| 运营推广 | 消费者扫码推广、员工培训 | 持续优化体验 |
总结
区块链溯源为 PSI 进销存系统提供了强大的商品全流程追溯能力,通过将关键业务数据上链,可以有效解决商品真伪验证、质量追溯等难题。在实施过程中,需要根据业务需求合理规划上链范围,平衡数据透明度和商业隐私,确保系统既安全又实用。