This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 8e68bb1902 ARTEMIS-4501 clean up MQTT subscription queues when session expires 8e68bb1902 is described below commit 8e68bb1902f0044db36f0c4b99acc74495039585 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Mon Nov 13 12:02:46 2023 -0600 ARTEMIS-4501 clean up MQTT subscription queues when session expires --- .../core/protocol/mqtt/MQTTStateManager.java | 7 ++++-- docs/user-manual/mqtt.adoc | 26 ++++++---------------- docs/user-manual/versions.adoc | 19 ++++++++++++++++ .../mqtt5/spec/controlpackets/ConnectTests.java | 13 ++++++++--- 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java index aa913b8f3a..89705f067c 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java @@ -110,8 +110,11 @@ public class MQTTStateManager { for (String key : toRemove) { try { MQTTSessionState state = removeSessionState(key); - if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { - state.getSession().sendWillMessage(); + if (state != null) { + if (state.isWill() && !state.isAttached() && state.isFailed()) { + state.getSession().sendWillMessage(); + } + state.getSession().clean(false); } } catch (Exception e) { MQTTLogger.LOGGER.failedToRemoveSessionState(key, e); diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc index d460279bd8..2adea4926a 100644 --- a/docs/user-manual/mqtt.adoc +++ b/docs/user-manual/mqtt.adoc @@ -165,27 +165,15 @@ In the case of MQTT 5 clients they will receive a disconnect reason 
code of http == Automatic Subscription Clean-up -Sometimes MQTT clients using `CleanSession=false` don't clean up their subscriptions. -In such situations the following address-setting can be used to clean up the abandoned subscription queues: +Sometimes MQTT 3.x clients using `CleanSession=false` don't properly unsubscribe. The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` so that abandoned sessions and subscription queues will be cleaned up automatically after the expiry interval elapses. -[,xml] ----- - <address-setting match="myMqttAddress"> - <auto-delete-created-queues>true</auto-delete-created-queues> - <auto-delete-queues-delay>3600000</auto-delete-queues-delay> <!-- 1 hour delay --> - <auto-delete-queues-message-count>-1</auto-delete-queues-message-count> <!-- doesn't matter how many messages there are --> - </address-setting> ----- - -However, the MQTT session meta-data is still present in memory and needs to be cleaned up as well. -The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` to deal with this situation. - -MQTT 5 added a new https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property with the same basic semantics. -The broker will use the client's value for this property if it is set. -If it is not set then it will apply the `defaultMqttSessionExpiryInterval`. +MQTT 5 has the same basic semantics with slightly different configuration. +The `CleanSession` flag was replaced with `CleanStart` and a https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property. +The broker will use the client's session expiry interval if it is set. +If it is not set then the broker will apply the `defaultMqttSessionExpiryInterval`. 
-The default `defaultMqttSessionExpiryInterval` is `-1` which means no MQTT 3.x session states will be expired and no MQTT 5 session states which do not pass their own session expiry interval will be expired. -Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state. +The default `defaultMqttSessionExpiryInterval` is `-1` which means no clean up will happen for MQTT 3.x clients or for MQTT 5 clients which do not pass their own session expiry interval. +Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state and subscription queues. MQTT session state is scanned every 5,000 milliseconds by default. This can be changed using the `mqtt-session-scan-interval` element set in the `core` section of `broker.xml`. diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc index a99ccfb1ca..c2e71637a7 100644 --- a/docs/user-manual/versions.adoc +++ b/docs/user-manual/versions.adoc @@ -12,6 +12,25 @@ NOTE: If the upgrade spans multiple versions then the steps from *each* version NOTE: Follow the general upgrade procedure outlined in the xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in addition to any version-specific upgrade instructions outlined here. +== 2.32.0 + +https://issues.apache.org/jira/secure/ReleaseNote.jspa... + +=== Highlights + +* highlight 1 +* highlight 2 + +=== Upgrading from 2.31.x + +* Due to https://issues.apache.org/jira/browse/ARTEMIS-4501[ARTEMIS-4501] MQTT subscription queues will be automatically removed when the corresponding session expires, either based on the session expiry interval passed by an MQTT 5 client or based on the configured `defaultMqttSessionExpiryInterval` for MQTT 3.x clients or MQTT 5 clients which don't explicitly pass a session expiry interval. 
++ +Prior to this change removing subscription queues relied on the generic `auto-delete-*` `address-settings`. ++ +These settings are now no longer required. ++ +Configure `defaultMqttSessionExpiryInterval` instead. + == 2.31.2 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353776[Full release notes] diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java index a3b6eb3c03..f423fce626 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java @@ -968,13 +968,20 @@ public class ConnectTests extends MQTT5TestSupport { .build(); consumer.connect(options); consumer.subscribe(TOPIC, 2); + long start = System.currentTimeMillis(); consumer.disconnect(); - // session should *not* still exist since session expiry interval has passed - long start = System.currentTimeMillis(); + // ensure the subscription queue still exists since the session hasn't expired + assertNotNull(getSubscriptionQueue(TOPIC, CONSUMER_ID)); + Wait.assertEquals(0, () -> getSessionStates().size(), EXPIRY_INTERVAL * 1000 * 2, 100); - assertTrue(System.currentTimeMillis() - start > (EXPIRY_INTERVAL * 1000)); + assertTrue(System.currentTimeMillis() - start >= (EXPIRY_INTERVAL * 1000)); + + // session should *not* still exist since session expiry interval has passed assertNull(getSessionStates().get(CONSUMER_ID)); + + // ensure the subscription queue is cleaned up when the session expires + Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) == null, 2000, 100); } /*