This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 96fa98fc93 ARTEMIS-3789 respect session expiry interval on MQTT 
disconnect message
     new 98eb31a2d7 This closes #4037
96fa98fc93 is described below

commit 96fa98fc93c0769b132784df539ad3eb6ce81a78
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Wed Apr 20 12:33:51 2022 -0500

    ARTEMIS-3789 respect session expiry interval on MQTT disconnect message
---
 .../core/protocol/mqtt/MQTTProtocolHandler.java    | 13 +++++++++++-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 23 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 26535d9b7d..67a3cce4c1 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -51,6 +51,7 @@ import org.jboss.logging.Logger;
 
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA;
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
 
 /**
  * This class is responsible for receiving and sending MQTT packets, 
delegating behaviour to one of the
@@ -173,7 +174,7 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
                handleUnsubscribe((MqttUnsubscribeMessage) message);
                break;
             case DISCONNECT:
-               disconnect(false);
+               disconnect(false, message);
                break;
             case UNSUBACK:
             case SUBACK:
@@ -280,6 +281,16 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
    }
 
    void disconnect(boolean error) {
+      disconnect(error, null);
+   }
+
+   void disconnect(boolean error, MqttMessage disconnect) {
+      if (disconnect != null && disconnect.variableHeader() instanceof 
MqttReasonCodeAndPropertiesVariableHeader) {
+         Integer sessionExpiryInterval = MQTTUtil.getProperty(Integer.class, 
((MqttReasonCodeAndPropertiesVariableHeader)disconnect.variableHeader()).properties(),
 SESSION_EXPIRY_INTERVAL, null);
+         if (sessionExpiryInterval != null) {
+            
session.getState().setClientSessionExpiryInterval(sessionExpiryInterval);
+         }
+      }
       session.getConnectionManager().disconnect(error);
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 1a4d398158..f7f3085bf7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -20,16 +20,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5;
 import javax.jms.JMSConsumer;
 import javax.jms.JMSContext;
 import javax.jms.Message;
-
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.utils.Wait;
+import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
@@ -164,4 +165,24 @@ public class MQTT5Test extends MQTT5TestSupport {
       client2.disconnectForcibly(0, 0, false);
       assertTrue(latch.await(2, TimeUnit.SECONDS));
    }
+
+   /*
+    * It's possible for a client to change their session expiry interval via 
the DISCONNECT packet. Ensure we respect
+    * a new session expiry interval when disconnecting.
+    */
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testExpiryDelayOnDisconnect() throws Exception {
+      final String CONSUMER_ID = RandomUtil.randomString();
+
+      MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
+      MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+         .sessionExpiryInterval(300L)
+         .build();
+      consumer.connect(options).waitForCompletion();
+      MqttProperties disconnectProperties = new MqttProperties();
+      disconnectProperties.setSessionExpiryInterval(0L);
+      consumer.disconnect(0, null, null, MQTTReasonCodes.SUCCESS, 
disconnectProperties).waitForCompletion();
+
+      Wait.assertEquals(0, () -> getSessionStates().size(), 5000, 10);
+   }
 }

Reply via email to