This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 971f673  ARTEMIS-2206 The MQTT consumer reconnection caused the queue 
to not be cleared, and caused Artemis broker to throw a NullPointerException.
     new c7fa858  This closes #2466
971f673 is described below

commit 971f673c602f859f342e22afe988c71687f754b6
Author: onlyMIT <liangshipi...@qq.com>
AuthorDate: Wed Dec 19 16:26:12 2018 +0800

    ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be 
cleared, and caused Artemis broker to throw a NullPointerException.
    
    When the MQTT consumer client (cleanSession property set to true) 
reconnected, there are certain probabilities that these two bugs will occur.
    This is because the MQTT consumer client thinks that its connection has 
been disconnected and triggers reconnection, but the MQTT connection is still 
alive at Artemis broker. This bug occurs when new and old connections occur 
while operating the same queue for unsafe behavior.
---
 .../core/protocol/mqtt/MQTTConnectionManager.java  | 110 +++++++++++----------
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  48 ++++++---
 .../mqtt/imported/MQTTQueueCleanTest.java          |  69 +++++++++++++
 3 files changed, 158 insertions(+), 69 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 8efea0a..c24a684 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -58,15 +58,15 @@ public class MQTTConnectionManager {
    /**
     * Handles the connect packet.  See spec for details on each of parameters.
     */
-   synchronized void connect(String cId,
-                             String username,
-                             byte[] passwordInBytes,
-                             boolean will,
-                             byte[] willMessage,
-                             String willTopic,
-                             boolean willRetain,
-                             int willQosLevel,
-                             boolean cleanSession) throws Exception {
+   void connect(String cId,
+                String username,
+                byte[] passwordInBytes,
+                boolean will,
+                byte[] willMessage,
+                String willTopic,
+                boolean willRetain,
+                int willQosLevel,
+                boolean cleanSession) throws Exception {
       String clientId = validateClientId(cId, cleanSession);
       if (clientId == null) {
          
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
@@ -74,34 +74,36 @@ public class MQTTConnectionManager {
          return;
       }
 
-      String password = passwordInBytes == null ? null : new 
String(passwordInBytes, CharsetUtil.UTF_8);
-      session.getConnection().setClientID(clientId);
-      ServerSessionImpl serverSession = createServerSession(username, 
password);
-      serverSession.start();
-      session.setServerSession(serverSession);
+      MQTTSessionState sessionState = getSessionState(clientId);
+      synchronized (sessionState) {
+         session.setSessionState(sessionState);
+         String password = passwordInBytes == null ? null : new 
String(passwordInBytes, CharsetUtil.UTF_8);
+         session.getConnection().setClientID(clientId);
+         ServerSessionImpl serverSession = createServerSession(username, 
password);
+         serverSession.start();
+         session.setServerSession(serverSession);
 
-      session.setSessionState(getSessionState(clientId));
+         if (cleanSession) {
+            /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and 
Server MUST discard any previous Session and
+             * start a new one. This Session lasts as long as the Network 
Connection. State data associated with this Session
+             * MUST NOT be reused in any subsequent Session */
+            session.clean();
+            session.setClean(true);
+         }
 
-      if (cleanSession) {
-         /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server 
MUST discard any previous Session and
-          * start a new one. This Session lasts as long as the Network 
Connection. State data associated with this Session
-          * MUST NOT be reused in any subsequent Session */
-         session.clean();
-         session.setClean(true);
-      }
+         if (will) {
+            isWill = true;
+            this.willMessage = 
ByteBufAllocator.DEFAULT.buffer(willMessage.length);
+            this.willMessage.writeBytes(willMessage);
+            this.willQoSLevel = willQosLevel;
+            this.willRetain = willRetain;
+            this.willTopic = willTopic;
+         }
 
-      if (will) {
-         isWill = true;
-         this.willMessage = 
ByteBufAllocator.DEFAULT.buffer(willMessage.length);
-         this.willMessage.writeBytes(willMessage);
-         this.willQoSLevel = willQosLevel;
-         this.willRetain = willRetain;
-         this.willTopic = willTopic;
+         session.getConnection().setConnected(true);
+         session.start();
+         
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
       }
-
-      session.getConnection().setConnected(true);
-      session.start();
-      
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
    }
 
    /**
@@ -133,35 +135,37 @@ public class MQTTConnectionManager {
       return (ServerSessionImpl) serverSession;
    }
 
-   synchronized void disconnect(boolean failure) {
+   void disconnect(boolean failure) {
       if (session == null || session.getStopped()) {
          return;
       }
 
-      try {
-         if (isWill && failure) {
-            session.getMqttPublishManager().sendInternal(0, willTopic, 
willQoSLevel, willMessage, willRetain, true);
-         }
-         session.stop();
-         session.getConnection().destroy();
-      } catch (Exception e) {
-         log.error("Error disconnecting client: " + e.getMessage());
-      } finally {
-         if (session.getSessionState() != null) {
-            session.getSessionState().setAttached(false);
-            String clientId = session.getSessionState().getClientId();
-            /**
-             *  ensure that the connection for the client ID matches *this* 
connection otherwise we could remove the
-             *  entry for the client who "stole" this client ID via 
[MQTT-3.1.4-2]
-             */
-            if (clientId != null && 
session.getProtocolManager().isClientConnected(clientId, 
session.getConnection())) {
-               session.getProtocolManager().removeConnectedClient(clientId);
+      synchronized (session.getSessionState()) {
+         try {
+            if (isWill && failure) {
+               session.getMqttPublishManager().sendInternal(0, willTopic, 
willQoSLevel, willMessage, willRetain, true);
+            }
+            session.stop();
+            session.getConnection().destroy();
+         } catch (Exception e) {
+            log.error("Error disconnecting client: " + e.getMessage());
+         } finally {
+            if (session.getSessionState() != null) {
+               session.getSessionState().setAttached(false);
+               String clientId = session.getSessionState().getClientId();
+               /**
+                *  ensure that the connection for the client ID matches *this* 
connection otherwise we could remove the
+                *  entry for the client who "stole" this client ID via 
[MQTT-3.1.4-2]
+                */
+               if (clientId != null && 
session.getProtocolManager().isClientConnected(clientId, 
session.getConnection())) {
+                  session.getProtocolManager().removeConnectedClient(clientId);
+               }
             }
          }
       }
    }
 
-   private MQTTSessionState getSessionState(String clientId) {
+   private synchronized MQTTSessionState getSessionState(String clientId) {
       return session.getProtocolManager().getSessionState(clientId);
    }
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 4093f5e..501c26b 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -182,24 +183,37 @@ public class MQTTSubscriptionManager {
    }
 
    void removeSubscriptions(List<String> topics) throws Exception {
-      for (String topic : topics) {
-         removeSubscription(topic);
+      synchronized (session.getSessionState()) {
+         for (String topic : topics) {
+            removeSubscription(topic);
+         }
       }
    }
 
-   // FIXME: Do we need this synchronzied?
-   private synchronized void removeSubscription(String address) throws 
Exception {
+   private void removeSubscription(String address) throws Exception {
       String internalAddress = 
MQTTUtil.convertMQTTAddressFilterToCore(address, 
session.getWildcardConfiguration());
-
       SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
       session.getSessionState().removeSubscription(address);
 
-
-      ServerConsumer consumer = consumers.get(address);
-      consumers.remove(address);
-      if (consumer != null) {
-         consumer.close(false);
-         consumerQoSLevels.remove(consumer.getID());
+      SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
+      AddressInfo addressInfo = 
session.getServerSession().getAddress(sAddress);
+      if (addressInfo != null && 
addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
+         ServerConsumer consumer = consumers.get(address);
+         consumers.remove(address);
+         if (consumer != null) {
+            consumer.close(false);
+            consumerQoSLevels.remove(consumer.getID());
+         }
+      } else {
+         consumers.remove(address);
+         Queue queue = session.getServer().locateQueue(internalQueueName);
+         Set<Consumer> queueConsumers;
+         if (queue != null && (queueConsumers = (Set<Consumer>) 
queue.getConsumers()) != null) {
+            for (Consumer consumer : queueConsumers) {
+               ((ServerConsumer) consumer).close(false);
+               consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
+            }
+         }
       }
 
       if 
(session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
@@ -219,13 +233,15 @@ public class MQTTSubscriptionManager {
     * @throws Exception
     */
    int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws 
Exception {
-      int[] qos = new int[subscriptions.size()];
+      synchronized (session.getSessionState()) {
+         int[] qos = new int[subscriptions.size()];
 
-      for (int i = 0; i < subscriptions.size(); i++) {
-         addSubscription(subscriptions.get(i));
-         qos[i] = subscriptions.get(i).qualityOfService().value();
+         for (int i = 0; i < subscriptions.size(); i++) {
+            addSubscription(subscriptions.get(i));
+            qos[i] = subscriptions.get(i).qualityOfService().value();
+         }
+         return qos;
       }
-      return qos;
    }
 
    Map<Long, Integer> getConsumerQoSLevels() {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java
new file mode 100644
index 0000000..cb97e16
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class MQTTQueueCleanTest extends MQTTTestSupport {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MQTTQueueCleanTest.class);
+
+   @Test
+   public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws 
Exception {
+      Random random = new Random();
+      Set<MQTTClientProvider> clientProviders = new HashSet<>(11);
+      int repeatCount = 0;
+      String address = "clean/test";
+      String clientId = "sameClientId";
+      String queueName = "::sameClientId.clean.test";
+      //The abnormal scene does not necessarily occur, repeating 100 times to 
ensure the recurrence of the abnormality
+      while (repeatCount < 100) {
+         repeatCount++;
+         int subConnectionCount = random.nextInt(50) + 1;
+         int sC = 0;
+         try {
+            //Reconnect at least twice to reproduce the problem
+            while (sC < subConnectionCount) {
+               sC++;
+               MQTTClientProvider clientProvider = getMQTTClientProvider();
+               clientProvider.setClientId(clientId);
+               initializeConnection(clientProvider);
+               clientProviders.add(clientProvider);
+               clientProvider.subscribe(address, AT_LEAST_ONCE);
+            }
+         } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+         } finally {
+            for (MQTTClientProvider clientProvider : clientProviders) {
+               clientProvider.disconnect();
+            }
+            clientProviders.clear();
+            assertTrue(Wait.waitFor(() -> 
server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10));
+         }
+      }
+   }
+
+}

Reply via email to