Repository: activemq Updated Branches: refs/heads/trunk 0db7e69b4 -> 7e56f348b
https://issues.apache.org/jira/browse/AMQ-5066 apply patch after review Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7e56f348 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7e56f348 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7e56f348 Branch: refs/heads/trunk Commit: 7e56f348bc92488700b8e4484cd9260b1de1b5d8 Parents: 0db7e69 Author: Timothy Bish <[email protected]> Authored: Wed Feb 19 14:12:49 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Feb 19 14:12:49 2014 -0500 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 35 +++++++----- .../transport/mqtt/MQTTSubscription.java | 4 ++ .../activemq/transport/mqtt/MQTTTest.java | 56 +++++++++++++++++++- 3 files changed, 80 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index c270566..19614a9 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -366,24 +366,33 @@ public class MQTTProtocolConverter { } QoS onSubscribe(Topic topic) throws MQTTProtocolException { - if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) { - ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); - ConsumerInfo consumerInfo = new ConsumerInfo(id); - consumerInfo.setDestination(destination); - consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); - consumerInfo.setDispatchAsync(true); - if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { - consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); + if( mqttSubscriptionByTopic.containsKey(topic.name())) { + if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) { + // remove old subscription as the QoS has changed + onUnSubscribe(topic.name()); + } else { + // duplicate SUBSCRIBE packet, nothing to do + return topic.qos(); } - MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); + } - subscriptionsByConsumerId.put(id, mqttSubscription); - mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - sendToActiveMQ(consumerInfo, null); + ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + ConsumerInfo consumerInfo = new ConsumerInfo(id); + consumerInfo.setDestination(destination); + consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); + consumerInfo.setDispatchAsync(true); + if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { + consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); } + MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); + + subscriptionsByConsumerId.put(id, mqttSubscription); + mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); + + sendToActiveMQ(consumerInfo, null); return topic.qos(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java index 20510ee..99af92a 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -67,4 +67,8 @@ class MQTTSubscription { public ConsumerInfo getConsumerInfo() { return consumerInfo; } + + public QoS qos() { + return qos; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 73397de..3e1e6cb 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -301,7 +301,7 @@ public class MQTTTest extends AbstractMQTTTest { initializeConnection(subscriber); String RETAINED = "retained"; - publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true); + publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true); List<String> messages = new ArrayList<String>(); for (int i = 0; i < 10; i++){ @@ -320,7 +320,7 @@ public class MQTTTest extends AbstractMQTTTest { for (int i =0; i < 10; i++){ msg = subscriber.receive(5000); assertNotNull(msg); - assertEquals(messages.get(i),new String(msg)); + assertEquals(messages.get(i), new String(msg)); } subscriber.disconnect(); publisher.disconnect(); @@ -377,6 +377,58 @@ public class MQTTTest extends AbstractMQTTTest { } + @Test(timeout = 60 * 1000) + public void testDuplicateSubscriptions() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short)2); + + final int[] actualQoS = {-1}; + mqtt.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + // validate the QoS + if (frame.messageType() == PUBLISH.TYPE) { + PUBLISH publish = new PUBLISH(); + try { + publish.decode(frame); + } catch (ProtocolException e) { + fail("Failed decoding " + e.getMessage()); + } + actualQoS[0] = publish.qos().ordinal(); + } + } + }); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String RETAIN = "RETAIN"; + connection.publish("TopicA", RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); + + QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE }; + for (QoS qos : qoss) { + connection.subscribe(new Topic[]{ new Topic("TopicA", qos) }); + + final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(RETAIN, new String(msg.getPayload())); + int waitCount = 0; + while (actualQoS[0] == -1 && waitCount < 10) { + Thread.sleep(1000); + waitCount++; + } + assertEquals(qos.ordinal(), actualQoS[0]); + } + + connection.unsubscribe(new String[]{"TopicA"}); + connection.disconnect(); + + } + @Test(timeout=60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { addMQTTConnector();
