Repository: activemq-artemis Updated Branches: refs/heads/master 88f4a8cce -> ceb2b38c8
ARTEMIS-888 - AMQP headers aren't always set https://issues.apache.org/jira/browse/ARTEMIS-888 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3b75c954 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3b75c954 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3b75c954 Branch: refs/heads/master Commit: 3b75c954135fdc58b172a381403cb7daa223b602 Parents: 88f4a8c Author: Andy Taylor <[email protected]> Authored: Wed Dec 14 16:09:36 2016 +0000 Committer: Andy Taylor <[email protected]> Committed: Thu Dec 15 07:07:04 2016 +0000 ---------------------------------------------------------------------- .../converter/message/AMQPMessageSupport.java | 4 + .../converter/message/InboundTransformer.java | 4 + .../message/JMSMappingOutboundTransformer.java | 14 ++ .../transport/amqp/client/AmqpMessage.java | 19 +++ .../integration/amqp/AmqpSendReceiveTest.java | 157 +++++++++++++++++++ 5 files changed, 198 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java index 9eab737..8c4612d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java @@ -69,12 +69,16 @@ public final class AMQPMessageSupport { public static final String CONTENT_TYPE = "ContentType"; public static final String CONTENT_ENCODING = "ContentEncoding"; public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; + public static final String DURABLE = "DURABLE"; + public static final String PRIORITY = "PRIORITY"; public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; public static final String FOOTER_PREFIX = "FT_"; public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; + public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; + public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java index 5094af5..9c40cd8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java @@ -22,6 +22,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; @@ -76,12 +78,14 @@ public abstract class InboundTransformer { jms.setBooleanProperty(JMS_AMQP_HEADER, true); if (header.getDurable() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); } else { jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); } if (header.getPriority() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); jms.setJMSPriority(header.getPriority().intValue()); } else { jms.setJMSPriority(Message.DEFAULT_PRIORITY); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 2fa7145..7dbc6d4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -32,6 +32,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; import static 
org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; @@ -287,6 +289,18 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { header = new Header(); } continue; + } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { + if (header == null) { + header = new Header(); + } + header.setDurable(message.getInnerMessage().isDurable()); + continue; + } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + continue; } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { if (properties == null) { properties = new Properties(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index f7a9364..5cf2c0a 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -402,6 +402,25 @@ public class AmqpMessage { } /** + * Sets the priority header on the outgoing message. + * + * @param priority the priority value to set. + */ + public void setPriority(short priority) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setPriority(priority); + } + + /** + * Returns the priority header value of the wrapped message. 
+ */ + public short getPriority() { + return getWrappedMessage().getPriority(); + } + + + /** * Sets a given application property on an outbound message. * * @param key the name to assign the new property. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b75c954/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index e102c77..b9d5504 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -239,6 +239,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } @Test(timeout = 60000) + public void testMessageDurableFalse() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertFalse(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDurableTrue() throws Exception { + sendMessages(getTestName(), 1, true); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + 
AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertTrue(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 4); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNonDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 0); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 0, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNoPriority() throws Exception { + 
sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; sendMessages(getTestName(), MSG_COUNT); @@ -940,4 +1060,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } } + + + public void sendMessages(String destinationName, int count, boolean durable) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setDurable(durable); + sender.send(message); + } + } finally { + connection.close(); + } + } + + public void sendMessages(String destinationName, int count, short priority) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setPriority(priority); + sender.send(message); + } + } finally { + 
connection.close(); + } + } }
