GitHub user onlyMIT opened a pull request:
https://github.com/apache/activemq-artemis/pull/2466
NO-JIRA The MQTT consumer reconnection caused the queue to not be cle…
### Test environment
1. Use 10,000 MQTT connections (9,000 senders, 1,000 consumers) on
one server to test Artemis, with the 'cleanSession' property set to true;
2. MQTT client: paho 1.2.0;
3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T
### Issue
**Issue 1**
Artemis broker has the following exception log:
`[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException
at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:]
at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:]
at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:]
at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:]
at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:]
at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:]
at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:]
at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101]
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]`
**Issue 2**
After closing all client connections, 64 queues were not cleaned up.
### Analysis and simulation reproduction
When an MQTT consumer client (with the cleanSession property set to true)
reconnects, there is some probability that its queue will not be
automatically cleaned up and that a NullPointerException will be thrown.
This happens because the MQTT consumer client believes its connection has
been dropped and triggers a reconnection, while the old MQTT connection is
still alive on the Artemis broker. The bug occurs when the Artemis broker
starts processing the 'new MQTT connection' while it is still closing the
'old MQTT connection'.
Create an MQTT consumer (cleanSession: true, clientID: superConsumer,
topic: mit.test) and connect it to the Artemis broker. Create another MQTT
consumer with the same cleanSession, clientID, and topic, then connect it
to the Artemis broker as well. Close the two MQTT connections. After
repeating this many times, the two problems described above are reproduced
with some probability.
### Solution
**Issue 1**
When 'session.getProtocolManager().isClientConnected(clientId,
session.getConnection())' is called, a NullPointerException is thrown if the
'MQTTConnection' instance retrieved from 'connectedClients' is 'null'. Add a
null check to the 'MQTTProtocolManager.isClientConnected' method.
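The null check can be illustrated with a minimal, self-contained sketch. It is not the Artemis source: 'ClientRegistrySketch', its methods, and the use of a plain 'Object' as the connection are all hypothetical stand-ins, and the "unsafe" variant only shows the presumed shape of the failing lookup described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker's client registry; not Artemis code.
class ClientRegistrySketch {
    // clientId -> connection currently registered for that client
    private final Map<String, Object> connectedClients = new ConcurrentHashMap<>();

    void register(String clientId, Object connection) {
        connectedClients.put(clientId, connection);
    }

    void unregister(String clientId) {
        connectedClients.remove(clientId);
    }

    // Presumed shape of the failing lookup: throws NullPointerException
    // when no connection is registered for clientId.
    boolean isClientConnectedUnsafe(String clientId, Object connection) {
        return connectedClients.get(clientId).equals(connection);
    }

    // Null-safe variant: a missing entry simply means "not connected".
    boolean isClientConnected(String clientId, Object connection) {
        Object registered = connectedClients.get(clientId);
        return registered != null && registered.equals(connection);
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        Object connection = new Object();
        // Nothing registered yet: the null-safe check returns false instead of throwing.
        System.out.println(registry.isClientConnected("superConsumer", connection));
        registry.register("superConsumer", connection);
        System.out.println(registry.isClientConnected("superConsumer", connection));
    }
}
```

The missing entry is the interesting case here: once the other connection with the same clientId has already removed the registration, the disconnect path must treat the lookup result as "not connected" rather than dereference it.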
**Issue 2**
1. Remove 'InterruptedException' from the
'MQTTConnectionManager.getSessionState' method because the
'InterruptedException' exception will never be thrown in this method;
2. Make the 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect'
methods 'synchronized', using the MQTTSessionState instance as the lock. In the
Artemis broker, all MQTT connections using the same clientId share the same
MQTTSessionState instance. With this lock in place, the 'connect' and
'disconnect' methods can no longer run concurrently on MQTT connections
with the same clientId.
3. In the MQTT protocol, each queue has one and only one consumer connection,
so it is a good choice to close the old MQTT consumer before the new
MQTT consumer connects.
The original code could not effectively clean up the 'old consumer' in
the queue when the 'new MQTT connection' was connected to the Artemis broker.
Modify 'MQTTSubscriptionManager.removeSubscription' to get the queue
consumer collection from the 'QueueImpl' instance and close them.
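The locking and consumer-cleanup ideas in items 2 and 3 can be sketched together as a small model. This is only an illustration under stated assumptions: 'SessionLockSketch', 'SessionState', 'stateFor', and the string connection ids are hypothetical names, the consumer list stands in for the queue's consumer collection, and none of this is the actual change made in the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the locking scheme; none of these types are Artemis classes.
class SessionLockSketch {

    // Stand-in for MQTTSessionState: one shared instance per clientId.
    static final class SessionState {
        // Consumers attached to this client's queue; guarded by this object's monitor.
        final List<String> consumers = new ArrayList<>();
        // Whether the client's subscription queue currently exists.
        boolean queueExists;
    }

    private final Map<String, SessionState> states = new ConcurrentHashMap<>();

    // All connections with the same clientId get the same SessionState instance.
    SessionState stateFor(String clientId) {
        return states.computeIfAbsent(clientId, id -> new SessionState());
    }

    // New connection: close any old consumer first, then attach the new one.
    void connect(String clientId, String connectionId) {
        SessionState state = stateFor(clientId);
        synchronized (state) {
            state.consumers.clear();
            state.queueExists = true;
            state.consumers.add(connectionId);
        }
    }

    // Closing a connection: detach its consumer; with cleanSession=true the
    // queue is removed once no consumer is left.
    void disconnect(String clientId, String connectionId) {
        SessionState state = stateFor(clientId);
        synchronized (state) {
            state.consumers.remove(connectionId);
            if (state.consumers.isEmpty()) {
                state.queueExists = false;
            }
        }
    }

    public static void main(String[] args) {
        SessionLockSketch broker = new SessionLockSketch();
        broker.connect("superConsumer", "old");
        broker.connect("superConsumer", "new");    // replaces the old consumer
        broker.disconnect("superConsumer", "old"); // late close of the old connection
        broker.disconnect("superConsumer", "new");
        System.out.println(broker.stateFor("superConsumer").queueExists);
    }
}
```

Because both methods take the same per-clientId monitor, a late 'disconnect' of the old connection cannot interleave with the 'connect' of the new one, and the queue is only dropped once its last consumer is gone.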
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/onlyMIT/activemq-artemis onlyMIT-artemis-2.7.0
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/activemq-artemis/pull/2466.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2466
----
commit 8140f2e9d094ea32c272ed763c3827a73e12e420
Author: onlyMIT <liangshipin_g@...>
Date: 2018-12-17T07:53:33Z
NO-JIRA The MQTT consumer reconnection caused the queue to not be cleared,
and caused Artemis broker to throw a NullPointerException.
When the MQTT consumer client (cleanSession property set to true)
reconnects, there is some probability that these two bugs will occur.
This is because the MQTT consumer client thinks that its connection has
been disconnected and triggers reconnection, but the MQTT connection is still
alive at the Artemis broker. The bugs occur when the new and old connections
operate on the same queue at the same time in an unsafe way.
----