[ https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974 ]
gongping.zhu edited comment on ARTEMIS-3913 at 8/2/22 12:45 AM: ---------------------------------------------------------------- my ArtemisBorkderPlugin code ``` import cn.hutool.core.util.ObjectUtil; import com.yeker.iot.broker.plugin.impl.model.Account; import com.yeker.iot.broker.plugin.impl.model.DeviceAuth; import com.yeker.sdk.comm.util.IPUntil; import org.apache.activemq.artemis.api.core.*; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapperResultSetExtractor; import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable { private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class); /** * 遗言topic前缀 */ private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt"; /** * 服务端clientId前缀 */ private static final String SERVICE_CLIENT_ID_PREFIX = "service-"; /** * client id 分隔符 */ private static final String CLIENT_SPLITOR = "@"; /** * 账号数据库信息 */ private String accountDriver; private String accountUrl; private String accountUsername; private String accountPassword; private JdbcTemplate accountJdbcTemplate; /** * 授权认证 */ private boolean accountAuthEnabled = false; private String accountAuthQuerySQL; /** * 设备数据库信息 */ private String deviceDriver; private String deviceUrl; private String deviceUsername; private String devicePassword; private JdbcTemplate deviceJdbcTemplate; /** * 设备授权 */ private boolean deviceAuthEnabled = false; private String deviceAuthCheckSQL; private String deviceAuthLockerSQL; private boolean deviceStatusSyncabled = false; private String connectUpdateSQL; private String disconnectUpdateSQL; private String lwtUpdateSQL; private String resetUpdateSQL; private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>(); private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>(); private boolean debug = false; @Override public void init(Map<String, String> properties) { this.accountDriver = properties.get("accountDriver"); this.accountUrl = properties.get("accountUrl"); this.accountUsername = properties.get("accountUsername"); this.accountPassword = properties.get("accountPassword"); this.accountAuthEnabled = Boolean.valueOf(properties.get("accountAuthEnabled")); if(this.accountAuthEnabled) { accountJdbcTemplate = build(accountDriver,accountUrl,accountUsername,accountPassword); } this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL"); this.deviceDriver = properties.get("deviceDriver"); this.deviceUrl = properties.get("deviceUrl"); this.deviceUsername = properties.get("deviceUsername"); this.devicePassword = properties.get("devicePassword"); this.deviceAuthEnabled = Boolean.valueOf(properties.get("deviceAuthEnabled")); if(this.deviceAuthEnabled) { deviceJdbcTemplate = build(deviceDriver,deviceUrl,deviceUsername,devicePassword); } this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL"); this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL"); this.deviceStatusSyncabled = Boolean.valueOf(properties.get("deviceStatusSyncabled")); this.connectUpdateSQL = properties.get("connectUpdateSQL"); this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL"); this.lwtUpdateSQL = properties.get("lwtUpdateSQL"); this.resetUpdateSQL = properties.get("resetUpdateSQL"); log.info("init :[{}] ",properties); log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled); log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled); log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled); log.info("{} 插件初始化",this.getClass().getSimpleName()); } @Override public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { if(debug){ log.info("afterCreateConnection {},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID()); } } /** * * @param name * @param username * @param minLargeMessageSize * @param connection * @param autoCommitSends * @param autoCommitAcks * @param preAcknowledge * @param xa * @param defaultAddress * @param callback * @param autoCreateQueues * @param context * @param prefixes * @throws ActiveMQException */ @Override public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, Map<SimpleString, RoutingType> prefixes) throws ActiveMQException { if(debug) { log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username); } String connId = connection.getID().toString(); if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // Debug.print(log); authConnectTables.put(connId,0); doConnectValidation(connection); } } /** * After a session has been created. * * @param session The newly created session * @throws ActiveMQException */ @Override public void afterCreateSession(ServerSession session) throws ActiveMQException { RemotingConnection connection = session.getRemotingConnection(); if(debug) { log.info("afterCreateSession {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } if(accountAuthEnabled) \{ doAccountValidation(session.getUsername(),session.getPassword()); } } /** * A connection has been destroyed. * * @param connection * @throws ActiveMQException */ @Override public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { if(debug) { log.info("afterDestroyConnection {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } doDisconnect(connection); String connId = connection.getID().toString(); authConnectTables.remove(connId); } /** * After a message is sent * * @param session the session that sends the message * @param tx * @param message * @param direct * @param noAutoCreateQueue * @param result * @throws ActiveMQException */ @Override public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException { RemotingConnection connection = session.getRemotingConnection(); if(debug) { log.info("afterSend {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } doSendLwt(connection,message); String connId = connection.getID().toString(); authConnectTables.remove(connId); } private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{ Boolean success = false; String error = ""; boolean invalid = false; int locked = 0; int eft = 0; String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); try { /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId); if(invalidClientId) {//licId@mac /** * 无效的ClientId */ invalid = true; throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId)); } /** * 连接用户校验 * 有效的ClientId格式 */ if(!isService(clientId)) \{ String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String devId = clientIdElements[1]; /** * 开启设备授权 */ if(deviceAuthEnabled) \{ /** * 新设备 */ if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth = validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** * 设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId} ); if(locked==0) { throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); } auth.setDevId(devId); authDeviceTables.put(clientId,auth); } } } } /** * 更新设备状态【连接】 */ if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[] {connId,clientIp, brokerIp,licId} ); } } success = true; } catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; } finally{ String prefix = "连接异常"; if(success){ prefix = "连接成功"; log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft); } else{ if(invalid) { log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error); } else{ log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error); } } } } private void doDisconnect(RemotingConnection connection) { /** * 更新设备状态【断开】 */ String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); int eft = 0; if(invalidClientId(clientId)) { return; } try { if (clientId.indexOf(CLIENT_SPLITOR) != 1) { if (deviceStatusSyncabled) { String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String sql = disconnectUpdateSQL; eft = deviceJdbcTemplate.update(sql, new Object[] {licId,connId} ); } } } finally { log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft); } } private void doSendLwt(RemotingConnection connection,Message message) { int eft = 0; Boolean success = false; String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); String address = message.getAddress(); if(invalidClientId(clientId))\\{ return; } if (MISSING_TOPIC_PREFIX.equals(address)) { if (deviceStatusSyncabled && !isService(clientId)) { try { String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = deviceJdbcTemplate.update(sql, new Object[] {licId, connId} ); success = true; } finally{ if(success){ log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft); } else{ log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message); } } } } } private JdbcTemplate build(String driver, String url, String username, String password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); builder.driverClassName(driver); builder.url(url); builder.username(username); builder.password(password); return new JdbcTemplate(builder.build()); } private String formatClientId(String clientId){ if(ObjectUtil.isNotEmpty(clientId)){ int index = clientId.toLowerCase().indexOf("service-"); if(index!=-1) { int length = clientId.length(); if(length>(index+44)) { return clientId.substring(index,index+44); } else\{ return clientId.substring(index); } } return clientId; } return clientId; } private String formatClientIP(String clientIp) { int index = -1; if(ObjectUtil.isEmpty(clientIp)) { return clientIp; } clientIp = clientIp.replace("/",""); return clientIp; } private void doAccountValidation(String username,String password){ String sql= accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new Object[] {username,password} ,new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class))); if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); } } /** * @param licId yekerId * @param devId cpusn or mac_address */ private DeviceAuth validate(String licId,String devId) throws ActiveMQException \{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[] {licId} , new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class))); if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId)); } DeviceAuth auth = deviceAuths.get(0); if (ObjectUtil.isNotEmpty(auth.getDevId()) && !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); } return auth; } /** * [service node或者broker node] * @param clientId * @return */ private boolean isService(String clientId) \{ return ObjectUtil.isNotEmpty(clientId) && (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);// broker节点 } /** * clientId是否含有@分割符号 * @param clientId * @return */ private static boolean invalidClientId(String clientId) \{ return ObjectUtil.isEmpty(clientId) || (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client } /** * 最后遗嘱LWT(Last Will & Testament) * @param originalTopic * @return */ private boolean isLwt(String originalTopic) \{ return originalTopic != null && originalTopic.startsWith(MISSING_TOPIC_PREFIX); } /** * * @param server */ @Override public void registered(ActiveMQServer server){ log.info("{} 插件注册",this.getClass().getSimpleName()); reset(); } /** * 插件卸载 * @param server */ @Override public void unregistered(ActiveMQServer server){ log.info("{} 插件注销 {}",this.getClass().getSimpleName()); reset(); } /** * 同步因为服务器维护导致相关设备状态不一致 */ private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; deviceJdbcTemplate.update(sql, new Object[] {brokerIp} ); } } ``` !image-2022-08-02-08-23-52-965.png! !image-2022-08-02-08-24-39-288.png! public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { ... ... } !image-2022-08-02-08-45-11-459.png! when i use wireshark capture log !image-2022-08-02-08-31-01-074.png! was (Author: JIRAUSER293605): my ArtemisBorkderPlugin code ``` import cn.hutool.core.util.ObjectUtil; import com.yeker.iot.broker.plugin.impl.model.Account; import com.yeker.iot.broker.plugin.impl.model.DeviceAuth; import com.yeker.sdk.comm.util.IPUntil; import org.apache.activemq.artemis.api.core.*; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapperResultSetExtractor; import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable { private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class); /** * 遗言topic前缀 */ private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt"; /** * 服务端clientId前缀 */ private static final String SERVICE_CLIENT_ID_PREFIX = "service-"; /** * client id 分隔符 */ private static final String CLIENT_SPLITOR = "@"; /** * 账号数据库信息 */ private String accountDriver; private String accountUrl; private String accountUsername; private String accountPassword; private JdbcTemplate accountJdbcTemplate; /** * 授权认证 */ private boolean accountAuthEnabled = false; private String accountAuthQuerySQL; /** * 设备数据库信息 */ private String deviceDriver; private String deviceUrl; private String deviceUsername; private String devicePassword; private JdbcTemplate deviceJdbcTemplate; /** * 设备授权 */ private boolean deviceAuthEnabled = false; private String deviceAuthCheckSQL; private String deviceAuthLockerSQL; private boolean deviceStatusSyncabled = false; private String connectUpdateSQL; private String disconnectUpdateSQL; private String lwtUpdateSQL; private String resetUpdateSQL; private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>(); private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>(); private boolean debug = false; @Override public void init(Map<String, String> properties) { this.accountDriver = properties.get("accountDriver"); this.accountUrl = properties.get("accountUrl"); this.accountUsername = properties.get("accountUsername"); this.accountPassword = properties.get("accountPassword"); this.accountAuthEnabled = Boolean.valueOf(properties.get("accountAuthEnabled")); if(this.accountAuthEnabled) { accountJdbcTemplate = build(accountDriver,accountUrl,accountUsername,accountPassword); } this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL"); this.deviceDriver = properties.get("deviceDriver"); this.deviceUrl = properties.get("deviceUrl"); this.deviceUsername = properties.get("deviceUsername"); this.devicePassword = properties.get("devicePassword"); this.deviceAuthEnabled = Boolean.valueOf(properties.get("deviceAuthEnabled")); if(this.deviceAuthEnabled) { deviceJdbcTemplate = build(deviceDriver,deviceUrl,deviceUsername,devicePassword); } this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL"); this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL"); this.deviceStatusSyncabled = Boolean.valueOf(properties.get("deviceStatusSyncabled")); this.connectUpdateSQL = properties.get("connectUpdateSQL"); this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL"); this.lwtUpdateSQL = properties.get("lwtUpdateSQL"); this.resetUpdateSQL = properties.get("resetUpdateSQL"); log.info("init :[{}] ",properties); log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled); log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled); log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled); log.info("{} 插件初始化",this.getClass().getSimpleName()); } @Override public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { if(debug){ log.info("afterCreateConnection {},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID()); } } /** * * @param name * @param username * @param minLargeMessageSize * @param connection * @param autoCommitSends * @param autoCommitAcks * @param preAcknowledge * @param xa * @param defaultAddress * @param callback * @param autoCreateQueues * @param context * @param prefixes * @throws ActiveMQException */ @Override public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, Map<SimpleString, RoutingType> prefixes) throws ActiveMQException { if(debug) { log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username); } String connId = connection.getID().toString(); if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // Debug.print(log); authConnectTables.put(connId,0); doConnectValidation(connection); } } /** * After a session has been created. * * @param session The newly created session * @throws ActiveMQException */ @Override public void afterCreateSession(ServerSession session) throws ActiveMQException { RemotingConnection connection = session.getRemotingConnection(); if(debug) { log.info("afterCreateSession {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } if(accountAuthEnabled) \{ doAccountValidation(session.getUsername(),session.getPassword()); } } /** * A connection has been destroyed. * * @param connection * @throws ActiveMQException */ @Override public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { if(debug) { log.info("afterDestroyConnection {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } doDisconnect(connection); String connId = connection.getID().toString(); authConnectTables.remove(connId); } /** * After a message is sent * * @param session the session that sends the message * @param tx * @param message * @param direct * @param noAutoCreateQueue * @param result * @throws ActiveMQException */ @Override public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException { RemotingConnection connection = session.getRemotingConnection(); if(debug) { log.info("afterSend {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID()); } doSendLwt(connection,message); String connId = connection.getID().toString(); authConnectTables.remove(connId); } private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{ Boolean success = false; String error = ""; boolean invalid = false; int locked = 0; int eft = 0; String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); try { /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId); if(invalidClientId) {//licId@mac /** * 无效的ClientId */ invalid = true; throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId)); } /** * 连接用户校验 * 有效的ClientId格式 */ if(!isService(clientId)){ String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String devId = clientIdElements[1]; /** * 开启设备授权 */ if(deviceAuthEnabled) \{ /** * 新设备 */ if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth = validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** * 设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId} ); if(locked==0) { throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); } auth.setDevId(devId); authDeviceTables.put(clientId,auth); } } } } /** * 更新设备状态【连接】 */ if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[] {connId,clientIp, brokerIp,licId} ); } } success = true; } catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; } finally{ String prefix = "连接异常"; if(success){ prefix = "连接成功"; log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft); } else{ if(invalid) { log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error); } else{ log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error); } } } } private void doDisconnect(RemotingConnection connection) { /** * 更新设备状态【断开】 */ String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); int eft = 0; if(invalidClientId(clientId)) { return; } try { if (clientId.indexOf(CLIENT_SPLITOR) != 1) { if (deviceStatusSyncabled) { String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String sql = disconnectUpdateSQL; eft = deviceJdbcTemplate.update(sql, new Object[] {licId,connId} ); } } } finally { log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft); } } private void doSendLwt(RemotingConnection connection,Message message) { int eft = 0; Boolean success = false; String clientId = connection.getClientID(); String connId = connection.getID().toString(); String clientIp = connection.getRemoteAddress().toString(); clientIp = formatClientIP(clientIp); String address = message.getAddress(); if(invalidClientId(clientId))\{ return; } if (MISSING_TOPIC_PREFIX.equals(address)) { if (deviceStatusSyncabled && !isService(clientId)) { try { String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = deviceJdbcTemplate.update(sql, new Object[] {licId, connId} ); success = true; } finally{ if(success){ log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft); } else{ log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message); } } } } } private JdbcTemplate build(String driver, String url, String username, String password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); builder.driverClassName(driver); builder.url(url); builder.username(username); builder.password(password); return new JdbcTemplate(builder.build()); } private String formatClientId(String clientId){ if(ObjectUtil.isNotEmpty(clientId)){ int index = clientId.toLowerCase().indexOf("service-"); if(index!=-1) { int length = clientId.length(); if(length>(index+44)) { return clientId.substring(index,index+44); } else\{ return clientId.substring(index); } } return clientId; } return clientId; } private String formatClientIP(String clientIp) { int index = -1; if(ObjectUtil.isEmpty(clientIp)) { return clientIp; } clientIp = clientIp.replace("/",""); return clientIp; } private void doAccountValidation(String username,String password){ String sql= accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new Object[] {username,password} ,new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class))); if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); } } /** * @param licId yekerId * @param devId cpusn or mac_address */ private DeviceAuth validate(String licId,String devId) throws ActiveMQException \{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[] {licId} , new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class))); if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId)); } DeviceAuth auth = deviceAuths.get(0); if (ObjectUtil.isNotEmpty(auth.getDevId()) && !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); } return auth; } /** * [service node或者broker node] * @param clientId * @return */ private boolean isService(String clientId) \{ return ObjectUtil.isNotEmpty(clientId) && (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);// broker节点 } /** * clientId是否含有@分割符号 * @param clientId * @return */ private static boolean invalidClientId(String clientId) \{ return ObjectUtil.isEmpty(clientId) || (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client } /** * 最后遗嘱LWT(Last Will & Testament) * @param originalTopic * @return */ private boolean isLwt(String originalTopic) \{ return originalTopic != null && originalTopic.startsWith(MISSING_TOPIC_PREFIX); } /** * * @param server */ @Override public void registered(ActiveMQServer server){ log.info("{} 插件注册",this.getClass().getSimpleName()); reset(); } /** * 插件卸载 * @param server */ @Override public void unregistered(ActiveMQServer server){ log.info("{} 插件注销 {}",this.getClass().getSimpleName()); reset(); } /** * 同步因为服务器维护导致相关设备状态不一致 */ private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; deviceJdbcTemplate.update(sql, new Object[] {brokerIp} ); } } ``` !image-2022-08-02-08-23-52-965.png! !image-2022-08-02-08-24-39-288.png! public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { ... !image-2022-08-02-08-43-39-442.png! ... } when i use wireshark capture log !image-2022-08-02-08-31-01-074.png! > MQTTReasonCodes byte loss of precision,must int type > ---------------------------------------------------- > > Key: ARTEMIS-3913 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3913 > Project: ActiveMQ Artemis > Issue Type: Bug > Reporter: gongping.zhu > Priority: Major > Attachments: image-2022-08-02-08-23-52-965.png, > image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png, > image-2022-08-02-08-42-24-117.png, image-2022-08-02-08-43-39-442.png, > image-2022-08-02-08-45-11-459.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)