This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ee7e72  ARTEMIS-3445 automatically clean-up abandoned MQTT 
subscriptions
     new a4be853  This closes #3714
6ee7e72 is described below

commit 6ee7e72db1e025c967ef3c21a5faf41a493cad5c
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Aug 19 10:32:27 2021 -0500

    ARTEMIS-3445 automatically clean-up abandoned MQTT subscriptions
---
 .../core/protocol/mqtt/MQTTProtocolManager.java    | 40 +++++++++++++---
 .../protocol/mqtt/MQTTProtocolManagerFactory.java  | 39 ++++++++++++++--
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  2 +
 .../core/protocol/mqtt/MQTTSessionState.java       | 11 +++++
 .../artemis/core/protocol/ProtocolHandler.java     |  4 ++
 .../core/remoting/impl/netty/NettyAcceptor.java    |  2 +-
 .../core/remoting/server/RemotingService.java      |  2 +
 .../remoting/server/impl/RemotingServiceImpl.java  |  3 +-
 .../artemis/spi/core/remoting/Acceptor.java        |  5 ++
 docs/user-manual/en/mqtt.md                        | 16 +++++++
 .../imported/MQTTSessionExpiryIntervalTest.java    | 54 ++++++++++++++++++++++
 .../tests/integration/mqtt/imported/MQTTTest.java  | 17 +++++++
 12 files changed, 180 insertions(+), 15 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 6513247..4483728 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -58,25 +59,31 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
    private final List<MQTTInterceptor> incomingInterceptors = new 
ArrayList<>();
    private final List<MQTTInterceptor> outgoingInterceptors = new 
ArrayList<>();
 
-   //TODO Read in a list of existing client IDs from stored Sessions.
-   private final Map<String, MQTTConnection> connectedClients;
-   private final Map<String, MQTTSessionState> sessionStates;
+   private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
+   private final Map<String, MQTTSessionState> sessionStates = new 
ConcurrentHashMap<>();
+
+   private int defaultMqttSessionExpiryInterval = -1;
 
    private final MQTTRedirectHandler redirectHandler;
 
    MQTTProtocolManager(ActiveMQServer server,
-                       Map<String, MQTTConnection> connectedClients,
-                       Map<String, MQTTSessionState> sessionStates,
                        List<BaseInterceptor> incomingInterceptors,
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
-      this.connectedClients = connectedClients;
-      this.sessionStates = sessionStates;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
       server.getManagementService().addNotificationListener(this);
       redirectHandler = new MQTTRedirectHandler(server);
    }
 
+   public int getDefaultMqttSessionExpiryInterval() {
+      return defaultMqttSessionExpiryInterval;
+   }
+
+   public MQTTProtocolManager setDefaultMqttSessionExpiryInterval(int 
sessionExpiryInterval) {
+      this.defaultMqttSessionExpiryInterval = sessionExpiryInterval;
+      return this;
+   }
+
    @Override
    public void onNotification(Notification notification) {
       if (!(notification.getType() instanceof CoreNotificationType))
@@ -125,6 +132,25 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
       
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
    }
 
