This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 22e3b09b9c ARTEMIS-4370 update existing topic alias for MQTT 5 publisher
22e3b09b9c is described below

commit 22e3b09b9c2ad8c1c0da6e6bc97f4f0bf5e28af3
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Fri Jul 21 13:05:25 2023 -0500

    ARTEMIS-4370 update existing topic alias for MQTT 5 publisher
---
 .../core/protocol/mqtt/MQTTPublishManager.java     | 28 ++++++----
 .../core/protocol/mqtt/MQTTSessionState.java       |  2 +-
 .../mqtt5/spec/controlpackets/PublishTests.java    | 65 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 57b9605d21..057e2a957b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -189,27 +189,31 @@ public class MQTTPublishManager {
          String topic = message.variableHeader().topicName();
          if (session.getVersion() == MQTTVersion.MQTT_5) {
             Integer alias = MQTTUtil.getProperty(Integer.class, 
message.variableHeader().properties(), TOPIC_ALIAS);
-            Integer topicAliasMax = 
session.getProtocolManager().getTopicAliasMaximum();
             if (alias != null) {
+               Integer topicAliasMax = 
session.getProtocolManager().getTopicAliasMaximum();
                if (alias == 0) {
                   // [MQTT-3.3.2-8]
                   throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
                } else if (topicAliasMax != null && alias > topicAliasMax) {
                   // [MQTT-3.3.2-9]
                   throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
-               } else {
-                  topic = session.getState().getClientTopicAlias(alias);
-                  if (topic == null) {
-                     topic = message.variableHeader().topicName();
-                     if (topic == null || topic.length() == 0) {
-                        // using a topic alias with no matching topic in the 
state; potentially [MQTT-3.3.2-7]
-                        throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
-                     }
-                     session.getState().addClientTopicAlias(alias, topic);
+               }
+
+               String existingTopicMapping = 
session.getState().getClientTopicAlias(alias);
+               if (existingTopicMapping == null) {
+                  if (topic == null || topic.length() == 0) {
+                     // using a topic alias with no matching topic in the 
state; potentially [MQTT-3.3.2-7]
+                     throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
                   }
+                  logger.debug("Adding new alias {} for topic {}", alias, 
topic);
+                  session.getState().putClientTopicAlias(alias, topic);
+               } else if (topic != null && topic.length() > 0) {
+                  logger.debug("Modifying existing alias {}. New value: {}; 
old value: {}", alias, topic, existingTopicMapping);
+                  session.getState().putClientTopicAlias(alias, topic);
+               } else {
+                  logger.debug("Applying topic {} for alias {}", 
existingTopicMapping, alias);
+                  topic = existingTopicMapping;
                }
-            } else {
-               topic = message.variableHeader().topicName();
             }
          }
          String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, 
session.getWildcardConfiguration());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 23209c7f14..33e6f159ed 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -307,7 +307,7 @@ public class MQTTSessionState {
       this.clientMaxPacketSize = clientMaxPacketSize;
    }
 
-   public void addClientTopicAlias(Integer alias, String topicName) {
+   public void putClientTopicAlias(Integer alias, String topicName) {
       if (clientTopicAliases == null) {
          clientTopicAliases = new HashMap<>();
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 84ad469961..de0b6a7abf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1020,6 +1020,71 @@ public class PublishTests extends MQTT5TestSupport {
       consumer.disconnect();
    }
 
+   /*
+    * From section 3.3.2.3.4 of the MQTT 5 specification:
+    *
+    * A sender can modify the Topic Alias mapping by sending another PUBLISH in the same Network Connection with the
+    * same Topic Alias value and a different non-zero length Topic Name.
+    */
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testModifiedTopicAlias() throws Exception {
+      final String TOPIC_1 = this.getTopicName() + "1";
+      final String TOPIC_2 = this.getTopicName() + "2";
+
+      MqttClient consumer1 = createPahoClient("consumer1");
+      CountDownLatch latch1 = new CountDownLatch(1);
+      consumer1.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            String payload = new String(message.getPayload());
+            if (payload.equals("first")) {
+               latch1.countDown();
+            }
+         }
+      });
+      consumer1.connect();
+      consumer1.subscribe(TOPIC_1, 1);
+
+      MqttClient consumer2 = createPahoClient("consumer2");
+      CountDownLatch latch2 = new CountDownLatch(1);
+      consumer2.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            String payload = new String(message.getPayload());
+            if (payload.equals("second")) {
+               latch2.countDown();
+            }
+         }
+      });
+      consumer2.connect();
+      consumer2.subscribe(TOPIC_2, 1);
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+
+      MqttProperties properties = new MqttProperties();
+      properties.setTopicAlias(1);
+      MqttMessage m = new MqttMessage();
+      m.setProperties(properties);
+      m.setQos(1);
+      m.setRetained(false);
+      m.setPayload("first".getBytes(StandardCharsets.UTF_8));
+      producer.publish(TOPIC_1, m);
+      m.setPayload("second".getBytes(StandardCharsets.UTF_8));
+      producer.publish(TOPIC_2, m);
+
+      producer.disconnect();
+      producer.close();
+
+      assertTrue(latch1.await(2, TimeUnit.SECONDS));
+      assertTrue(latch2.await(2, TimeUnit.SECONDS));
+
+      consumer1.disconnect();
+      consumer1.close();
+      consumer2.disconnect();
+      consumer2.close();
+   }
+
    /*
     * [MQTT-3.3.2-15] The Server MUST send the Response Topic unaltered to all subscribers receiving the Application
     * Message.

Reply via email to