https://issues.apache.org/jira/browse/AMQ-6002 - escape client id in virtual topic mqtt subscription
(cherry picked from commit aa743cbd7ab7815bfa84e11aee9d3783c08e9eea) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/147400c2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/147400c2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/147400c2 Branch: refs/heads/activemq-5.12.x Commit: 147400c231e4da740e844453a5c7abd653ba8b66 Parents: 9544354 Author: Dejan Bosanac <de...@nighttale.net> Authored: Wed Oct 7 11:28:19 2015 +0200 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Wed Oct 7 16:45:40 2015 +0000 ---------------------------------------------------------------------- .../MQTTVirtualTopicSubscriptionStrategy.java | 7 +- .../activemq/transport/mqtt/PahoMQTTTest.java | 115 +++++++++++-------- 2 files changed, 69 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/147400c2/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java index 457981a..434c248 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java @@ -84,22 +84,21 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti ActiveMQDestination destination = null; int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); - + String converted = convertMQTTToActiveMQ(topicName); if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { - String converted = convertMQTTToActiveMQ(topicName); + if (converted.startsWith(VIRTUALTOPIC_PREFIX)) { destination = new ActiveMQTopic(converted); prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); } else { converted = VIRTUALTOPIC_CONSUMER_PREFIX + - protocol.getClientId() + ":" + requestedQoS + "." + + convertMQTTToActiveMQ(protocol.getClientId()) + ":" + requestedQoS + "." + VIRTUALTOPIC_PREFIX + converted; destination = new ActiveMQQueue(converted); prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; } } else { - String converted = convertMQTTToActiveMQ(topicName); if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) { converted = VIRTUALTOPIC_PREFIX + converted; } http://git-wip-us.apache.org/repos/asf/activemq/blob/147400c2/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 3e149ec..2890831 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -43,6 +43,10 @@ public class PahoMQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); + protected MessageConsumer createConsumer(Session s, String topic) throws Exception { + return s.createConsumer(s.createTopic(topic)); + } + @Test(timeout = 300000) public void testLotsOfClients() throws Exception { @@ -52,7 +56,7 @@ public class PahoMQTTTest extends MQTTTestSupport { ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); activeMQConnection.start(); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + MessageConsumer consumer = createConsumer(s, "test"); final AtomicInteger receiveCounter = new AtomicInteger(); consumer.setMessageListener(new MessageListener() { @@ -118,7 +122,7 @@ public class PahoMQTTTest extends MQTTTestSupport { ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); activeMQConnection.start(); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + MessageConsumer consumer = createConsumer(s, "test"); MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence()); client.connect(); @@ -128,16 +132,11 @@ public class PahoMQTTTest extends MQTTTestSupport { assertNotNull(msg); client.disconnect(); - client.close(); } @Test(timeout = 300000) public void testSubs() throws Exception { - stopBroker(); - protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; - startBroker(); - final DefaultListener listener = new DefaultListener(); // subscriber connects and creates durable sub MqttClient client = createClient(false, "receive", listener); @@ -195,10 +194,6 @@ public class PahoMQTTTest extends MQTTTestSupport { @Test(timeout = 300000) public void testOverlappingTopics() throws Exception { - stopBroker(); - protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; - startBroker(); - final DefaultListener listener = new DefaultListener(); // subscriber connects and creates durable sub MqttClient client = createClient(false, "receive", listener); @@ -278,7 +273,7 @@ public class PahoMQTTTest extends MQTTTestSupport { public boolean isSatisified() throws Exception { return listener.result != null; } - }, TimeUnit.SECONDS.toMillis(20))); + }, TimeUnit.SECONDS.toMillis(5))); assertNull(listener.result); assertTrue(client.getPendingDeliveryTokens().length == 0); @@ -290,7 +285,7 @@ public class PahoMQTTTest extends MQTTTestSupport { public boolean isSatisified() throws Exception { return listener.result != null; } - }, TimeUnit.SECONDS.toMillis(20))); + }, TimeUnit.SECONDS.toMillis(5))); assertNull(listener.result); assertTrue(client.getPendingDeliveryTokens().length == 0); } @@ -354,53 +349,75 @@ public class PahoMQTTTest extends MQTTTestSupport { } @Test(timeout = 300000) - public void testVirtualTopicQueueRestore() throws Exception { - - stopBroker(); - protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; - startBroker(); - - String user10 = "user10"; - String password10 = "user10"; - String clientId10 = "client-10"; - String topic10 = "user10/"; - MqttConnectOptions options10 = new MqttConnectOptions(); - options10.setCleanSession(false); - options10.setUserName(user10); - options10.setPassword(password10.toCharArray()); - MqttClient client10 = createClient(false, clientId10, null); - client10.subscribe(topic10 + clientId10 + "/#", 1); - client10.subscribe(topic10 + "#", 1); - - String user1 = "user1"; - String password1 = "user1"; - String clientId1 = "client-1"; - String topic1 = "user1/"; + public void testClientIdSpecialChars() throws Exception { + testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1); + testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1); + } + + protected void testClientId(String clientId, int mqttVersion, final DefaultListener clientAdminMqttCallback) throws Exception { MqttConnectOptions options1 = new MqttConnectOptions(); options1.setCleanSession(false); - options1.setUserName(user1); - options1.setPassword(password1.toCharArray()); + options1.setUserName("client1"); + options1.setPassword("client1".toCharArray()); + options1.setMqttVersion(mqttVersion); + final DefaultListener client1MqttCallback = new DefaultListener(); + MqttClient client1 = createClient(options1, clientId, client1MqttCallback); + client1.setCallback(client1MqttCallback); - MqttClient client1 = createClient(false, clientId1, null); - client1.subscribe(topic1 + clientId1 + "/#", 1); - client1.subscribe(topic1 + "#", 1); + String topic = "client1/" + clientId + "/topic"; + client1.subscribe(topic, 1); - RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); + String message = "Message from client: " + clientId; + client1.publish(topic, message.getBytes(), 1, false); - String[] queues = new String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>", - "Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"}; - for (String queueName : queues) { - Destination queue = regionBroker.getQueueRegion().getDestinations(new ActiveMQQueue(queueName)).iterator().next(); - assertEquals("Queue " + queueName + " have more than one consumer", 1, queue.getConsumers().size()); - } + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return client1MqttCallback.result != null; + } + }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); + assertEquals(message, client1MqttCallback.result); + assertEquals(1, client1MqttCallback.received); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return clientAdminMqttCallback.result != null; + } + }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); + assertEquals(message, clientAdminMqttCallback.result); + + assertTrue(client1.isConnected()); + client1.disconnect(); + } + + protected void testClientIdSpecialChars(int mqttVersion) throws Exception { + + LOG.info("Test MQTT version {}", mqttVersion); + MqttConnectOptions optionsAdmin = new MqttConnectOptions(); + optionsAdmin.setCleanSession(false); + optionsAdmin.setUserName("admin"); + optionsAdmin.setPassword("admin".toCharArray()); + + DefaultListener clientAdminMqttCallback = new DefaultListener(); + MqttClient clientAdmin = createClient(optionsAdmin, "admin", clientAdminMqttCallback); + clientAdmin.subscribe("#", 1); + + testClientId(":%&&@.:llll", mqttVersion, clientAdminMqttCallback); + testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); + testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback); + testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); } + protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(cleanSession); + return createClient(options, clientId, listener); + } + + protected MqttClient createClient(MqttConnectOptions options, String clientId, MqttCallback listener) throws Exception { final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); client.setCallback(listener); client.connect(options);