[ 
https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974
 ] 

gongping.zhu edited comment on ARTEMIS-3913 at 8/2/22 12:45 AM:
----------------------------------------------------------------

My ArtemisBrokerPlugin code:

 

```

import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import 
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {

private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";

/**
 * 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;

/**
 * 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;

/**
 * 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
 * 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;

private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;

private String resetUpdateSQL;

private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();

private boolean debug = false;

/**
 * Reads the plugin settings from {@code properties} and creates the JDBC
 * templates for the data sources that are in use.
 *
 * <p>Boolean flags that are absent from the map are treated as {@code false}.
 *
 * @param properties the key/value settings handed to the plugin
 */
@Override
public void init(Map<String, String> properties) {
    // Account database and account authentication.
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }

    // Device database, device authorisation and device status synchronisation.
    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    // The device data source backs both device authorisation and the status
    // updates, so it is required as soon as either feature is switched on.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }

    // Log the setting names only: the values include the database passwords.
    log.info("init :[{}] ", properties.keySet());
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}

@Override
public void afterCreateConnection(RemotingConnection connection) throws 
ActiveMQException {
if(debug){
log.info("afterCreateConnection 
{},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}

/**
*
 * @param name
 * @param username
 * @param minLargeMessageSize
 * @param connection
 * @param autoCommitSends
 * @param autoCommitAcks
 * @param preAcknowledge
 * @param xa
 * @param defaultAddress
 * @param callback
 * @param autoCreateQueues
 * @param context
 * @param prefixes
 * @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int 
minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, 
boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean 
autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // 
Debug.print(log); authConnectTables.put(connId,0); 
doConnectValidation(connection); }
}

/**
 * After a session has been created.
*
 * @param session The newly created session
 * @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{ 
doAccountValidation(session.getUsername(),session.getPassword()); }
}

/**
 * Called after a connection has been destroyed; runs the disconnect handling
 * and forgets the connection in {@code authConnectTables}.
 *
 * @param connection the connection that was destroyed
 * @throws ActiveMQException declared by the overridden method
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}", connection.getClientID(),
                connection.getRemoteAddress(), connection.getID());
    }

    try {
        doDisconnect(connection);
    } finally {
        // Remove the entry even when the disconnect handling fails, otherwise
        // the map keeps growing with ids of connections that no longer exist.
        authConnectTables.remove(connection.getID().toString());
    }
}

/**
 * Called after a message has been sent: hands the message to the last-will
 * handling and then clears the sender's entry in {@code authConnectTables}.
 *
 * @param session           the session that sent the message
 * @param tx                unused here
 * @param message           the message that was sent
 * @param direct            unused here
 * @param noAutoCreateQueue unused here
 * @param result            unused here
 * @throws ActiveMQException declared by the overridden method
 */
@Override
public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct,
        boolean noAutoCreateQueue, RoutingStatus result) throws ActiveMQException {
    final RemotingConnection sender = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}",
                sender.getClientID(), sender.getRemoteAddress(), sender.getID());
    }

    doSendLwt(sender, message);
    authConnectTables.remove(sender.getID().toString());
}

private void doConnectValidation(RemotingConnection connection) throws 
ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try

{ /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId); 
if(invalidClientId)

{//licId@mac /** * 无效的ClientId */ invalid = true; throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
 }

/**
 * 连接用户校验
 * 有效的ClientId格式
*/
if(!isService(clientId)) \{ String[] clientIdElements = 
clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String 
devId = clientIdElements[1]; /** * 开启设备授权 */ if(deviceAuthEnabled) \{ /** * 新设备 
*/ if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth 
= validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** * 
设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = 
deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = 
deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
 }
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}

/**
 * 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = 
IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[] 
{connId,clientIp, brokerIp,licId}
);
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId, locked,eft,error);
}
}
}
}

/**
 * Marks the device behind a destroyed connection as disconnected.
 *
 * <p>Nothing happens for client ids that {@code invalidClientId} flags. The
 * disconnect update SQL is only run for ids that contain the separator, and
 * only when status synchronisation is enabled and a device data source exists.
 *
 * @param connection the connection that was destroyed
 */
private void doDisconnect(RemotingConnection connection) {
    String clientId = connection.getClientID();
    String connId = connection.getID().toString();
    String clientIp = formatClientIP(connection.getRemoteAddress().toString());
    if (invalidClientId(clientId)) {
        return;
    }

    int eft = 0;
    try {
        // indexOf returns -1, not 1, when the separator is missing.
        boolean hasSeparator = clientId.indexOf(CLIENT_SPLITOR) != -1;
        if (hasSeparator && deviceStatusSyncabled && deviceJdbcTemplate != null) {
            // split drops trailing empty parts, so "@" alone yields no element.
            String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
            String licId = clientIdElements.length > 0 ? clientIdElements[0] : "";
            eft = deviceJdbcTemplate.update(disconnectUpdateSQL, licId, connId);
        }
    } finally {
        log.info("断开成功 {} {} {} {}", formatClientId(clientId), clientIp, connId, eft);
    }
}

private void doSendLwt(RemotingConnection connection,Message message)

{ int eft = 0; Boolean success = false; String clientId = 
connection.getClientID(); String connId = connection.getID().toString(); String 
clientIp = connection.getRemoteAddress().toString(); clientIp = 
formatClientIP(clientIp); String address = message.getAddress(); 
if(invalidClientId(clientId))\\{ return; }

if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId))

{ try

{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = 
clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = 
deviceJdbcTemplate.update(sql, new Object[]

{licId, connId}

);
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" 
,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) 
,clientIp,connId,"LWT",eft,message);
}
}
}
}
}

private JdbcTemplate build(String driver, String url, String username, String 
password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); 
builder.driverClassName(driver); builder.url(url); builder.username(username); 
builder.password(password); return new JdbcTemplate(builder.build()); }

private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1)

{ int length = clientId.length(); if(length>(index+44))

{ return clientId.substring(index,index+44); }

else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}

/**
 * Removes every {@code /} from a remote address string.
 *
 * @param clientIp the remote address text; may be {@code null} or empty
 * @return the text without slashes, or the input itself when it is
 *         {@code null} or empty
 */
private String formatClientIP(String clientIp) {
    if (ObjectUtil.isEmpty(clientIp)) {
        return clientIp;
    }
    return clientIp.replace("/", "");
}

private void doAccountValidation(String username,String password){ String sql= 
accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new 
Object[]

{username,password}

,new RowMapperResultSetExtractor<Account>(new 
BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}

/**
 * @param licId yekerId
 * @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException 
\{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; 
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[] 
{licId}
, new RowMapperResultSetExtractor<DeviceAuth>(new 
BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
 }
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
 }
return auth;
}
/**

 * [service node或者broker node]
 * @param clientId
 * @return
*/
private boolean isService(String clientId) \{ return 
ObjectUtil.isNotEmpty(clientId) && 
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);// 
broker节点 }

/**
 * clientId是否含有@分割符号
 * @param clientId
 * @return
*/
private static boolean invalidClientId(String clientId) \{ return 
ObjectUtil.isEmpty(clientId) || 
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && 
clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && 
clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**

 * 最后遗嘱LWT(Last Will & Testament)
 * @param originalTopic
 * @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null && 
originalTopic.startsWith(MISSING_TOPIC_PREFIX); }

/**
 * Invoked when the plugin is registered: writes a log line and then runs
 * {@code reset()}.
 *
 * @param server the server passed to the callback; not used here
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = this.getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Invoked when the plugin is unregistered: writes a log line and then runs
 * {@code reset()}.
 *
 * @param server the server passed to the callback; not used here
 */
@Override
public void unregistered(ActiveMQServer server) {
    // One placeholder for one argument; the second "{}" had nothing to print.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}

/**
 * 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); 
String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; 
deviceJdbcTemplate.update(sql, new Object[] {brokerIp}
);
}
}

```

 

