This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a1fd296b6f ARTEMIS-5783 MQTT5 shared sub not receiving sub ID
a1fd296b6f is described below

commit a1fd296b6f180b64186b776d1126503f99d59691
Author: Justin Bertram <[email protected]>
AuthorDate: Sat Nov 29 23:15:17 2025 -0600

    ARTEMIS-5783 MQTT5 shared sub not receiving sub ID
---
 .../core/protocol/mqtt/MQTTSessionState.java       |  6 ++-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 63 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 9ec877b51f..8397abee16 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -589,7 +589,11 @@ public class MQTTSessionState {
       private void update(MqttTopicSubscription newSub, Integer newId) {
          if (newId != null && !newId.equals(id)) {
             if (this.address == null || !subscription.topicFilter().equals(newSub.topicFilter())) {
-               address = new AddressImpl(SimpleString.of(newSub.topicFilter()), MQTTUtil.MQTT_WILDCARD);
+               String topicFilter = newSub.topicFilter();
+               if (MQTTUtil.isSharedSubscription(topicFilter)) {
+                  topicFilter = MQTTUtil.decomposeSharedSubscriptionTopicFilter(newSub.topicFilter()).getB();
+               }
+               address = new AddressImpl(SimpleString.of(topicFilter), MQTTUtil.MQTT_WILDCARD);
             }
          }
          subscription = newSub;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 83e7092957..9ace7e9118 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -496,6 +496,69 @@ public class MQTT5Test extends MQTT5TestSupport {
       consumer.close();
    }
 
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testSharedSubscriptionIdentifier() throws Exception {
+      final String TOPIC1 = "myTopic1";
+      final String TOPIC2 = "myTopic2";
+      final String SUB_NAME = "mySub";
+      final String[] SHARED_SUBS = new String[]{
+         MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1,
+         MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2
+      };
+      CountDownLatch ackLatch = new CountDownLatch(2);
+
+      MqttAsyncClient consumer = createAsyncPahoClient(getName());
+      consumer.connect().waitForCompletion();
+      consumer.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws Exception {
+            if (message.getProperties().getSubscriptionIdentifier() != null) {
+               if ((topic.equals(TOPIC1) && message.getProperties().getSubscriptionIdentifier() == 1) ||
+                  (topic.equals(TOPIC2) && message.getProperties().getSubscriptionIdentifier() == 2)) {
+                  ackLatch.countDown();
+               }
+            }
+         }
+      });
+
+      MqttProperties subscription1Properties = new MqttProperties();
+      subscription1Properties.setSubscriptionIdentifier(1);
+      consumer.subscribe(new MqttSubscription[]{new MqttSubscription(SHARED_SUBS[0], 1)}, null, null, subscription1Properties).waitForCompletion();
+
+      MqttProperties subscription2Properties = new MqttProperties();
+      subscription2Properties.setSubscriptionIdentifier(2);
+      consumer.subscribe(new MqttSubscription[]{new MqttSubscription(SHARED_SUBS[1], 1)}, null, null, subscription2Properties).waitForCompletion();
+
+      assertNotNull(server.getAddressInfo(SimpleString.of(TOPIC1)));
+      Queue q1 = getSharedSubscriptionQueue(SHARED_SUBS[0]);
+      assertNotNull(q1);
+      assertEquals(TOPIC1, q1.getAddress().toString());
+      assertEquals(1, q1.getConsumerCount());
+
+      assertNotNull(server.getAddressInfo(SimpleString.of(TOPIC2)));
+      Queue q2 = getSharedSubscriptionQueue(SHARED_SUBS[1]);
+      assertNotNull(q2);
+      assertEquals(TOPIC2, q2.getAddress().toString());
+      assertEquals(1, q2.getConsumerCount());
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(TOPIC1, new byte[0], 1, false);
+      producer.publish(TOPIC2, new byte[0], 1, false);
+      producer.disconnect();
+      producer.close();
+
+      assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
+
+      consumer.unsubscribe(SHARED_SUBS);
+      Wait.assertNull(() -> getSharedSubscriptionQueue(SHARED_SUBS[0]), 1000, 100);
+      Wait.assertNull(() -> getSharedSubscriptionQueue(SHARED_SUBS[1]), 1000, 100);
+
+      consumer.disconnect();
+      consumer.close();
+   }
+
    @Test
    @Timeout(DEFAULT_TIMEOUT_SEC)
    public void testSharedSubscriptionQueueRemoval() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to