This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a1fd296b6f ARTEMIS-5783 MQTT5 shared sub not receiving sub ID
a1fd296b6f is described below
commit a1fd296b6f180b64186b776d1126503f99d59691
Author: Justin Bertram <[email protected]>
AuthorDate: Sat Nov 29 23:15:17 2025 -0600
ARTEMIS-5783 MQTT5 shared sub not receiving sub ID
---
.../core/protocol/mqtt/MQTTSessionState.java | 6 ++-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 63 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 9ec877b51f..8397abee16 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -589,7 +589,11 @@ public class MQTTSessionState {
private void update(MqttTopicSubscription newSub, Integer newId) {
if (newId != null && !newId.equals(id)) {
if (this.address == null || !subscription.topicFilter().equals(newSub.topicFilter())) {
- address = new AddressImpl(SimpleString.of(newSub.topicFilter()), MQTTUtil.MQTT_WILDCARD);
+ String topicFilter = newSub.topicFilter();
+ if (MQTTUtil.isSharedSubscription(topicFilter)) {
+ topicFilter = MQTTUtil.decomposeSharedSubscriptionTopicFilter(newSub.topicFilter()).getB();
+ }
+ address = new AddressImpl(SimpleString.of(topicFilter), MQTTUtil.MQTT_WILDCARD);
}
}
subscription = newSub;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 83e7092957..9ace7e9118 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -496,6 +496,69 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer.close();
}
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testSharedSubscriptionIdentifier() throws Exception {
+ final String TOPIC1 = "myTopic1";
+ final String TOPIC2 = "myTopic2";
+ final String SUB_NAME = "mySub";
+ final String[] SHARED_SUBS = new String[]{
+ MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1,
+ MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2
+ };
+ CountDownLatch ackLatch = new CountDownLatch(2);
+
+ MqttAsyncClient consumer = createAsyncPahoClient(getName());
+ consumer.connect().waitForCompletion();
+ consumer.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ if (message.getProperties().getSubscriptionIdentifier() != null) {
+ if ((topic.equals(TOPIC1) && message.getProperties().getSubscriptionIdentifier() == 1) ||
+ (topic.equals(TOPIC2) && message.getProperties().getSubscriptionIdentifier() == 2)) {
+ ackLatch.countDown();
+ }
+ }
+ }
+ });
+
+ MqttProperties subscription1Properties = new MqttProperties();
+ subscription1Properties.setSubscriptionIdentifier(1);
+ consumer.subscribe(new MqttSubscription[]{new MqttSubscription(SHARED_SUBS[0], 1)}, null, null, subscription1Properties).waitForCompletion();
+
+ MqttProperties subscription2Properties = new MqttProperties();
+ subscription2Properties.setSubscriptionIdentifier(2);
+ consumer.subscribe(new MqttSubscription[]{new MqttSubscription(SHARED_SUBS[1], 1)}, null, null, subscription2Properties).waitForCompletion();
+
+ assertNotNull(server.getAddressInfo(SimpleString.of(TOPIC1)));
+ Queue q1 = getSharedSubscriptionQueue(SHARED_SUBS[0]);
+ assertNotNull(q1);
+ assertEquals(TOPIC1, q1.getAddress().toString());
+ assertEquals(1, q1.getConsumerCount());
+
+ assertNotNull(server.getAddressInfo(SimpleString.of(TOPIC2)));
+ Queue q2 = getSharedSubscriptionQueue(SHARED_SUBS[1]);
+ assertNotNull(q2);
+ assertEquals(TOPIC2, q2.getAddress().toString());
+ assertEquals(1, q2.getConsumerCount());
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(TOPIC1, new byte[0], 1, false);
+ producer.publish(TOPIC2, new byte[0], 1, false);
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
+
+ consumer.unsubscribe(SHARED_SUBS);
+ Wait.assertNull(() -> getSharedSubscriptionQueue(SHARED_SUBS[0]), 1000, 100);
+ Wait.assertNull(() -> getSharedSubscriptionQueue(SHARED_SUBS[1]), 1000, 100);
+
+ consumer.disconnect();
+ consumer.close();
+ }
+
@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testSharedSubscriptionQueueRemoval() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact