This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e68bb1902 ARTEMIS-4501 clean up MQTT subscription queues when session 
expires
8e68bb1902 is described below

commit 8e68bb1902f0044db36f0c4b99acc74495039585
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Mon Nov 13 12:02:46 2023 -0600

    ARTEMIS-4501 clean up MQTT subscription queues when session expires
---
 .../core/protocol/mqtt/MQTTStateManager.java       |  7 ++++--
 docs/user-manual/mqtt.adoc                         | 26 ++++++----------------
 docs/user-manual/versions.adoc                     | 19 ++++++++++++++++
 .../mqtt5/spec/controlpackets/ConnectTests.java    | 13 ++++++++---
 4 files changed, 41 insertions(+), 24 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index aa913b8f3a..89705f067c 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -110,8 +110,11 @@ public class MQTTStateManager {
       for (String key : toRemove) {
          try {
             MQTTSessionState state = removeSessionState(key);
-            if (state != null && state.isWill() && !state.isAttached() && 
state.isFailed()) {
-               state.getSession().sendWillMessage();
+            if (state != null) {
+               if (state.isWill() && !state.isAttached() && state.isFailed()) {
+                  state.getSession().sendWillMessage();
+               }
+               state.getSession().clean(false);
             }
          } catch (Exception e) {
             MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index d460279bd8..2adea4926a 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -165,27 +165,15 @@ In the case of MQTT 5 clients they will receive a 
disconnect reason code of http
 
 == Automatic Subscription Clean-up
 
-Sometimes MQTT clients using `CleanSession=false` don't clean up their 
subscriptions.
-In such situations the following address-setting can be used to clean up the 
abandoned subscription queues:
+Sometimes MQTT 3.x clients using `CleanSession=false` don't properly 
unsubscribe. The URL parameter `defaultMqttSessionExpiryInterval` can be 
configured on the MQTT `acceptor` so that abandoned sessions and subscription 
queues will be cleaned up automatically after the expiry interval elapses.
 
-[,xml]
-----
-   <address-setting match="myMqttAddress">
-      <auto-delete-created-queues>true</auto-delete-created-queues>
-      <auto-delete-queues-delay>3600000</auto-delete-queues-delay> <!-- 1 hour 
delay -->
-      <auto-delete-queues-message-count>-1</auto-delete-queues-message-count> 
<!-- doesn't matter how many messages there are -->
-   </address-setting>
-----
-
-However, the MQTT session meta-data is still present in memory and needs to be 
cleaned up as well.
-The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the 
MQTT `acceptor` to deal with this situation.
-
-MQTT 5 added a new 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session
 expiry interval] property with the same basic semantics.
-The broker will use the client's value for this property if it is set.
-If it is not set then it will apply the `defaultMqttSessionExpiryInterval`.
+MQTT 5 has the same basic semantics with slightly different configuration.
+The `CleanSession` flag was replaced with `CleanStart` and a 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session
 expiry interval] property.
+The broker will use the client's session expiry interval if it is set.
+If it is not set then the broker will apply the 
`defaultMqttSessionExpiryInterval`.
 
-The default `defaultMqttSessionExpiryInterval` is `-1` which means no MQTT 3.x 
session states will be expired and no MQTT 5 session states which do not pass 
their own session expiry interval will be expired.
-Otherwise it represents the number of *seconds* which must elapse after the 
client has disconnected before the broker will remove the session state.
+The default `defaultMqttSessionExpiryInterval` is `-1` which means no clean up 
will happen for MQTT 3.x clients or for MQTT 5 clients which do not pass their 
own session expiry interval.
+Otherwise it represents the number of *seconds* which must elapse after the 
client has disconnected before the broker will remove the session state and 
subscription queues.
 
 MQTT session state is scanned every 5,000 milliseconds by default.
 This can be changed using the `mqtt-session-scan-interval` element set in the 
`core` section of `broker.xml`.
diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc
index a99ccfb1ca..c2e71637a7 100644
--- a/docs/user-manual/versions.adoc
+++ b/docs/user-manual/versions.adoc
@@ -12,6 +12,25 @@ NOTE: If the upgrade spans multiple versions then the steps 
from *each* version
 
 NOTE: Follow the general upgrade procedure outlined in the 
xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker]  chapter in 
addition to any version-specific upgrade instructions outlined here.
 
+== 2.32.0
+
+https://issues.apache.org/jira/secure/ReleaseNote.jspa...
+
+=== Highlights
+
+* highlight 1
+* highlight 2
+
+=== Upgrading from 2.31.x
+
+* Due to https://issues.apache.org/jira/browse/ARTEMIS-4501[ARTEMIS-4501] MQTT 
subscription queues will be automatically removed when the corresponding 
session expires, either based on the session expiry interval passed by an MQTT 
5 client or based on the configured `defaultMqttSessionExpiryInterval` for MQTT 
3.x clients or MQTT 5 clients which don't explicitly pass a session expiry 
interval.
++
+Prior to this change removing subscription queues relied on the generic 
`auto-delete-*` `address-settings`.
++
+These settings are now no longer required.
++
+Configure `defaultMqttSessionExpiryInterval` instead.
+
 == 2.31.2
 
 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353776[Full
 release notes]
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
index a3b6eb3c03..f423fce626 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
@@ -968,13 +968,20 @@ public class ConnectTests extends MQTT5TestSupport {
          .build();
       consumer.connect(options);
       consumer.subscribe(TOPIC, 2);
+      long start = System.currentTimeMillis();
       consumer.disconnect();
 
-      // session should *not* still exist since session expiry interval has 
passed
-      long start = System.currentTimeMillis();
+      // ensure the subscription queue still exists since the session hasn't 
expired
+      assertNotNull(getSubscriptionQueue(TOPIC, CONSUMER_ID));
+
       Wait.assertEquals(0, () -> getSessionStates().size(), EXPIRY_INTERVAL * 
1000 * 2, 100);
-      assertTrue(System.currentTimeMillis() - start > (EXPIRY_INTERVAL * 
1000));
+      assertTrue(System.currentTimeMillis() - start >= (EXPIRY_INTERVAL * 
1000));
+
+      // session should *not* still exist since session expiry interval has 
passed
       assertNull(getSessionStates().get(CONSUMER_ID));
+
+      // ensure the subscription queue is cleaned up when the session expires
+      Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) == null, 
2000, 100);
    }
 
    /*

Reply via email to