This is an automated email from the ASF dual-hosted git repository. jbertram pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 22e3b09b9c ARTEMIS-4370 update existing topic alias for MQTT 5 publisher 22e3b09b9c is described below commit 22e3b09b9c2ad8c1c0da6e6bc97f4f0bf5e28af3 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Fri Jul 21 13:05:25 2023 -0500 ARTEMIS-4370 update existing topic alias for MQTT 5 publisher --- .../core/protocol/mqtt/MQTTPublishManager.java | 28 ++++++---- .../core/protocol/mqtt/MQTTSessionState.java | 2 +- .../mqtt5/spec/controlpackets/PublishTests.java | 65 ++++++++++++++++++++++ 3 files changed, 82 insertions(+), 13 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 57b9605d21..057e2a957b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -189,27 +189,31 @@ public class MQTTPublishManager { String topic = message.variableHeader().topicName(); if (session.getVersion() == MQTTVersion.MQTT_5) { Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS); - Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum(); if (alias != null) { + Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum(); if (alias == 0) { // [MQTT-3.3.2-8] throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID); } else if (topicAliasMax != null && alias > topicAliasMax) { // [MQTT-3.3.2-9] throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID); - } else { - topic = session.getState().getClientTopicAlias(alias); - if (topic == null) { - topic = message.variableHeader().topicName(); - if (topic == null || topic.length() == 0) { - // using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7] - throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID); - } - session.getState().addClientTopicAlias(alias, topic); + } + + String existingTopicMapping = session.getState().getClientTopicAlias(alias); + if (existingTopicMapping == null) { + if (topic == null || topic.length() == 0) { + // using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7] + throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID); } + logger.debug("Adding new alias {} for topic {}", alias, topic); + session.getState().putClientTopicAlias(alias, topic); + } else if (topic != null && topic.length() > 0) { + logger.debug("Modifying existing alias {}. New value: {}; old value: {}", alias, topic, existingTopicMapping); + session.getState().putClientTopicAlias(alias, topic); + } else { + logger.debug("Applying topic {} for alias {}", existingTopicMapping, alias); + topic = existingTopicMapping; } - } else { - topic = message.variableHeader().topicName(); } } String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration()); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 23209c7f14..33e6f159ed 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -307,7 +307,7 @@ public class MQTTSessionState { this.clientMaxPacketSize = clientMaxPacketSize; } - public void addClientTopicAlias(Integer alias, String topicName) { + public void putClientTopicAlias(Integer alias, String topicName) { if (clientTopicAliases == null) { clientTopicAliases = new HashMap<>(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java index 84ad469961..de0b6a7abf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java @@ -1020,6 +1020,71 @@ public class PublishTests extends MQTT5TestSupport { consumer.disconnect(); } + /* + * From section 3.3.2.3.4 of the MQTT 5 specification: + * + * A sender can modify the Topic Alias mapping by sending another PUBLISH in the same Network Connection with the + * same Topic Alias value and a different non-zero length Topic Name. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testModifiedTopicAlias() throws Exception { + final String TOPIC_1 = this.getTopicName() + "1"; + final String TOPIC_2 = this.getTopicName() + "2"; + + MqttClient consumer1 = createPahoClient("consumer1"); + CountDownLatch latch1 = new CountDownLatch(1); + consumer1.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + if (payload.equals("first")) { + latch1.countDown(); + } + } + }); + consumer1.connect(); + consumer1.subscribe(TOPIC_1, 1); + + MqttClient consumer2 = createPahoClient("consumer2"); + CountDownLatch latch2 = new CountDownLatch(1); + consumer2.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + if (payload.equals("second")) { + latch2.countDown(); + } + } + }); + consumer2.connect(); + consumer2.subscribe(TOPIC_2, 1); + + MqttClient producer = createPahoClient("producer"); + producer.connect(); + + MqttProperties properties = new MqttProperties(); + properties.setTopicAlias(1); + MqttMessage m = new MqttMessage(); + m.setProperties(properties); + m.setQos(1); + m.setRetained(false); + m.setPayload("first".getBytes(StandardCharsets.UTF_8)); + producer.publish(TOPIC_1, m); + m.setPayload("second".getBytes(StandardCharsets.UTF_8)); + producer.publish(TOPIC_2, m); + + producer.disconnect(); + producer.close(); + + assertTrue(latch1.await(2, TimeUnit.SECONDS)); + assertTrue(latch2.await(2, TimeUnit.SECONDS)); + + consumer1.disconnect(); + consumer1.close(); + consumer2.disconnect(); + consumer2.close(); + } + /* * [MQTT-3.3.2-15] The Server MUST send the Response Topic unaltered to all subscribers receiving the Application * Message.