This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 96fa98fc93 ARTEMIS-3789 respect session expiry interval on MQTT disconnect message new 98eb31a2d7 This closes #4037 96fa98fc93 is described below commit 96fa98fc93c0769b132784df539ad3eb6ce81a78 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Wed Apr 20 12:33:51 2022 -0500 ARTEMIS-3789 respect session expiry interval on MQTT disconnect message --- .../core/protocol/mqtt/MQTTProtocolHandler.java | 13 +++++++++++- .../artemis/tests/integration/mqtt5/MQTT5Test.java | 23 +++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 26535d9b7d..67a3cce4c1 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -51,6 +51,7 @@ import org.jboss.logging.Logger; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the @@ -173,7 +174,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { handleUnsubscribe((MqttUnsubscribeMessage) message); break; case DISCONNECT: - disconnect(false); + disconnect(false, message); break; case UNSUBACK: case SUBACK: @@ -280,6 +281,16 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void disconnect(boolean error) { + disconnect(error, null); + } + + void disconnect(boolean error, MqttMessage disconnect) { + if (disconnect != null && disconnect.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) { + Integer sessionExpiryInterval = MQTTUtil.getProperty(Integer.class, ((MqttReasonCodeAndPropertiesVariableHeader)disconnect.variableHeader()).properties(), SESSION_EXPIRY_INTERVAL, null); + if (sessionExpiryInterval != null) { + session.getState().setClientSessionExpiryInterval(sessionExpiryInterval); + } + } session.getConnectionManager().disconnect(error); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 1a4d398158..f7f3085bf7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -20,16 +20,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5; import javax.jms.JMSConsumer; import javax.jms.JMSContext; import javax.jms.Message; - import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.Wait; +import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; @@ -164,4 +165,24 @@ public class MQTT5Test extends MQTT5TestSupport { client2.disconnectForcibly(0, 0, false); assertTrue(latch.await(2, TimeUnit.SECONDS)); } + + /* + * It's possible for a client to change their session expiry interval via the DISCONNECT packet. Ensure we respect + * a new session expiry interval when disconnecting. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testExpiryDelayOnDisconnect() throws Exception { + final String CONSUMER_ID = RandomUtil.randomString(); + + MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID); + MqttConnectionOptions options = new MqttConnectionOptionsBuilder() + .sessionExpiryInterval(300L) + .build(); + consumer.connect(options).waitForCompletion(); + MqttProperties disconnectProperties = new MqttProperties(); + disconnectProperties.setSessionExpiryInterval(0L); + consumer.disconnect(0, null, null, MQTTReasonCodes.SUCCESS, disconnectProperties).waitForCompletion(); + + Wait.assertEquals(0, () -> getSessionStates().size(), 5000, 10); + } }