Fixing tests (mqtt)

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fad90f5a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fad90f5a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fad90f5a

Branch: refs/heads/artemis-1009
Commit: fad90f5ac9ddb7e3cd45bb1130085fc6d44585f4
Parents: c989c68
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Mar 1 15:28:33 2017 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/message/impl/CoreMessage.java   |  2 ++
 .../artemis/core/protocol/mqtt/MQTTPublishManager.java    | 10 +++++++---
 .../artemis/core/protocol/mqtt/MQTTSessionCallback.java   |  2 +-
 .../activemq/artemis/core/protocol/mqtt/MQTTUtil.java     |  4 +---
 4 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 973c1de..b1bad5a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -131,6 +131,7 @@ public class CoreMessage extends RefCountMessage {
 
    @Override
    public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      checkEncode();
       internalWritableBuffer();
       return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, 
endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - 
BUFFER_HEADER_SPACE).asReadOnly());
    }
@@ -243,6 +244,7 @@ public class CoreMessage extends RefCountMessage {
 
    @Override
    public Message copy() {
+      checkEncode();
       return new CoreMessage(this);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e619eb9..67ef258 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -32,12 +32,15 @@ import 
org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
 
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
  */
 public class MQTTPublishManager {
 
+   private static final Logger logger = 
Logger.getLogger(MQTTPublishManager.class);
+
    private static final String MANAGEMENT_QUEUE_PREFIX = 
"$sys.mqtt.queue.qos2.";
 
    private SimpleString managementAddress;
@@ -173,6 +176,7 @@ public class MQTTPublishManager {
                }
                tx.commit();
             } catch (Throwable t) {
+               logger.warn(t.getMessage(), t);
                tx.rollback();
                throw t;
             }
@@ -253,17 +257,17 @@ public class MQTTPublishManager {
       switch (message.getType()) {
          case Message.TEXT_TYPE:
             try {
-               SimpleString text = 
message.getBodyBuffer().readNullableSimpleString();
+               SimpleString text = 
message.getReadOnlyBodyBuffer().readNullableSimpleString();
                byte[] stringPayload = text.toString().getBytes("UTF-8");
                payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                payload.writeBytes(stringPayload);
                break;
             } catch (UnsupportedEncodingException e) {
-               log.warn("Unable to send message: " + message.getMessageID() + 
" Cause: " + e.getMessage());
+               log.warn("Unable to send message: " + message.getMessageID() + 
" Cause: " + e.getMessage(), e);
             }
          default:
             ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
-            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - 
bufferDup.readerIndex()).byteBuf();
+            payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, 
deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index b997d80..a5b908f 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -51,7 +51,7 @@ public class MQTTSessionCallback implements SessionCallback {
       try {
          session.getMqttPublishManager().sendMessage((CoreMessage)message, 
consumer, deliveryCount);
       } catch (Exception e) {
-         log.warn("Unable to send message: " + message.getMessageID() + " 
Cause: " + e.getMessage());
+         log.warn("Unable to send message: " + message.getMessageID() + " 
Cause: " + e.getMessage(), e);
       }
       return 1;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fad90f5a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 6891497..e7b8c50 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -26,7 +26,6 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
@@ -114,8 +113,7 @@ public class MQTTUtil {
       String coreAddress = convertMQTTAddressFilterToCore(topic, 
session.getWildcardConfiguration());
       Message message = createServerMessage(session, new 
SimpleString(coreAddress), retain, qos);
 
-      // FIXME does this involve a copy?
-      message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), 
payload.readableBytes());
+      message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
    }
 

Reply via email to