From 92f6a161063470aeaa0f88d7909ae93170686538 Mon Sep 17 00:00:00 2001 From: dfdg <2710245601@qq.com> Date: Thu, 30 Oct 2025 11:05:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=E6=8E=A5=E9=80=86=E5=8F=98=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tcpfuwu/constant/ModbusConstant.java | 44 +++ .../dromara/tcpfuwu/domain/DeviceCache.java | 16 + .../tcpfuwu/domain/ModbusVariable.java | 18 + .../tcpfuwu/handler/DeviceHandler.java | 340 ++++++++++++++++++ .../tcpfuwu/server/UnifiedTcpServer.java | 170 +++++++++ .../tcpfuwu/starter/TcpServersStarter.java | 45 +++ .../org/dromara/tcpfuwu/util/ByteUtils.java | 45 +++ 7 files changed, 678 insertions(+) create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/UnifiedTcpServer.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java create mode 100644 ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ByteUtils.java diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java new file mode 100644 index 0000000..eed0502 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java @@ -0,0 +1,44 @@ +package org.dromara.tcpfuwu.constant; + +/** + * Modbus TCP协议常量 + */ +public class ModbusConstant { + // Modbus功能码:读取输入寄存器(0x04) + public static final byte FUNC_READ_INPUT_REG = 0x04; + // CRC16校验表(Modbus RTU标准) + public static final Integer[] CRC16_TABLE = { + 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, + 0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, + 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, + 0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, + 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, + 0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, + 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, + 0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, + 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, + 0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, + 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, + 0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, + 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, + 0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, + 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, + 0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, + 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, + 0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, + 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, + 0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, + 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, + 0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, + 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, + 0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, + 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, + 0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, + 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, + 0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, + 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, + 0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, + 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, + 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 + }; +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java new file mode 100644 index 0000000..d7c4a08 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java @@ -0,0 +1,16 @@ +package org.dromara.tcpfuwu.domain; + +import lombok.Data; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Data +public class DeviceCache { + private String snCode; // 设备SN码 + private long createTime; // 连接创建时间(毫秒) + private long lastHeartbeatTime; // 最后心跳时间(毫秒) + private boolean isExpired; // 是否过期 + // 变量值缓存:key=变量名,value=解析后的值(带倍率) + private Map variableValues = new ConcurrentHashMap<>(); +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java new file mode 100644 index 0000000..ff85f43 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java @@ -0,0 +1,18 @@ +package org.dromara.tcpfuwu.domain; + +import lombok.Data; + +@Data +public class ModbusVariable { + private Long id; + private String snCode; // 设备SN码 + private Integer slaveId; // 从机地址 + private Integer funcCode; // 功能码 + private Integer startRegAddr; // 起始寄存器地址 + private Integer regQuantity; // 寄存器数量 + private String variableName; // 变量名 + private String dataType; // 数据类型(S16/U16/S32/U32/FLOAT) + private Double multiplier; // 数据倍率 + private String unit; // 单位 + private Byte isEnabled; // 是否启用(1=启用) +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java new file mode 100644 index 0000000..af93a02 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java @@ -0,0 +1,340 @@ +package org.dromara.tcpfuwu.handler; + +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.dromara.shebei.domain.dto.OpsSbLiebiaoDto; +import org.dromara.shebei.domain.vo.OpsSbMbBianliangVo; +import org.dromara.shebei.service.IOpsSbLiebiaoService; +import org.dromara.tcpfuwu.constant.ModbusConstant; +import org.dromara.tcpfuwu.domain.DeviceCache; +import org.dromara.tcpfuwu.domain.ModbusVariable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 设备连接处理类:处理单设备的心跳和Modbus通信 + */ +//@Component +public class DeviceHandler { + // 配置参数 + private static final long HEARTBEAT_EXPIRE_MS = 30 * 1000; + private static final int HEARTBEAT_LEN = 20; + private static final int SN_CODE_LEN = 10; + private static final int MODBUS_REQUEST_CYCLE = 10; // 请求周期(秒) + + // 依赖注入 +// @Autowired + private final IOpsSbLiebiaoService sbLiebiaoService; + + // 设备信息 + private final Socket clientSocket; + private final String deviceAddr; + private final DeviceCache deviceCache; + private final Map globalCache; + + // 状态控制 + private final AtomicBoolean isRunning = new AtomicBoolean(true); + private InputStream in; + private OutputStream out; + private ScheduledExecutorService modbusScheduler; + + public DeviceHandler( + Socket clientSocket, + String deviceAddr, + DeviceCache deviceCache, + Map globalCache, + IOpsSbLiebiaoService sbLiebiaoService // 新增:手动传递Spring服务 + ) { + this.clientSocket = clientSocket; + this.deviceAddr = deviceAddr; + this.deviceCache = deviceCache; + this.globalCache = globalCache; + this.sbLiebiaoService = sbLiebiaoService; // 赋值 + } + + /** + * 处理设备通信的主方法 + */ + public void handle() { + try { + // 初始化IO流 + in = clientSocket.getInputStream(); + out = clientSocket.getOutputStream(); + + // 启动心跳接收线程 + startHeartbeatReceiver(); + + // 启动Modbus定时请求线程 + startModbusScheduler(); + + // 阻塞等待线程结束 + synchronized (this) { + wait(); + } + + } catch (Exception e) { + System.err.printf("【设备处理异常】地址:%s,原因:%s%n", deviceAddr, e.getMessage()); + } finally { + // 清理资源 + stop(); + } + } + + /** + * 启动心跳接收线程 + */ + private void startHeartbeatReceiver() { + new Thread(() -> { + try { + byte[] buffer = new byte[1024]; + while (isRunning.get()) { + int len = in.read(buffer); + if (len <= 0) { + System.out.printf("【心跳接收失败】设备:%s,连接断开%n", deviceAddr); + break; + } + + byte[] data = Arrays.copyOf(buffer, len); + if (isHeartbeatData(data)) { + // 解析SN码 + String snCode = new String(Arrays.copyOfRange(data, 0, SN_CODE_LEN)).trim(); + // 更新缓存 + deviceCache.setSnCode(snCode); + deviceCache.setLastHeartbeatTime(System.currentTimeMillis()); + System.out.printf("【心跳更新】设备:%s,SN:%s%n", deviceAddr, snCode); + } + } + } catch (Exception e) { + System.err.printf("【心跳线程异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } finally { + // 心跳线程结束,标记设备离线 + isRunning.set(false); + synchronized (DeviceHandler.this) { + DeviceHandler.this.notify(); + } + } + }, "heartbeat-" + deviceAddr).start(); + } + + /** + * 启动Modbus定时请求调度器 + */ + private void startModbusScheduler() { + modbusScheduler = Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "modbus-" + deviceAddr) + ); + + modbusScheduler.scheduleAtFixedRate(() -> { + if (!isRunning.get() || !clientSocket.isConnected() || clientSocket.isClosed()) { + stop(); + return; + } + + // 未获取SN码则等待 + if (deviceCache.getSnCode() == null) { + System.out.printf("【等待心跳】设备:%s%n", deviceAddr); + return; + } + + // 查询变量并处理 + List variables = queryVariables(deviceCache.getSnCode()); + if (variables.isEmpty()) { + System.out.printf("【无变量配置】SN:%s%n", deviceCache.getSnCode()); + return; + } + + // 遍历变量发送请求 + for (ModbusVariable var : variables) { + try { + handleVariable(var); + } catch (Exception e) { + System.err.printf("【变量处理失败】SN:%s,变量:%s,原因:%s%n", + deviceCache.getSnCode(), var.getVariableName(), e.getMessage()); + } + } + + }, 0, MODBUS_REQUEST_CYCLE, TimeUnit.SECONDS); + } + + /** + * 处理单个变量的请求与解析 + */ + private void handleVariable(ModbusVariable var) throws Exception { + // 生成请求帧 + byte[] request = generateModbusFrame(var); + System.out.printf("【发送请求】SN:%s,变量:%s,帧:%s%n", + deviceCache.getSnCode(), var.getVariableName(), bytesToHex(request)); + + // 发送请求 + out.write(request); + out.flush(); + + // 接收响应(超时3秒) + byte[] response = receiveResponse(3000); + if (response == null) { + throw new Exception("响应超时"); + } + + // 验证响应 + if (!validateResponse(response, var)) { + throw new Exception("响应验证失败"); + } + + // 解析数据 + Object value = parseValue(response, var); + System.out.printf("【解析成功】SN:%s,变量:%s,值:%s %s%n", + deviceCache.getSnCode(), var.getVariableName(), value, var.getUnit()); + + // 更新缓存 + deviceCache.getVariableValues().put(var.getVariableName(), value); + } + + /** + * 从数据库查询变量列表 + */ + private List queryVariables(String snCode) { + ArrayList modbusVariables = new ArrayList<>(); + OpsSbLiebiaoDto opsSbLiebiaoDto = sbLiebiaoService.getLiebiaoBianliangList(snCode); + if (!opsSbLiebiaoDto.getSbMbBianliangVos().isEmpty()) { + for (OpsSbMbBianliangVo v : opsSbLiebiaoDto.getSbMbBianliangVos()) { + ModbusVariable modbusVariable = new ModbusVariable(); + modbusVariable.setDataType(v.getShujvGeshi()); + modbusVariable.setVariableName(v.getBlName()); + modbusVariable.setUnit(v.getBlDanwei()); + modbusVariable.setSnCode(snCode); + modbusVariable.setSlaveId(Math.toIntExact(opsSbLiebiaoDto.getSlaveId())); + modbusVariable.setFuncCode(Integer.parseInt(v.getJicunqiGnm())); + modbusVariable.setStartRegAddr(Integer.parseInt(v.getJicunqiAdd())); + switch (v.getShujvGeshi()) { + case "S16": modbusVariable.setRegQuantity(1); break; + case "U16": modbusVariable.setRegQuantity(1); break; + case "S32": modbusVariable.setRegQuantity(2); break; + case "U32": modbusVariable.setRegQuantity(2); break; + } + modbusVariables.add(modbusVariable); + } + } + return modbusVariables; + } + + /** + * 停止所有资源 + */ + public void stop() { + isRunning.set(false); + + // 关闭Modbus调度器 + if (modbusScheduler != null) { + modbusScheduler.shutdown(); + } + + // 关闭Socket和流 + try { + if (in != null) in.close(); + if (out != null) out.close(); + if (clientSocket != null && !clientSocket.isClosed()) { + clientSocket.close(); + } + } catch (Exception e) { + System.err.printf("【资源关闭异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } + + // 从全局缓存移除 + globalCache.remove(deviceAddr); + System.out.printf("【设备下线】地址:%s%n", deviceAddr); + } + + // ------------------------------ 工具方法 ------------------------------ + private boolean isHeartbeatData(byte[] data) { + if (data.length != HEARTBEAT_LEN) return false; + for (byte b : data) { + if (b < '0' || b > '9') return false; + } + return true; + } + + private byte[] generateModbusFrame(ModbusVariable var) { + ByteBuffer buffer = ByteBuffer.allocate(6).order(ByteOrder.BIG_ENDIAN); + buffer.put(var.getSlaveId().byteValue()) + .put(var.getFuncCode().byteValue()) + .putShort(var.getStartRegAddr().shortValue()) + .putShort(var.getRegQuantity().shortValue()); + + byte[] body = buffer.array(); + byte[] crc = calculateCrc16(body); + + byte[] frame = new byte[body.length + crc.length]; + System.arraycopy(body, 0, frame, 0, body.length); + System.arraycopy(crc, 0, frame, body.length, crc.length); + return frame; + } + + private byte[] receiveResponse(int timeoutMs) throws Exception { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timeoutMs) { + if (in.available() > 0) { + byte[] buffer = new byte[1024]; + int len = in.read(buffer); + return Arrays.copyOf(buffer, len); + } + Thread.sleep(10); + } + return null; + } + + private boolean validateResponse(byte[] response, ModbusVariable var) { + if (response.length < 5) return false; + if (response[0] != var.getSlaveId() || response[1] != var.getFuncCode()) return false; + if (response[2] != var.getRegQuantity() * 2) return false; + + byte[] body = Arrays.copyOf(response, response.length - 2); + byte[] receivedCrc = Arrays.copyOfRange(response, response.length - 2, response.length); + return Arrays.equals(receivedCrc, calculateCrc16(body)); + } + + private Object parseValue(byte[] response, ModbusVariable var) { + ByteBuffer buffer = ByteBuffer.wrap(response, 3, response[2]).order(ByteOrder.BIG_ENDIAN); + double rawValue; + + switch (var.getDataType().toUpperCase()) { + case "S16": rawValue = buffer.getShort(); break; + case "U16": rawValue = buffer.getShort() & 0xFFFF; break; + case "S32": rawValue = buffer.getInt(); break; + case "U32": rawValue = buffer.getLong() & 0xFFFFFFFFL; break; + case "FLOAT": rawValue = buffer.getFloat(); break; + default: throw new IllegalArgumentException("不支持的数据类型:" + var.getDataType()); + } + + return rawValue * var.getMultiplier(); + } + + private byte[] calculateCrc16(byte[] data) { + int crc = 0xFFFF; + for (byte b : data) { + crc = (crc >> 8) ^ ModbusConstant.CRC16_TABLE[(crc ^ (b & 0xFF)) & 0xFF]; + } + return new byte[]{(byte) (crc & 0xFF), (byte) ((crc >> 8) & 0xFF)}; + } + + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/UnifiedTcpServer.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/UnifiedTcpServer.java new file mode 100644 index 0000000..a639b9e --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/UnifiedTcpServer.java @@ -0,0 +1,170 @@ +package org.dromara.tcpfuwu.server; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +import org.dromara.shebei.service.IOpsSbLiebiaoService; +import org.dromara.tcpfuwu.domain.DeviceCache; +import org.dromara.tcpfuwu.handler.DeviceHandler; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + + +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.*; + +/** + * 统一TCP服务器(发送Modbus请求前必须验证心跳状态) + */ +@Component +@Slf4j +public class UnifiedTcpServer { + + @Value("${tcp.server.port:8888}") + private int tcpPort; + @Value("${tcp.server.heartbeat-timeout: 60000}") + private int heartbeatExpireMs; + + // 线程池:处理设备连接 + private ExecutorService clientExecutor; + // 服务器Socket + private ServerSocket serverSocket; + // 设备缓存 + private final Map deviceCache = new ConcurrentHashMap<>(); + // 缓存清理线程 + private ScheduledExecutorService cacheCleaner; + // 服务运行状态 + private volatile boolean isRunning = false; + + @Autowired + private IOpsSbLiebiaoService sbLiebiaoService; + + + /** + * 初始化方法:Spring容器启动后自动调用 + */ + @PostConstruct + public void start() { + try { + // 初始化服务器Socket + serverSocket = new ServerSocket(tcpPort); + // 初始化线程池(处理设备连接) + clientExecutor = Executors.newCachedThreadPool(r -> { + Thread thread = new Thread(r); + thread.setName("tcp-client-handler-" + thread.getId()); + thread.setDaemon(true); // 守护线程,随主线程退出 + return thread; + }); + // 初始化缓存清理线程 + initCacheCleaner(); + + isRunning = true; + System.out.printf("【TCP服务启动成功】监听端口:%d%n", tcpPort); + + // 启动接受连接的线程 + new Thread(this::acceptConnections, "tcp-acceptor").start(); + + } catch (Exception e) { + System.err.println("【TCP服务启动失败】" + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 接受设备连接的循环 + */ + private void acceptConnections() { + while (isRunning) { + try { + // 阻塞等待设备连接 + Socket clientSocket = serverSocket.accept(); + String deviceAddr = clientSocket.getInetAddress() + ":" + clientSocket.getPort(); + System.out.printf("%n【设备上线】地址:%s%n", deviceAddr); + + // 初始化设备缓存 + DeviceCache cache = new DeviceCache(); + cache.setCreateTime(System.currentTimeMillis()); + deviceCache.put(deviceAddr, cache); + + // 提交设备处理任务 + clientExecutor.submit(() -> + new DeviceHandler(clientSocket, deviceAddr, cache, deviceCache,sbLiebiaoService).handle() + ); + + } catch (Exception e) { + if (isRunning) { // 非关闭状态下的异常 + System.err.println("【连接接受异常】" + e.getMessage()); + } + } + } + } + + /** + * 初始化缓存清理线程 + */ + private void initCacheCleaner() { + cacheCleaner = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r); + thread.setName("cache-cleaner"); + thread.setDaemon(true); + return thread; + }); + + // 每20秒清理一次过期设备 + cacheCleaner.scheduleAtFixedRate(() -> { + long currentTime = System.currentTimeMillis(); + for (Map.Entry e : deviceCache.entrySet()) { + String deviceAddr = e.getKey(); + DeviceCache cache = e.getValue(); + // 判断是否过期:未收到过心跳 或 超过过期时间 + boolean isExpired = cache.getLastHeartbeatTime() == 0 + ? (currentTime - cache.getCreateTime() > heartbeatExpireMs) + : (currentTime - cache.getLastHeartbeatTime() > heartbeatExpireMs); + + if (isExpired && !cache.isExpired()) { + cache.setExpired(true); + deviceCache.remove(deviceAddr); + System.out.printf("【设备过期】设备地址:%s,已清理缓存%n", deviceAddr); + } + } + }, 0, 20, TimeUnit.SECONDS); + } + + /** + * 销毁方法:Spring容器关闭前自动调用 + */ + @PreDestroy + public void stop() { + isRunning = false; + System.out.println("【TCP服务开始关闭】"); + + // 关闭服务器Socket + try { + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } catch (Exception e) { + System.err.println("【关闭ServerSocket异常】" + e.getMessage()); + } + + // 关闭线程池 + if (clientExecutor != null) { + clientExecutor.shutdown(); + } + + // 关闭缓存清理线程 + if (cacheCleaner != null) { + cacheCleaner.shutdown(); + } + + // 清理设备缓存 + deviceCache.clear(); + + System.out.println("【TCP服务已关闭】"); + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java new file mode 100644 index 0000000..a74a028 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java @@ -0,0 +1,45 @@ +package org.dromara.tcpfuwu.starter; + + +import lombok.extern.slf4j.Slf4j; +import org.dromara.tcpfuwu.server.UnifiedTcpServer; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +/** + * Spring启动时自动初始化TCP服务器,关闭时释放资源 + */ +@Component +@Slf4j +public class TcpServersStarter implements CommandLineRunner { + + private final UnifiedTcpServer unifiedTcpServer; + + // 构造注入两个TCP服务器Bean + public TcpServersStarter( UnifiedTcpServer unifiedTcpServer) { + this.unifiedTcpServer = unifiedTcpServer; + } + + /** + * Spring Boot启动后执行(CommandLineRunner接口方法) + */ + @Override + public void run(String... args) throws Exception { + log.info("【TCP服务器启动器】开始初始化Modbus和逆变器心跳服务器..."); + // 启动两个服务器(独立线程,不阻塞Spring主线程) + unifiedTcpServer.start(); + log.info("【TCP服务器启动器】所有TCP服务器初始化完成"); + } + + /** + * Spring容器销毁前执行(释放资源) + */ + @PreDestroy + public void stopTcpServers() { + log.info("【TCP服务器启动器】开始关闭所有TCP服务器..."); + unifiedTcpServer.stop(); + log.info("【TCP服务器启动器】所有TCP服务器已关闭"); + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ByteUtils.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ByteUtils.java new file mode 100644 index 0000000..349544c --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ByteUtils.java @@ -0,0 +1,45 @@ +package org.dromara.tcpfuwu.util; + +import org.springframework.stereotype.Component; + +/** + * 字节数组处理工具类 + */ +@Component +public class ByteUtils { + + /** + * 字节数组转16进制字符串(空格分隔) + */ + public String bytesToHex(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } + + /** + * 字节数组转ASCII字符串(不可打印字符用[0xXX]表示) + */ + public String bytesToAsciiString(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + // 可打印ASCII范围:32(空格)~126(~) + if (b >= 32 && b <= 126) { + sb.append((char) b); + } else { + sb.append("[0x").append(String.format("%02X", b)).append("]"); + } + } + return sb.toString(); + } + + +}