Repository: activemq-artemis Updated Branches: refs/heads/master 6974b115f -> d01874ae8
ARTEMIS-981 OpenWire can't receive empty ObjectMessage When sending an empty ObjectMessage, broker doesn't write a 'length' field to the message buffer. In delivery the broker tries to read the length from the buffer, which causes "IndexOutOfBoundsException". To fix it, we need to check if the buffer is empty or not, and only read it if the buffer is not empty. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2fabd059 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2fabd059 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2fabd059 Branch: refs/heads/master Commit: 2fabd059d88322404067527c90e7ca7f802349a6 Parents: 6974b11 Author: Howard Gao <[email protected]> Authored: Sun Feb 19 15:02:46 2017 +0800 Committer: Howard Gao <[email protected]> Committed: Wed Feb 22 12:59:52 2017 +0800 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 20 ++++---- .../openwire/SimpleOpenWireTest.java | 50 ++++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2fabd059/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 076b01f..9b27b81 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -518,16 +518,18 @@ public class OpenWireMessageConverter implements MessageConverter { } } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + } + bytes = bytesOut.toByteArray(); } - bytes = bytesOut.toByteArray(); } } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2fabd059/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 509bb6d..6eb45a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.openwire; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -26,11 +27,13 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; @@ -162,6 +165,53 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } @Test + public void testSendEmptyMessages() throws Exception { + Queue dest = new ActiveMQQueue(queueName); + + QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSender defaultSender = defaultQueueSession.createSender(dest); + defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + connection.start(); + + Message msg = defaultQueueSession.createMessage(); + msg.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(msg); + + QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //bytes + BytesMessage bytesMessage = defaultQueueSession.createBytesMessage(); + bytesMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(bytesMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //map + MapMessage mapMessage = defaultQueueSession.createMapMessage(); + mapMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(mapMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //object + ObjectMessage objMessage = defaultQueueSession.createObjectMessage(); + objMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(objMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //stream + StreamMessage streamMessage = defaultQueueSession.createStreamMessage(); + streamMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(streamMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //text + TextMessage textMessage = defaultQueueSession.createTextMessage(); + textMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(textMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + } + + @Test public void testXASimple() throws Exception { XAConnection connection = xaFactory.createXAConnection();
