Fixing tests (mqtt)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fad90f5a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fad90f5a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fad90f5a Branch: refs/heads/artemis-1009 Commit: fad90f5ac9ddb7e3cd45bb1130085fc6d44585f4 Parents: c989c68 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Mar 1 15:28:33 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 2 20:04:30 2017 -0500 ---------------------------------------------------------------------- .../activemq/artemis/core/message/impl/CoreMessage.java | 2 ++ .../artemis/core/protocol/mqtt/MQTTPublishManager.java | 10 +++++++--- .../artemis/core/protocol/mqtt/MQTTSessionCallback.java | 2 +- .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java | 4 +--- 4 files changed, 11 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 973c1de..b1bad5a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -131,6 +131,7 @@ public class CoreMessage extends RefCountMessage { @Override public ActiveMQBuffer getReadOnlyBodyBuffer() { + checkEncode(); internalWritableBuffer(); return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly()); } @@ -243,6 +244,7 @@ public class CoreMessage extends RefCountMessage { @Override public Message copy() { + checkEncode(); return new CoreMessage(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index e619eb9..67ef258 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -32,12 +32,15 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.jboss.logging.Logger; /** * Handles MQTT Exactly Once (QoS level 2) Protocol. */ public class MQTTPublishManager { + private static final Logger logger = Logger.getLogger(MQTTPublishManager.class); + private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; private SimpleString managementAddress; @@ -173,6 +176,7 @@ public class MQTTPublishManager { } tx.commit(); } catch (Throwable t) { + logger.warn(t.getMessage(), t); tx.rollback(); throw t; } @@ -253,17 +257,17 @@ public class MQTTPublishManager { switch (message.getType()) { case Message.TEXT_TYPE: try { - SimpleString text = message.getBodyBuffer().readNullableSimpleString(); + SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); byte[] stringPayload = text.toString().getBytes("UTF-8"); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); break; } catch (UnsupportedEncodingException e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); - payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); + payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index b997d80..a5b908f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback { try { session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount); } catch (Exception e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } return 1; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 6891497..e7b8c50 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -114,8 +113,7 @@ public class MQTTUtil { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); - // FIXME does this involve a copy? - message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes()); + message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; }