+   public void scanSessions() {
+      if (defaultMqttSessionExpiryInterval == -1) {
+         log.debug("sessionExpiryInterval is -1 so skipping check");
+      } else {
+         for (Map.Entry<String, MQTTSessionState> entry : 
sessionStates.entrySet()) {
+            MQTTSessionState state = entry.getValue();
+            if (log.isDebugEnabled()) {
+               log.debug("Inspecting session state: " + state);
+            }
+            if (!state.getAttached() && state.getDisconnectedTime() + 
(defaultMqttSessionExpiryInterval * 1000) < System.currentTimeMillis()) {
+               if (log.isDebugEnabled()) {
+                  log.debug("Removing expired session state: " + state);
+               }
+               sessionStates.remove(entry.getKey());
+            }
+         }
+      }
+   }
+
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection connection) {
       try {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index d165304..bea138e 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -19,9 +19,13 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import 
org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -38,16 +42,14 @@ public class MQTTProtocolManagerFactory extends 
AbstractProtocolManagerFactory<M
 
    private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
 
-   private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
-   private final Map<String, MQTTSessionState> sessionStates = new 
ConcurrentHashMap<>();
-
    @Override
    public ProtocolManager createProtocolManager(ActiveMQServer server,
                                                 final Map<String, Object> 
parameters,
                                                 List<BaseInterceptor> 
incomingInterceptors,
                                                 List<BaseInterceptor> 
outgoingInterceptors) throws Exception {
+
       BeanSupport.stripPasswords(parameters);
-      return BeanSupport.setData(new MQTTProtocolManager(server, 
connectedClients, sessionStates, incomingInterceptors, outgoingInterceptors), 
parameters);
+      return BeanSupport.setData(new MQTTProtocolManager(server, 
incomingInterceptors, outgoingInterceptors), parameters);
    }
 
    @Override
@@ -64,4 +66,31 @@ public class MQTTProtocolManagerFactory extends 
AbstractProtocolManagerFactory<M
    public String getModuleName() {
       return MODULE_NAME;
    }
+
+
+   @Override
+   public void loadProtocolServices(ActiveMQServer server, 
List<ActiveMQComponent> services) {
+      services.add(new MQTTPeriodicTasks(server, server.getScheduledPool()));
+   }
+
+   public class MQTTPeriodicTasks extends ActiveMQScheduledComponent {
+      final ActiveMQServer server;
+      public MQTTPeriodicTasks(ActiveMQServer server, ScheduledExecutorService 
scheduledExecutorService) {
+         super(scheduledExecutorService, null, 5, TimeUnit.SECONDS, false);
+         this.server = server;
+      }
+      @Override
+      public void run() {
+         server.getRemotingService().getAcceptors().forEach((key, acceptor) -> 
{
+            ProtocolHandler protocolHandler = acceptor.getProtocolHandler();
+            if (protocolHandler != null) {
+               protocolHandler.getProtocolMap().values().forEach(m -> {
+                  if (m instanceof MQTTProtocolManager) {
+                     ((MQTTProtocolManager)m).scanSessions();
+                  }
+               });
+            }
+         });
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index b06d1f5..1241ab5 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -108,6 +108,7 @@ public class MQTTSession {
 
          if (state != null) {
             state.setAttached(false);
+            state.setDisconnectedTime(System.currentTimeMillis());
          }
 
          if (isClean()) {
@@ -178,6 +179,7 @@ public class MQTTSession {
    void setSessionState(MQTTSessionState state) {
       this.state = state;
       state.setAttached(true);
+      this.state.setDisconnectedTime(0);
    }
 
    MQTTRetainMessageManager getRetainMessageManager() {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 204f8d5..f590c4e 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -47,6 +47,8 @@ public class MQTTSessionState {
 
    private boolean attached = false;
 
+   private long disconnectedTime = 0;
+
    private final OutboundStore outboundStore = new OutboundStore();
 
    public MQTTSessionState(String clientId) {
@@ -59,6 +61,7 @@ public class MQTTSessionState {
       addressMessageMap.clear();
       pubRec.clear();
       outboundStore.clear();
+      disconnectedTime = 0;
    }
 
    OutboundStore getOutboundStore() {
@@ -120,6 +123,14 @@ public class MQTTSessionState {
       this.clientId = clientId;
    }
 
+   long getDisconnectedTime() {
+      return disconnectedTime;
+   }
+
+   void setDisconnectedTime(long disconnectedTime) {
+      this.disconnectedTime = disconnectedTime;
+   }
+
    void removeMessageRef(Integer mqttId) {
       MQTTMessageInfo info = messageRefStore.remove(mqttId);
       if (info != null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 4286eac..69ab95f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -82,6 +82,10 @@ public class ProtocolHandler {
       }
    }
 
+   public Map<String, ProtocolManager> getProtocolMap() {
+      return protocolMap;
+   }
+
    public ChannelHandler getProtocolDecoder() {
       return new ProtocolDecoder(true, false);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 2005c2d..eca729a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -553,7 +553,7 @@ public class NettyAcceptor extends AbstractAcceptor {
       return connections;
    }
 
-   // Only for testing purposes
+   @Override
    public ProtocolHandler getProtocolHandler() {
       return protocolHandler;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 92f44f0..5014d11 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -118,6 +118,8 @@ public interface RemotingService {
     */
    Acceptor getAcceptor(String name);
 
+   Map<String, Acceptor> getAcceptors();
+
    Acceptor createAcceptor(String name, String uri) throws Exception;
 
    Acceptor createAcceptor(TransportConfiguration transportConfiguration);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index e53fd01..ad2439b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -292,8 +292,7 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
       return acceptor;
    }
 
-
-   /** No interface method, for tests only */
+   @Override
    public Map<String, Acceptor> getAcceptors() {
       return acceptors;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
index 2468c81..f314a04 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -78,4 +79,8 @@ public interface Acceptor extends ActiveMQComponent {
     * stores on acceptors which support SSL.
     */
    void reload();
+
+   default ProtocolHandler getProtocolHandler() {
+      return null;
+   }
 }
diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md
index d9234b9..fcb6993 100644
--- a/docs/user-manual/en/mqtt.md
+++ b/docs/user-manual/en/mqtt.md
@@ -150,3 +150,19 @@ MQTT over Web Sockets is supported via a normal MQTT 
acceptor:
 With this configuration, Apache ActiveMQ Artemis will accept MQTT connections
 over Web Sockets on the port `1883`. Web browsers can then connect to
 `ws://<server>:1883` using a Web Socket to send and receive MQTT messages.
+
+## Automatic Subscription Clean-up
+
+Sometimes MQTT clients don't clean up their subscriptions. In such situations
+the `auto-delete-queues-delay` and `auto-delete-queues-message-count`
+address-settings can be used to clean up the abandoned subscription queues.
+However, the MQTT session meta-data is still present in memory and needs to be
+cleaned up as well. The URL parameter `defaultMqttSessionExpiryInterval` can be
+configured on the MQTT `acceptor` to deal with this situation.
+
+The default `defaultMqttSessionExpiryInterval` is `-1` which means no session
+state will be expired. Otherwise it represents the number of _milliseconds_
+which must elapse after the client has disconnected before the broker will
+remove the session state.
+
+MQTT session state is scanned every 5 seconds.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSessionExpiryIntervalTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSessionExpiryIntervalTest.java
new file mode 100644
index 0000000..319d025
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSessionExpiryIntervalTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTSessionExpiryIntervalTest extends MQTTTestSupport {
+
+   private static final Logger log = 
LoggerFactory.getLogger(MQTTSessionExpiryIntervalTest.class);
+
+   @Test(timeout = 60 * 1000)
+   public void testCustomSessionExpiryInterval() throws Exception {
+      final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+
+      BlockingConnection connectionSub = mqttSub.blockingConnection();
+      connectionSub.connect();
+
+      assertEquals(1, getSessions().size());
+
+      Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+      connectionSub.subscribe(topics);
+      connectionSub.disconnect();
+
+      Wait.assertEquals(0, () -> getSessions().size(), 10000, 100);
+   }
+
+   @Override
+   protected void addMQTTConnector() throws Exception {
+      server.getConfiguration().addAcceptorConfiguration("MQTT", 
"tcp://localhost:" + port + 
"?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;defaultMqttSessionExpiryInterval=3");
+
+      log.debug("Added MQTT connector to broker");
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 02eea33..a3839fe 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1601,6 +1601,23 @@ public class MQTTTest extends MQTTTestSupport {
       assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 
1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
    }
 
+   @Test(timeout = 60 * 1000)
+   public void testDefaultSessionExpiryInterval() throws Exception {
+      final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+
+      BlockingConnection connectionSub = mqttSub.blockingConnection();
+      connectionSub.connect();
+
+      assertEquals(1, getSessions().size());
+
+      Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+      connectionSub.subscribe(topics);
+      connectionSub.disconnect();
+
+      // session shouldn't expire by default
+      Wait.assertEquals(1, () -> getSessions().size(), 10000, 100);
+   }
+
    @Test(timeout = 30 * 1000)
    public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
       stopBroker();

Reply via email to