Repository: qpid-broker-j Updated Branches: refs/heads/master aefd3e9a2 -> f9b9bf836
QPID-7602: [Java Broker] Add remaining tests for reply-to and add conversions for 'to' and corresponding tests Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f9b9bf83 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f9b9bf83 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f9b9bf83 Branch: refs/heads/master Commit: f9b9bf8366be03e1838662dfb58da4bf42b90139 Parents: aefd3e9 Author: Lorenz Quack <lqu...@apache.org> Authored: Wed Aug 30 16:05:16 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Wed Aug 30 16:05:16 2017 +0100 ---------------------------------------------------------------------- .../message/internal/InternalMessage.java | 4 +- .../MessageConverter_Internal_to_v0_10.java | 45 ++++- .../MessageConverter_v0_10_to_Internal.java | 3 +- .../PropertyConverter_0_10_to_InternalTest.java | 16 +- ...PropertyConverter_Internal_to_v0_10Test.java | 180 ++++++++++++++++- .../v0_8/MessageConverter_Internal_to_v0_8.java | 59 +++++- .../PropertyConverter_Internal_to_v0_8Test.java | 193 ++++++++++++++++++- .../v1_0/MessageConverter_Internal_to_v1_0.java | 1 + .../PropertyConverter_Internal_to_v1_0Test.java | 30 ++- .../MessageConverter_1_0_to_v0_10.java | 32 ++- .../PropertyConverter_1_0_to_0_10Test.java | 169 ++++++++++++++-- .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 39 ++-- .../PropertyConverter_1_0_to_0_8Test.java | 159 +++++++++++++-- 13 files changed, 836 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 3382109..29ffba6 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -219,7 +219,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, String destinationName = serverMessage.getTo(); InternalMessageHeader convertedHeader = new InternalMessageHeader(header, serverMessage.getArrivalTime()); StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody); - return new InternalMessage(handle, convertedHeader, messageBody, destinationName); + InternalMessage internalMessage = new InternalMessage(handle, convertedHeader, messageBody, destinationName); + internalMessage.setInitialRoutingAddress(serverMessage.getInitialRoutingAddress()); + return internalMessage; } private static StoredMessage<InternalMessageMetaData> createReadOnlyHandle(final long messageNumber, http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 4c74784..4ffbd3f 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -186,7 +186,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } InternalMessageHeader messageHeader = serverMsg.getMessageHeader(); deliveryProps.setPriority(MessageDeliveryPriority.get(messageHeader.getPriority())); - deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); + convertToAndInitialRoutingKey(serverMsg, deliveryProps, addressSpace); deliveryProps.setTimestamp(messageHeader.getTimestamp()); messageProps.setContentEncoding(ensureStr8("content-encoding", messageHeader.getEncoding())); @@ -237,6 +237,49 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); } + private void convertToAndInitialRoutingKey(final InternalMessage serverMsg, + final DeliveryProperties deliveryProps, + final NamedAddressSpace addressSpace) + { + String to = serverMsg.getTo(); + + final String exchangeName; + final String routingKey; + if (to == null || "".equals(to)) + { + to = serverMsg.getInitialRoutingAddress(); + } + + if (to != null && !"".equals(to)) + { + DestinationAddress destinationAddress = new DestinationAddress(addressSpace, to); + MessageDestination messageDestination = destinationAddress.getMessageDestination(); + if (messageDestination instanceof Queue) + { + exchangeName = ""; + routingKey = messageDestination.getName(); + } + else if (messageDestination instanceof Exchange) + { + exchangeName = messageDestination.getName(); + routingKey = destinationAddress.getRoutingKey(); + } + else + { + exchangeName = ""; + routingKey = to; + } + } + else + { + exchangeName = ""; + routingKey = ""; + } + + deliveryProps.setRoutingKey(ensureStr8("to' or 'initialRoutingAddress", routingKey)); + deliveryProps.setExchange(ensureStr8("to' or 'initialRoutingAddress", exchangeName)); + } + private ReplyTo getReplyTo(final NamedAddressSpace addressSpace, final String origReplyTo) { DestinationAddress destinationAddress = new DestinationAddress(addressSpace, origReplyTo); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java index 6868b14..1ea0c12 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -98,7 +98,8 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess private AMQMessageHeader convertHeader(final MessageTransferMessage serverMessage, final NamedAddressSpace addressSpace, - final Object convertedBodyObject, final String encoding) + final Object convertedBodyObject, + final String encoding) { final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject); final AMQMessageHeader messageHeader = serverMessage.getMessageHeader(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java index 91d6e38..a7b9a19 100644 --- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java +++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java @@ -339,7 +339,21 @@ public class PropertyConverter_0_10_to_InternalTest extends QpidTestCase final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); - assertEquals("Unexpected exchange", testExchange, convertedMessage.getTo()); + assertEquals("Unexpected to", testExchange, convertedMessage.getTo()); + } + + public void testInitialRoutingAddressConversion() + { + final String testExchange = "testExchange"; + final DeliveryProperties deliveryProperties = new DeliveryProperties(); + deliveryProperties.setExchange(testExchange); + final String testRoutingKey = "testRoutingKey"; + deliveryProperties.setRoutingKey(testRoutingKey); + MessageTransferMessage message = createTestMessage(deliveryProperties); + + final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + assertEquals("Unexpected initial routing address", testRoutingKey, convertedMessage.getInitialRoutingAddress()); } public void testApplicationIdConversion() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java index 69962fc..50723df 100644 --- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java +++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_Internal_to_v0_10Test.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode; import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; import org.apache.qpid.server.protocol.v0_8.AMQShortString; @@ -431,6 +432,166 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey()); } + + public void testToConversionWhenExchangeAndRoutingKeyIsSpecified() throws IOException + { + final String testExchange = "testExchange"; + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + final Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_addressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testExchange, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testRoutingKey, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenExchangeIsSpecified() throws IOException + { + final String testExchange = "testExchange"; + + InternalMessage message = createTestMessage(testExchange); + + final Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_addressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testExchange, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", "", deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenQueueIsSpecified() throws IOException + { + final String testQueue = "testQueue"; + + InternalMessage message = createTestMessage(testQueue); + + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(testQueue); + when(_addressSpace.getAttainedMessageDestination(testQueue)).thenReturn(queue); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testQueue, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenGlobalAddressIsUnknown() throws IOException + { + final String queueName = "testQueue"; + final String prefix = "/testPrefix"; + final String globalAddress = prefix + "/" + queueName; + + InternalMessage message = createTestMessage(globalAddress); + + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_addressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", globalAddress, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenGlobalAddressIsKnown() throws IOException + { + final String queueName = "testQueue"; + final String prefix = "/testPrefix"; + final String globalAddress = prefix + "/" + queueName; + + InternalMessage message = createTestMessage(globalAddress); + + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_addressSpace.getLocalAddress(globalAddress)).thenReturn(queueName); + when(_addressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", queueName, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenExchangeLengthExceeds255() throws IOException + { + final String testExchange = generateLongString(); + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + try + { + _messageConverter.convert(message, _addressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenRoutingKeyLengthExceeds255() throws Exception + { + final String testExchange = "testExchange"; + final String testRoutingKey = generateLongString(); + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + try + { + _messageConverter.convert(message, _addressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenDestinationIsSpecifiedButDoesNotExists() throws IOException + { + final String testDestination = "testDestination"; + + InternalMessage message = createTestMessage(testDestination); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testDestination, deliveryProperties.getRoutingKey()); + } + + private InternalMessage createTestMessage(String to) throws IOException + { + final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(mock(AMQMessageHeader.class)); + final StoredMessage<InternalMessageMetaData> handle = + createInternalStoredMessage(null,false, internalMessageHeader); + return new InternalMessage(handle, internalMessageHeader, null, to); + } + private InternalMessage createTestMessage(final AMQMessageHeader header) throws IOException { return createTestMessage(header, null, false, System.currentTimeMillis()); @@ -442,22 +603,29 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase return createTestMessage(header, null, false, arrivalTime); } - private InternalMessage createTestMessage(final AMQMessageHeader header, byte[] content, final boolean persistent, final long arrivalTime) throws IOException { final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(header, arrivalTime); - int contentSize = 0; + final StoredMessage<InternalMessageMetaData> storedMessage = + createInternalStoredMessage(content, persistent, internalMessageHeader); + return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + } + + private StoredMessage<InternalMessageMetaData> createInternalStoredMessage(final byte[] content, + final boolean persistent, + final InternalMessageHeader internalMessageHeader) throws IOException + { final StoredMessage<InternalMessageMetaData> storedMessage = mock(StoredMessage.class); - if (content != null) + int contentSize = content == null ? 0 : content.length; + if (contentSize > 0) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(content); - contentSize = baos.size(); + when(storedMessage.getContent(0, contentSize)).thenReturn(Collections.singletonList(QpidByteBuffer.wrap( baos.toByteArray()))); } @@ -466,7 +634,7 @@ public class PropertyConverter_Internal_to_v0_10Test extends QpidTestCase final InternalMessageMetaData metaData = new InternalMessageMetaData(persistent, internalMessageHeader, contentSize); when(storedMessage.getMetaData()).thenReturn(metaData); - return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + return storedMessage; } private String generateLongString() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 1678a8d..0a882b7 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -175,11 +175,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter final int size) { - MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING, - false, - false, - AMQShortString.valueOf(serverMsg.getInitialRoutingAddress())); - + MessagePublishInfo publishInfo = createMessagePublishInfo(serverMsg, addressSpace); final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setAppId(serverMsg.getMessageHeader().getAppId()); @@ -217,6 +213,51 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); } + private MessagePublishInfo createMessagePublishInfo(final InternalMessage serverMsg, + final NamedAddressSpace addressSpace) + { + String to = serverMsg.getTo(); + + final String exchangeName; + final String routingKey; + if (to == null || "".equals(to)) + { + to = serverMsg.getInitialRoutingAddress(); + } + + if (to != null && !"".equals(to)) + { + DestinationAddress destinationAddress = new DestinationAddress(addressSpace, to); + MessageDestination messageDestination = destinationAddress.getMessageDestination(); + if (messageDestination instanceof Queue) + { + exchangeName = ""; + routingKey = messageDestination.getName(); + } + else if (messageDestination instanceof Exchange) + { + exchangeName = messageDestination.getName(); + routingKey = destinationAddress.getRoutingKey(); + } + else + { + exchangeName = ""; + routingKey = to; + } + } + else + { + exchangeName = ""; + routingKey = ""; + } + + return new MessagePublishInfo(convertToShortStringForProperty("to", exchangeName), + false, + false, + convertToShortStringForProperty("to' or 'subject", + routingKey)); + } + private String getReplyTo(final InternalMessage serverMsg, final NamedAddressSpace addressSpace) { String replyTo = serverMsg.getMessageHeader().getReplyTo(); @@ -230,10 +271,14 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter if (messageDestination instanceof Exchange) { Exchange<?> exchange = (Exchange<?>) messageDestination; - replyToBindingUrl = String.format("%s://%s//?routingkey='%s'", + + final String routingKeyOption = "".equals(destinationAddress.getRoutingKey()) + ? "" + : String.format("?routingkey='%s'", destinationAddress.getRoutingKey()); + replyToBindingUrl = String.format("%s://%s//%s", exchange.getType(), exchange.getName(), - destinationAddress.getRoutingKey()); + routingKeyOption); } else if (messageDestination instanceof Queue) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java index 0d8baee..9524e2a 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_Internal_to_v0_8Test.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.test.utils.QpidTestCase; @@ -313,7 +314,7 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase AMQMessage convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); assertEquals("Unexpected reply-to", - "fanout://" + replyTo + "//?routingkey=''", + "fanout://" + replyTo + "//", convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString()); } @@ -356,6 +357,177 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase convertedMessage.getContentHeaderBody().getProperties().getReplyToAsString()); } + public void testToConversionWhenExchangeAndRoutingKeyIsSpecified() + { + final String testExchange = "testExchange"; + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_addressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", testRoutingKey, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenExchangeIsSpecified() + { + final String testExchange = "testExchange"; + InternalMessage message = createTestMessage(testExchange); + + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_addressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", "", messagePublishInfo.getRoutingKey().toString()); + } + + public void testConversionWhenToIsUnsetButInitialRoutingKeyIsSet() + { + final String testExchange = "testExchange"; + final String testRoutingKey = "testRoutingKey"; + + InternalMessage message = createTestMessage(""); + final String testInitialRoutingAddress = testExchange + "/" + testRoutingKey; + message.setInitialRoutingAddress(testInitialRoutingAddress); + + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_addressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", testRoutingKey, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenQueueIsSpecified() + { + final String testQueue = "testQueue"; + InternalMessage message = createTestMessage(testQueue); + + final Queue queue = mock(Queue.class); + when(queue.getName()).thenReturn(testQueue); + when(_addressSpace.getAttainedMessageDestination(testQueue)).thenReturn(queue); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", testQueue, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenGlobalAddressIsUnknown() + { + final String globalPrefix = "/testPrefix"; + final String queueName = "testQueue"; + final String globalAddress = globalPrefix + "/" + queueName; + + InternalMessage message = createTestMessage(globalAddress); + + Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_addressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", globalAddress, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenGlobalAddressIsKnown() + { + final String globalPrefix = "/testPrefix"; + final String queueName = "testQueue"; + final String globalAddress = globalPrefix + "/" + queueName; + + InternalMessage message = createTestMessage(globalAddress); + + Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_addressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + when(_addressSpace.getLocalAddress(globalAddress)).thenReturn(queueName); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", queueName, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenExchangeLengthExceeds255() + { + final String testExchange = generateLongString(); + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + try + { + _messageConverter.convert(message, _addressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenRoutingKeyLengthExceeds255() + { + final String testExchange = "testExchange"; + final String testRoutingKey = generateLongString(); + + String to = testExchange + "/" + testRoutingKey; + + InternalMessage message = createTestMessage(to); + + try + { + _messageConverter.convert(message, _addressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenDestinationIsSpecifiedButDoesNotExists() + { + final String testDestination = "testDestination"; + + InternalMessage message = createTestMessage(testDestination); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _addressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", testDestination, messagePublishInfo.getRoutingKey().toString()); + } + private InternalMessage createTestMessage(final AMQMessageHeader header) { return createTestMessage(header, null, false); @@ -367,15 +539,32 @@ public class PropertyConverter_Internal_to_v0_8Test extends QpidTestCase { final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(header); final int contentSize = content == null ? 0 : content.length; + final StoredMessage<InternalMessageMetaData> storedMessage = + createInternalStoredMessage(persistent, internalMessageHeader, contentSize); + return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + } + + private StoredMessage<InternalMessageMetaData> createInternalStoredMessage(final boolean persistent, + final InternalMessageHeader internalMessageHeader, + final int contentSize) + { final InternalMessageMetaData metaData = new InternalMessageMetaData(persistent, internalMessageHeader, contentSize); final StoredMessage<InternalMessageMetaData> storedMessage = mock(StoredMessage.class); when(storedMessage.getMetaData()).thenReturn(metaData); when(storedMessage.getContentSize()).thenReturn(contentSize); - return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + return storedMessage; } + private InternalMessage createTestMessage(String to) + { + final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(mock(AMQMessageHeader.class)); + final StoredMessage<InternalMessageMetaData> handle = + createInternalStoredMessage(false, internalMessageHeader, 0); + return new InternalMessage(handle, internalMessageHeader, null, to); + } + private String generateLongString() { return generateLongString(AMQShortString.MAX_LENGTH + 1); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java index 847a26a..8192d36 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java @@ -110,6 +110,7 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8))); } properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo()); + properties.setTo(serverMessage.getTo()); ApplicationProperties applicationProperties = null; if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty()) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java index 455ab8f..27f275f 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/PropertyConverter_Internal_to_v1_0Test.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -228,6 +229,16 @@ public class PropertyConverter_Internal_to_v1_0Test extends QpidTestCase assertEquals("Unexpected replyTo", replyTo, convertedReplyTo); } + public void testToConversion() throws IOException + { + final String to = "amq.direct/test"; + InternalMessage originalMessage = createTestMessage(to); + + Message_1_0 convertedMessage = _messageConverter.convert(originalMessage, _addressSpace); + + assertEquals("Unexpected to", to, convertedMessage.getTo()); + } + public void testTimestampConversion() { final long timestamp = System.currentTimeMillis(); @@ -285,6 +296,14 @@ public class PropertyConverter_Internal_to_v1_0Test extends QpidTestCase } } + private InternalMessage createTestMessage(String to) throws IOException + { + final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(mock(AMQMessageHeader.class)); + final StoredMessage<InternalMessageMetaData> handle = + createInternalStoredMessage(null,false, internalMessageHeader); + return new InternalMessage(handle, internalMessageHeader, null, to); + } + private InternalMessage createTestMessage(final AMQMessageHeader header) { return createTestMessage(header, null, false); @@ -295,6 +314,15 @@ public class PropertyConverter_Internal_to_v1_0Test extends QpidTestCase final boolean persistent) { final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(header); + final StoredMessage<InternalMessageMetaData> storedMessage = + createInternalStoredMessage(content, persistent, internalMessageHeader); + return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + } + + private StoredMessage<InternalMessageMetaData> createInternalStoredMessage(final byte[] content, + final boolean persistent, + final InternalMessageHeader internalMessageHeader) + { final int contentSize = content == null ? 0 : content.length; final InternalMessageMetaData metaData = new InternalMessageMetaData(persistent, internalMessageHeader, contentSize); @@ -302,6 +330,6 @@ public class PropertyConverter_Internal_to_v1_0Test extends QpidTestCase when(storedMessage.getMetaData()).thenReturn(metaData); when(storedMessage.getContentSize()).thenReturn(contentSize); - return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(storedMessage)); + return storedMessage; } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index f80a913..3a36b58 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -315,33 +315,24 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 final String exchangeName; final String routingKey; - if (to != null) + if (to != null && !"".equals(to)) { - if (to.startsWith("/")) + DestinationAddress destinationAddress = new DestinationAddress(addressSpace, to); + MessageDestination messageDestination = destinationAddress.getMessageDestination(); + if (messageDestination instanceof Queue) { - //TODO: get local address from global - throw new MessageConversionException("Could not convert message from 1.0 to 0-10 because conversion of 'to' failed. Global addresses cannot be converted."); + exchangeName = ""; + routingKey = messageDestination.getName(); } - - int separatorPosition = to.indexOf('/'); - if (separatorPosition != -1) + else if (messageDestination instanceof Exchange) { - exchangeName = to.substring(0, separatorPosition); - routingKey = to.substring(separatorPosition + 1); + exchangeName = messageDestination.getName(); + routingKey = "".equals(destinationAddress.getRoutingKey()) ? subject : destinationAddress.getRoutingKey(); } else { - MessageDestination destination = addressSpace.getAttainedMessageDestination(to); - if (destination instanceof Queue) - { - exchangeName = ""; - routingKey = to; - } - else - { - exchangeName = to; - routingKey = subject; - } + exchangeName = ""; + routingKey = to; } } else @@ -349,6 +340,7 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 exchangeName = ""; routingKey = subject; } + deliveryProps.setRoutingKey(ensureStr8("to' or 'subject", routingKey)); deliveryProps.setExchange(ensureStr8("to", exchangeName)); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java index ab60a13..9e664ba 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java @@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -36,6 +37,9 @@ import java.util.UUID; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; @@ -44,6 +48,7 @@ import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode; import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; +import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.protocol.v1_0.type.Binary; @@ -360,6 +365,104 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase } } + public void testReplyToConversionWhenQueueIsSpecified() throws IOException + { + final String replyTo = "myTestQueue"; + final Queue queue = mock(Queue.class); + when(queue.getName()).thenReturn(replyTo); + when(_namedAddressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final ReplyTo convertedReplyTo = + convertedMessage.getHeader().getMessageProperties().getReplyTo(); + assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange()); + assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey()); + } + + public void testReplyToConversionWhenExchangeIsSpecified() throws IOException + { + final String replyTo = "myTestExchange"; + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(replyTo); + when(_namedAddressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final ReplyTo convertedReplyTo = + convertedMessage.getHeader().getMessageProperties().getReplyTo(); + assertEquals("Unexpected exchange", replyTo, convertedReplyTo.getExchange()); + assertEquals("Unexpected routing key", "", convertedReplyTo.getRoutingKey()); + } + + public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified() throws IOException + { + final String exchangeName = "testExchnageName"; + final String routingKey = "testRoutingKey"; + final String replyTo = String.format("%s/%s", exchangeName, routingKey); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(exchangeName); + when(_namedAddressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final ReplyTo convertedReplyTo = + convertedMessage.getHeader().getMessageProperties().getReplyTo(); + assertEquals("Unexpected exchange", exchangeName, convertedReplyTo.getExchange()); + assertEquals("Unexpected routing key", routingKey, convertedReplyTo.getRoutingKey()); + } + + public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecifiedAndGlobalPrefixIsUsed() throws IOException + { + final String exchangeName = "testExchnageName"; + final String routingKey = "testRoutingKey"; + final String globalPrefix = "/testPrefix"; + final String replyTo = String.format("%s/%s/%s", globalPrefix, exchangeName, routingKey); + when(_namedAddressSpace.getLocalAddress(replyTo)).thenReturn(exchangeName + "/" + routingKey); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(exchangeName); + when(_namedAddressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final ReplyTo convertedReplyTo = + convertedMessage.getHeader().getMessageProperties().getReplyTo(); + assertEquals("Unexpected exchange", exchangeName, convertedReplyTo.getExchange()); + assertEquals("Unexpected routing key", routingKey, convertedReplyTo.getRoutingKey()); + } + + public void testReplyToConversionWhenReplyToCannotBeResolved() throws IOException + { + final String replyTo = "direct://amq.direct//test?routingkey='test'"; + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final ReplyTo convertedReplyTo = + convertedMessage.getHeader().getMessageProperties().getReplyTo(); + assertEquals("Unexpected exchange", "", convertedReplyTo.getExchange()); + assertEquals("Unexpected routing key", replyTo, convertedReplyTo.getRoutingKey()); + } + public void testTTLConversion() { long ttl = 10000; @@ -697,6 +800,10 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase properties.setTo(to); Message_1_0 message = createTestMessage(properties); + final Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); final DeliveryProperties deliveryProperties = @@ -712,7 +819,9 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase properties.setTo(testExchange); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + final Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -731,7 +840,9 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase properties.setSubject(testRoutingKey); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + final Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -748,7 +859,9 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase properties.setTo(testQueue); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(mock(Queue.class)); + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(testQueue); + when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(queue); final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -758,22 +871,46 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase assertEquals("Unexpected routing key", testQueue, deliveryProperties.getRoutingKey()); } - public void testToConversionWhenGlobalAddress() + public void testToConversionWhenGlobalAddressIsUnknown() { - final String globalAddress = "/testQueue"; + final String queueName = "testQueue"; + final String prefix = "/testPrefix"; + final String globalAddress = prefix + "/" + queueName; Properties properties = new Properties(); properties.setTo(globalAddress); Message_1_0 message = createTestMessage(properties); - try - { - _messageConverter.convert(message, _namedAddressSpace); - fail("Exception is not thrown"); - } - catch (MessageConversionException e) - { - // pass - } + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_namedAddressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", globalAddress, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenGlobalAddressIsKnown() + { + final String queueName = "testQueue"; + final String prefix = "/testPrefix"; + final String globalAddress = prefix + "/" + queueName; + Properties properties = new Properties(); + properties.setTo(globalAddress); + Message_1_0 message = createTestMessage(properties); + + final Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_namedAddressSpace.getLocalAddress(globalAddress)).thenReturn(queueName); + when(_namedAddressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", queueName, deliveryProperties.getRoutingKey()); } public void testToConversionWhenExchangeLengthExceeds255() @@ -829,8 +966,8 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase final DeliveryProperties deliveryProperties = convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); - assertEquals("Unexpected exchange", testDestination, deliveryProperties.getExchange()); - assertEquals("Unexpected routing key", "", deliveryProperties.getRoutingKey()); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testDestination, deliveryProperties.getRoutingKey()); } public void testContentToContentLengthConversion() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index b309a56..dcdd430 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -258,34 +258,24 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ final String exchangeName; final String routingKey; - - if (to != null) + if (to != null && !"".equals(to)) { - if (to.startsWith("/")) + DestinationAddress destinationAddress = new DestinationAddress(addressSpace, to); + MessageDestination messageDestination = destinationAddress.getMessageDestination(); + if (messageDestination instanceof Queue) { - //TODO: get local address from global - throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'to' failed. Global addresses cannot be converted."); + exchangeName = ""; + routingKey = messageDestination.getName(); } - - int separatorPosition = to.indexOf('/'); - if (separatorPosition != -1) + else if (messageDestination instanceof Exchange) { - exchangeName = to.substring(0, separatorPosition); - routingKey = to.substring(separatorPosition + 1); + exchangeName = messageDestination.getName(); + routingKey = "".equals(destinationAddress.getRoutingKey()) ? subject : destinationAddress.getRoutingKey(); } else { - MessageDestination destination = addressSpace.getAttainedMessageDestination(to); - if (destination instanceof Queue) - { - exchangeName = ""; - routingKey = to; - } - else - { - exchangeName = to; - routingKey = subject; - } + exchangeName = ""; + routingKey = to; } } else @@ -357,10 +347,13 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ if (messageDestination instanceof Exchange) { Exchange<?> exchange = (Exchange<?>) messageDestination; - replyToBindingUrl = String.format("%s://%s//?routingkey='%s'", + final String routingKeyOption = "".equals(destinationAddress.getRoutingKey()) + ? "" + : String.format("?routingkey='%s'", destinationAddress.getRoutingKey()); + replyToBindingUrl = String.format("%s://%s//%s", exchange.getType(), exchange.getName(), - destinationAddress.getRoutingKey()); + routingKeyOption); } else if (messageDestination instanceof Queue) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f9b9bf83/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java index 0bb47a7..059e268 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java @@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -32,10 +33,13 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; @@ -337,6 +341,102 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase } } + public void testReplyToConversionWhenQueueIsSpecified() throws IOException + { + final String replyTo = "myTestQueue"; + final Queue queue = mock(Queue.class); + when(queue.getName()).thenReturn(replyTo); + when(_namedAddressSpace.getAttainedMessageDestination(replyTo)).thenReturn(queue); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); + + assertEquals("Unexpected reply-to", "direct:////" + replyTo, convertedProperties.getReplyToAsString()); + } + + public void testReplyToConversionWhenExchangeIsSpecified() throws IOException + { + final String replyTo = "myTestExchange"; + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(replyTo); + when(exchange.getType()).thenReturn(ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + when(_namedAddressSpace.getAttainedMessageDestination(replyTo)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); + + assertEquals("Unexpected reply-to", "fanout://" + replyTo + "//", convertedProperties.getReplyToAsString()); + } + + public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecified() throws IOException + { + final String exchangeName = "testExchnageName"; + final String routingKey = "testRoutingKey"; + final String replyTo = String.format("%s/%s", exchangeName, routingKey); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(exchangeName); + when(exchange.getType()).thenReturn(ExchangeDefaults.TOPIC_EXCHANGE_CLASS); + when(_namedAddressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); + + assertEquals("Unexpected reply-to", "topic://" + exchangeName + "//?routingkey='" + routingKey + "'", convertedProperties.getReplyToAsString()); + } + + public void testReplyToConversionWhenExchangeAndRoutingKeyAreSpecifiedAndGlobalPrefixIsUsed() throws IOException + { + final String exchangeName = "testExchnageName"; + final String routingKey = "testRoutingKey"; + final String globalPrefix = "/testPrefix"; + final String replyTo = String.format("%s/%s/%s", globalPrefix, exchangeName, routingKey); + when(_namedAddressSpace.getLocalAddress(replyTo)).thenReturn(exchangeName + "/" + routingKey); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(exchangeName); + when(exchange.getType()).thenReturn(ExchangeDefaults.TOPIC_EXCHANGE_CLASS); + when(_namedAddressSpace.getAttainedMessageDestination(exchangeName)).thenReturn(exchange); + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); + + assertEquals("Unexpected reply-to", "topic://" + exchangeName + "//?routingkey='" + routingKey + "'", convertedProperties.getReplyToAsString()); + } + + public void testReplyToConversionWhenReplyToCannotBeResolved() throws IOException + { + final String replyTo = "direct://amq.direct//test?routingkey='test'"; + + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties(); + + assertEquals("Unexpected reply-to", "direct:////?routingkey='" + replyTo + "'", convertedProperties.getReplyToAsString()); + } + public void testTTLConversion() { long ttl = 10000; @@ -611,6 +711,10 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase properties.setTo(to); Message_1_0 message = createTestMessage(properties); + Exchange<?> exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); @@ -626,7 +730,9 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase properties.setTo(testExchange); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -645,7 +751,9 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase properties.setSubject(testRoutingKey); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + final Exchange exchange = mock(Exchange.class); + when(exchange.getName()).thenReturn(testExchange); + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(exchange); final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -662,7 +770,9 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase properties.setTo(testQueue); Message_1_0 message = createTestMessage(properties); - when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(mock(Queue.class)); + final Queue queue = mock(Queue.class); + when(queue.getName()).thenReturn(testQueue); + when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(queue); final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); @@ -672,22 +782,41 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase assertEquals("Unexpected routing key", testQueue, messagePublishInfo.getRoutingKey().toString()); } - public void testToConversionWhenGlobalAddress() + public void testToConversionWhenGlobalAddressUnrecognized() { final String globalAddress = "/testQueue"; Properties properties = new Properties(); properties.setTo(globalAddress); Message_1_0 message = createTestMessage(properties); - try - { - _messageConverter.convert(message, _namedAddressSpace); - fail("Exception is not thrown"); - } - catch (MessageConversionException e) - { - // pass - } + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", globalAddress, messagePublishInfo.getRoutingKey().toString()); + } + + public void testToConversionWhenGlobalAddressIsKnown() + { + final String globalPrefix = "/testPrefix"; + final String queueName = "testQueue"; + final String globalAddress = globalPrefix + "/" + queueName; + Properties properties = new Properties(); + properties.setTo(globalAddress); + Message_1_0 message = createTestMessage(properties); + + Queue<?> queue = mock(Queue.class); + when(queue.getName()).thenReturn(queueName); + when(_namedAddressSpace.getAttainedMessageDestination(queueName)).thenReturn(queue); + when(_namedAddressSpace.getLocalAddress(globalAddress)).thenReturn(queueName); + + final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); + + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", queueName, messagePublishInfo.getRoutingKey().toString()); } public void testToConversionWhenExchangeLengthExceeds255() @@ -743,8 +872,8 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo(); - assertEquals("Unexpected exchange", testDestination, messagePublishInfo.getExchange().toString()); - assertEquals("Unexpected routing key", "", messagePublishInfo.getRoutingKey().toString()); + assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString()); + assertEquals("Unexpected routing key", testDestination, messagePublishInfo.getRoutingKey().toString()); } private Message_1_0 createTestMessage(final Header header) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org