!image-2022-08-02-08-23-52-965.png!

!image-2022-08-02-08-24-39-288.png!

 

public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter

{ 

...

...

}

!image-2022-08-02-08-45-11-459.png!

When I capture the traffic with Wireshark:

!image-2022-08-02-08-31-01-074.png!

 

 

 


was (Author: JIRAUSER293605):
My ArtemisBrokerPlugin code:

 

```

import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import 
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {

private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";

/**
 * 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;

/**
 * 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;

/**
 * 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
 * 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;

private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;

private String resetUpdateSQL;

private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();

private boolean debug = false;

@Override
public void init(Map<String, String> properties) {

this.accountDriver = properties.get("accountDriver");
this.accountUrl = properties.get("accountUrl");
this.accountUsername = properties.get("accountUsername");
this.accountPassword = properties.get("accountPassword");

this.accountAuthEnabled = Boolean.valueOf(properties.get("accountAuthEnabled"));
if(this.accountAuthEnabled)

{ accountJdbcTemplate = 
build(accountDriver,accountUrl,accountUsername,accountPassword); }

this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

this.deviceDriver = properties.get("deviceDriver");
this.deviceUrl = properties.get("deviceUrl");
this.deviceUsername = properties.get("deviceUsername");
this.devicePassword = properties.get("devicePassword");

this.deviceAuthEnabled = Boolean.valueOf(properties.get("deviceAuthEnabled"));
if(this.deviceAuthEnabled)

{ deviceJdbcTemplate = 
build(deviceDriver,deviceUrl,deviceUsername,devicePassword); }

this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");

this.deviceStatusSyncabled = 
Boolean.valueOf(properties.get("deviceStatusSyncabled"));
this.connectUpdateSQL = properties.get("connectUpdateSQL");
this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
this.lwtUpdateSQL = properties.get("lwtUpdateSQL");

this.resetUpdateSQL = properties.get("resetUpdateSQL");

log.info("init :[{}] ",properties);
log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
log.info("{} 插件初始化",this.getClass().getSimpleName());
}

@Override
public void afterCreateConnection(RemotingConnection connection) throws 
ActiveMQException {
if(debug){
log.info("afterCreateConnection 
{},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}

/**
*
 * @param name
 * @param username
 * @param minLargeMessageSize
 * @param connection
 * @param autoCommitSends
 * @param autoCommitAcks
 * @param preAcknowledge
 * @param xa
 * @param defaultAddress
 * @param callback
 * @param autoCreateQueues
 * @param context
 * @param prefixes
 * @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int 
minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, 
boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean 
autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // 
Debug.print(log); authConnectTables.put(connId,0); 
doConnectValidation(connection); }
}

/**
 * After a session has been created.
*
 * @param session The newly created session
 * @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{ 
doAccountValidation(session.getUsername(),session.getPassword()); }
}

/**
 * A connection has been destroyed.
*
 * @param connection
 * @throws ActiveMQException
*/
@Override
public void afterDestroyConnection(RemotingConnection connection) throws 
ActiveMQException {
if(debug) {
log.info("afterDestroyConnection {},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID());
}

doDisconnect(connection);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}

/**
 * After a message is sent
*
 * @param session the session that sends the message
 * @param tx
 * @param message
 * @param direct
 * @param noAutoCreateQueue
 * @param result
 * @throws ActiveMQException
*/
@Override
public void afterSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterSend {},{},{}", connection.getClientID(), 
connection.getRemoteAddress(), connection.getID());
}

doSendLwt(connection,message);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}

private void doConnectValidation(RemotingConnection connection) throws 
ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try

{ /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId); 
if(invalidClientId)

{//licId@mac /** * 无效的ClientId */ invalid = true; throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
 }

/**
 * 连接用户校验
 * 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];

/**
 * 开启设备授权
*/
if(deviceAuthEnabled) \{ /** * 新设备 */ 
if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth = 
validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** * 
设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = 
deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = 
deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
 }
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}

/**
 * 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = 
IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[] 
{connId,clientIp, brokerIp,licId}
);
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), 
clientIp,connId, locked,eft,error);
}
}
}
}

/**
 * Marks the device behind a destroyed connection as disconnected.
 *
 * <p>Nothing happens for client ids that {@code invalidClientId} flags. The
 * disconnect update SQL is only run for ids that contain the separator, and
 * only when status synchronisation is enabled and a device data source exists.
 *
 * @param connection the connection that was destroyed
 */
private void doDisconnect(RemotingConnection connection) {
    String clientId = connection.getClientID();
    String connId = connection.getID().toString();
    String clientIp = formatClientIP(connection.getRemoteAddress().toString());
    if (invalidClientId(clientId)) {
        return;
    }

    int eft = 0;
    try {
        // indexOf returns -1, not 1, when the separator is missing.
        boolean hasSeparator = clientId.indexOf(CLIENT_SPLITOR) != -1;
        if (hasSeparator && deviceStatusSyncabled && deviceJdbcTemplate != null) {
            // split drops trailing empty parts, so "@" alone yields no element.
            String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
            String licId = clientIdElements.length > 0 ? clientIdElements[0] : "";
            eft = deviceJdbcTemplate.update(disconnectUpdateSQL, licId, connId);
        }
    } finally {
        log.info("断开成功 {} {} {} {}", formatClientId(clientId), clientIp, connId, eft);
    }
}

private void doSendLwt(RemotingConnection connection,Message message)

{ int eft = 0; Boolean success = false; String clientId = 
connection.getClientID(); String connId = connection.getID().toString(); String 
clientIp = connection.getRemoteAddress().toString(); clientIp = 
formatClientIP(clientIp); String address = message.getAddress(); 
if(invalidClientId(clientId))\{ return; }

if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId))

{ try

{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = 
clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = 
deviceJdbcTemplate.update(sql, new Object[]

{licId, connId}

);
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" 
,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) 
,clientIp,connId,"LWT",eft,message);
}
}
}
}
}

private JdbcTemplate build(String driver, String url, String username, String 
password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); 
builder.driverClassName(driver); builder.url(url); builder.username(username); 
builder.password(password); return new JdbcTemplate(builder.build()); }

private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1)

{ int length = clientId.length(); if(length>(index+44))

{ return clientId.substring(index,index+44); }

else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}

private String formatClientIP(String clientIp)

{ int index = -1; if(ObjectUtil.isEmpty(clientIp))

{ return clientIp; }

clientIp = clientIp.replace("/","");
return clientIp;
}

private void doAccountValidation(String username,String password){ String sql= 
accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new 
Object[]

{username,password}

,new RowMapperResultSetExtractor<Account>(new 
BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}

/**
 * @param licId yekerId
 * @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException 
\{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; 
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[] 
{licId}
, new RowMapperResultSetExtractor<DeviceAuth>(new 
BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
 }
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new 
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
 }
return auth;
}
/**

 * [service node或者broker node]
 * @param clientId
 * @return
*/
private boolean isService(String clientId) \{ return 
ObjectUtil.isNotEmpty(clientId) && 
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);// 
broker节点 }

/**
 * clientId是否含有@分割符号
 * @param clientId
 * @return
*/
private static boolean invalidClientId(String clientId) \{ return 
ObjectUtil.isEmpty(clientId) || 
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && 
clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && 
clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**

 * 最后遗嘱LWT(Last Will & Testament)
 * @param originalTopic
 * @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null && 
originalTopic.startsWith(MISSING_TOPIC_PREFIX); }

/**
*
 * @param server
*/
@Override
public void registered(ActiveMQServer server){
log.info("{} 插件注册",this.getClass().getSimpleName());
reset();
}
/**
 * Invoked when the plugin is unregistered: writes a log line and then runs
 * {@code reset()}.
 *
 * @param server the server passed to the callback; not used here
 */
@Override
public void unregistered(ActiveMQServer server) {
    // One placeholder for one argument; the second "{}" had nothing to print.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}

/**
 * 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); 
String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; 
deviceJdbcTemplate.update(sql, new Object[] {brokerIp}
);
}
}

```

 

!image-2022-08-02-08-23-52-965.png!

!image-2022-08-02-08-24-39-288.png!

 

public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter

{ 

...

!image-2022-08-02-08-43-39-442.png!

...

}

When I capture the traffic with Wireshark:

!image-2022-08-02-08-31-01-074.png!

 

 

 

> MQTTReasonCodes byte loss of precision,must int type
> ----------------------------------------------------
>
>                 Key: ARTEMIS-3913
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3913
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: gongping.zhu
>            Priority: Major
>         Attachments: image-2022-08-02-08-23-52-965.png, 
> image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png, 
> image-2022-08-02-08-42-24-117.png, image-2022-08-02-08-43-39-442.png, 
> image-2022-08-02-08-45-11-459.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to