Repository: activemq Updated Branches: refs/heads/master 4a821186a -> a4fbe7087
https://issues.apache.org/jira/browse/AMQ-5734 Support MQTT 3.1 silent subscription fail Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a4fbe708 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a4fbe708 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a4fbe708 Branch: refs/heads/master Commit: a4fbe708726b4846fa9831e083f0fbf554b4f324 Parents: 4a82118 Author: Dejan Bosanac <[email protected]> Authored: Mon Apr 20 18:15:20 2015 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Mon Apr 20 18:17:09 2015 +0200 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 6 +++++ .../AbstractMQTTSubscriptionStrategy.java | 8 ++++++- .../activemq/transport/mqtt/MQTTAuthTest.java | 24 +++++++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 4e0b0df..37c0c4c 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -87,6 +87,8 @@ public class MQTTProtocolConverter { private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; + public static final int V3_1 = 3; + public static final int V3_1_1 = 4; private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); @@ -119,6 +121,8 @@ public class MQTTProtocolConverter { private final MQTTPacketIdGenerator packetIdGenerator; private boolean publishDollarTopics; + public int version; + private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); /* @@ -246,6 +250,8 @@ public class MQTTProtocolConverter { passswd = connect.password().toString(); } + version = connect.version(); + configureInactivityMonitor(connect.keepAlive()); connectionInfo.setConnectionId(connectionId); http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java index 121b829..988a065 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java @@ -204,7 +204,13 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti if (response.isException()) { final Throwable throwable = ((ExceptionResponse) response).getException(); LOG.warn("Error subscribing to {}", topicName, throwable); - qos[0] = SUBSCRIBE_ERROR; + // version 3.1 fails silently (the requested qos is returned) + // version 3.1.1 sends the "error" qos + if (protocol.version == protocol.V3_1_1) { + qos[0] = SUBSCRIBE_ERROR; + } else { + qos[0] = (byte) qoS.ordinal(); + } } else { qos[0] = (byte) qoS.ordinal(); } 
http://git-wip-us.apache.org/repos/asf/activemq/blob/a4fbe708/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java index a8ced02..07a8cc9 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java @@ -138,8 +138,9 @@ public class MQTTAuthTest extends MQTTAuthTestSupport { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); mqtt.setKeepAlive((short) 2); + mqtt.setVersion("3.1.1"); - final BlockingConnection connection = mqtt.blockingConnection(); + BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); final String NAMED = "named"; @@ -163,7 +164,28 @@ public class MQTTAuthTest extends MQTTAuthTestSupport { assertEquals(ANONYMOUS, new String(msg.getPayload())); msg.ack(); + //delete retained message + connection.publish(ANONYMOUS, "".getBytes(), QoS.AT_MOST_ONCE, true); + connection.disconnect(); + + // Test 3.1 functionality + mqtt.setVersion("3.1"); + connection = mqtt.blockingConnection(); + connection.connect(); + qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE) }); + assertEquals(QoS.AT_MOST_ONCE.ordinal(), qos[0]); + + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + connectionPub.connect(); + connectionPub.publish(NAMED, NAMED.getBytes(), QoS.AT_MOST_ONCE, true); + + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNull(msg); } @Test(timeout = 60 * 1000)
