ARTEMIS-917 Only return body of retained message after reboot (cherry picked from commit 3900cb0ec7305ee1f341687ace6a4fab7469a817)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f231fe4e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f231fe4e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f231fe4e Branch: refs/heads/1.x Commit: f231fe4e9b4d0720c3bac19d097c1bde17badd46 Parents: d0b568c Author: Martyn Taylor <[email protected]> Authored: Sun Feb 5 19:20:32 2017 +0000 Committer: Martyn Taylor <[email protected]> Committed: Fri Feb 10 14:55:24 2017 +0000 ---------------------------------------------------------------------- .../artemis/core/message/impl/MessageImpl.java | 2 ++ .../core/protocol/mqtt/MQTTPublishManager.java | 4 ++- .../integration/mqtt/imported/MQTTTest.java | 30 ++++++++++++++++++++ .../mqtt/imported/MQTTTestSupport.java | 4 +++ 4 files changed, 39 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index 1e19817..69e6e65 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -162,8 +162,10 @@ public abstract class MessageImpl implements MessageInternal { buffer.setIndex(other.buffer.readerIndex(), buffer.capacity()); bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); + bodyBuffer.readerIndex(BODY_OFFSET); bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex()); + endOfBodyPosition = other.endOfBodyPosition; } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 73a7c8e..ca926b9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.EmptyByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -230,7 +231,8 @@ public class MQTTPublishManager { log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); } default: - payload = message.getBodyBufferDuplicate().byteBuf(); + ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate(); + payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 1d6b98d..886edd3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -1613,4 +1614,33 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + + @Test + public void testRetainedMessagesAreCorrectlyFormedAfterRestart() throws Exception { + String clientId = "testMqtt"; + String address = "testAddress"; + String payload = "This is a test message"; + + // Send MQTT Retain Message + Topic[] mqttTopic = new Topic[]{new Topic(address, QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true); + + getServer().stop(false); + getServer().start(); + waitForServerToStart(getServer()); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttTopic); + + Message message = connection2.receive(); + assertEquals(payload, new String(message.getPayload())); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 8b85f83..476a562 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -93,6 +93,10 @@ public class MQTTTestSupport extends ActiveMQTestBase { return name.getMethodName(); } + public ActiveMQServer getServer() { + return server; + } + @Override @Before public void setUp() throws Exception {
