ARTEMIS-990 Don't require permissions on the MQTT management queue (cherry picked from commit b33fea0d7fbc94a43d04ca66a89880442e0f91c5)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2779ad85 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2779ad85 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2779ad85 Branch: refs/heads/1.x Commit: 2779ad85539be3292b38a683eadb01837bf1024e Parents: e0cd9aa Author: Martyn Taylor <[email protected]> Authored: Fri Mar 10 10:14:19 2017 +0000 Committer: Martyn Taylor <[email protected]> Committed: Fri Mar 10 13:13:31 2017 +0000 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTPublishManager.java | 23 ++++++++++++-------- .../protocol/mqtt/MQTTRetainMessageManager.java | 2 +- .../integration/mqtt/imported/MQTTTest.java | 4 ++-- 3 files changed, 17 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 77b45ab..626916f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -62,10 +62,6 @@ public class MQTTPublishManager { synchronized void start() throws Exception { this.state = session.getSessionState(); this.outboundStore = state.getOutboundStore(); - - createManagementAddress(); - 
createManagementQueue(); - createManagementConsumer(); } synchronized void stop() throws Exception { @@ -77,7 +73,7 @@ public class MQTTPublishManager { } void clean() throws Exception { - createManagementAddress(); + SimpleString managementAddress = createManagementAddress(); Queue queue = session.getServer().locateQueue(managementAddress); if (queue != null) { queue.deleteQueue(); @@ -90,14 +86,14 @@ public class MQTTPublishManager { managementConsumer.setStarted(true); } - private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); + private SimpleString createManagementAddress() { + return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); } private void createManagementQueue() throws Exception { Queue q = session.getServer().locateQueue(managementAddress); if (q == null) { - session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES); + session.getServer().createQueue(managementAddress, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false); } } @@ -183,11 +179,20 @@ public class MQTTPublishManager { session.getProtocolHandler().sendPubRel(messageId); } + private SimpleString getManagementAddress() throws Exception { + if (managementAddress == null) { + managementAddress = createManagementAddress(); + createManagementQueue(); + createManagementConsumer(); + } + return managementAddress; + } + void handlePubRec(int messageId) throws Exception { try { Pair<Long, Long> ref = outboundStore.publishReceived(messageId); if (ref != null) { - ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + ServerMessage m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId); session.getServerSession().send(m, true); session.getServerSession().acknowledge(ref.getB(), ref.getA()); } else { 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 70db040..7acc3b4 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -49,7 +49,7 @@ public class MQTTRetainMessageManager { Queue queue = session.getServer().locateQueue(retainAddress); if (queue == null) { - queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true); + queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false); } try (LinkedListIterator<MessageReference> iterator = queue.iterator()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 7cd1bf1..c211260 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTTException; @@ -53,7 +54,6 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.vertx.java.core.impl.ConcurrentHashSet; /** * QT @@ -1711,7 +1711,7 @@ public class MQTTTest extends MQTTTestSupport { connection2.connect(); connection2.subscribe(mqttTopic); - Message message = connection2.receive(); + Message message = connection2.receive(5000, TimeUnit.MILLISECONDS); assertEquals(payload, new String(message.getPayload())); }
