Repository: qpid-broker-j Updated Branches: refs/heads/master 1a1aed8af -> d79e5b6ac
QPID-7635: [Java Broker] Refactor resolution of routing address for AMQP 1.0 Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d79e5b6a Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d79e5b6a Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d79e5b6a Branch: refs/heads/master Commit: d79e5b6ac5d771dee5ea63e0fe7cb3eac8123112 Parents: 1a1aed8 Author: Lorenz Quack <lqu...@apache.org> Authored: Fri Jun 16 10:35:23 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Fri Jun 16 10:39:13 2017 +0100 ---------------------------------------------------------------------- .../qpid/server/message/ServerMessage.java | 2 - .../message/internal/InternalMessage.java | 15 +--- .../message/AbstractServerMessageTest.java | 8 +- .../server/store/TestMessageMetaDataType.java | 8 +- .../qpid/server/txn/MockServerMessage.java | 6 -- .../protocol/v0_10/MessageTransferMessage.java | 14 +--- .../qpid/server/protocol/v0_8/AMQMessage.java | 13 +--- .../v1_0/AnonymousRelayDestination.java | 2 +- .../protocol/v1_0/ExchangeDestination.java | 26 ++++++- .../qpid/server/protocol/v1_0/Message_1_0.java | 77 +++++--------------- .../protocol/v1_0/NodeReceivingDestination.java | 2 +- .../protocol/v1_0/ReceivingDestination.java | 11 ++- 12 files changed, 64 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 693afd6..d648b63 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -55,6 +55,4 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu Object getConnectionReference(); boolean isResourceAcceptable(TransactionLogResource resource); - - String getRoutingAddress(String destinationAddress, String initialDestinationRoutingAddress); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 5de6127..78e930d 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -48,7 +48,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, private static final String NON_AMQP_MESSAGE = "Non-AMQP Message"; private final Object _messageBody; private InternalMessageHeader _header; - private String _initialRoutingAddress; + private String _initialRoutingAddress = ""; InternalMessage(final StoredMessage<InternalMessageMetaData> handle, @@ -118,17 +118,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, return true; } - @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - String initialRoutingAddress = getInitialRoutingAddress(); - if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/")) - { - initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1); - } - return initialRoutingAddress; - } - public Object getMessageBody() { return _messageBody; @@ -292,6 +281,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, public void setInitialRoutingAddress(final String initialRoutingAddress) { - _initialRoutingAddress = initialRoutingAddress; + _initialRoutingAddress = initialRoutingAddress == null ? "" : initialRoutingAddress; } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java index c3dd9aa..1b2cb8f 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java @@ -44,7 +44,7 @@ public class AbstractServerMessageTest extends QpidTestCase @Override public String getInitialRoutingAddress() { - return null; + return ""; } @Override @@ -76,12 +76,6 @@ public class AbstractServerMessageTest extends QpidTestCase { return true; } - - @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - return null; - } } private TransactionLogResource createQueue(String name) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index cd7da55..24fa263 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -124,12 +124,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - return null; - } - - @Override public long getExpiration() { return 0; @@ -156,7 +150,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM @Override public String getInitialRoutingAddress() { - return null; + return ""; } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 4b14faf..fcb7c92 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -139,12 +139,6 @@ class MockServerMessage implements ServerMessage } @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - return null; - } - - @Override public long getArrivalTime() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index 8af13d8..07860f5 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -50,7 +50,8 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra public String getInitialRoutingAddress() { - return getMetaData().getRoutingKey(); + final String routingKey = getMetaData().getRoutingKey(); + return routingKey == null ? "" : routingKey; } public AMQMessageHeader getMessageHeader() @@ -86,17 +87,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra return true; } - @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - String initialRoutingAddress = getInitialRoutingAddress(); - if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/")) - { - initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1); - } - return initialRoutingAddress; - } - public Header getHeader() { return getMetaData().getHeader(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index d05d312..aaf22a2 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -69,7 +69,7 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return routingKey.toString(); } } - return null; + return ""; } public AMQMessageHeader getMessageHeader() @@ -93,17 +93,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return true; } - @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - String initialRoutingAddress = getInitialRoutingAddress(); - if(initialRoutingAddress != null && destinationAddress != null && initialRoutingAddress.startsWith(destinationAddress+"/")) - { - initialRoutingAddress = initialRoutingAddress.substring(destinationAddress.length() + 1); - } - return initialRoutingAddress; - } - public boolean isImmediate() { return getMessagePublishInfo().isImmediate(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java index bf114df..9d8c819 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java @@ -76,7 +76,7 @@ public class AnonymousRelayDestination implements ReceivingDestination public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken) { final ReceivingDestination destination; - final String routingAddress = message.getRoutingAddress(null, null); + final String routingAddress = message.getInitialRoutingAddress(); if (!routingAddress.startsWith("/") && routingAddress.contains("/")) { String[] parts = routingAddress.split("/", 2); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 29a1679..575dafd 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -97,7 +97,7 @@ public class ExchangeDestination extends QueueDestination public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken) { - final String routingAddress = message.getRoutingAddress(_exchange.getName(), _initialRoutingAddress); + final String routingAddress = getRoutingAddress(message); _exchange.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress)); final InstanceProperties instanceProperties = @@ -150,6 +150,30 @@ public class ExchangeDestination extends QueueDestination return _exchange; } + private String getRoutingAddress(final ServerMessage<?> message) + { + String routingAddress; + if (_initialRoutingAddress == null) + { + return ReceivingDestination.getRoutingAddress(message, _exchange.getName()); + } + else + { + String initialRoutingAddress = message.getInitialRoutingAddress(); + if (initialRoutingAddress.startsWith(_exchange.getName() + "/" + _initialRoutingAddress + "/")) + { + routingAddress = initialRoutingAddress.substring(2 + + _exchange.getName().length() + + _initialRoutingAddress.length()); + } + else + { + routingAddress = _initialRoutingAddress; + } + } + return routingAddress; + } + TerminusDurability getDurability() { return _durability; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index f308e78..c29fa09 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -72,15 +72,31 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM public String getInitialRoutingAddress() { - Object routingKey = getMessageHeader().getHeader("routing-key"); - if(routingKey != null) + MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader(); + String routingAddress = null; + final String to = messageHeader.getTo(); + if (to != null) + { + routingAddress = to; + } + else if (messageHeader.getHeader("routing-key") instanceof String) + { + routingAddress = (String) messageHeader.getHeader("routing-key"); + } + else if (messageHeader.getHeader("routing_key") instanceof String) { - return routingKey.toString(); + routingAddress = (String) messageHeader.getHeader("routing_key"); + } + else if (messageHeader.getSubject() != null) + { + routingAddress = messageHeader.getSubject(); } else { - return getMessageHeader().getTo(); + routingAddress = ""; } + + return routingAddress; } private MessageMetaData_1_0 getMessageMetaData() @@ -118,59 +134,6 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM return getMessageHeader().getNotValidBefore() == 0L || resourceSupportsDeliveryDelay(resource); } - @Override - public String getRoutingAddress(final String destinationAddress, final String initialDestinationRoutingAddress) - { - String routingAddress; - MessageMetaData_1_0.MessageHeader_1_0 messageHeader = getMessageHeader(); - if (initialDestinationRoutingAddress == null) - { - final String to = messageHeader.getTo(); - if (to != null && (destinationAddress == null || destinationAddress.trim().equals(""))) - { - routingAddress = to; - } - else if (to != null && to.startsWith(destinationAddress + "/")) - { - routingAddress = to.substring(1 + destinationAddress.length()); - } - else if (to != null && !to.equals(destinationAddress)) - { - routingAddress = to; - } - else if (messageHeader.getHeader("routing-key") instanceof String) - { - routingAddress = (String) messageHeader.getHeader("routing-key"); - } - else if (messageHeader.getHeader("routing_key") instanceof String) - { - routingAddress = (String) messageHeader.getHeader("routing_key"); - } - else if (messageHeader.getSubject() != null) - { - routingAddress = messageHeader.getSubject(); - } - else - { - routingAddress = ""; - } - } - else - { - if (messageHeader.getTo() != null - && messageHeader.getTo().startsWith(destinationAddress + "/" + initialDestinationRoutingAddress + "/")) - { - final int prefixLength = 2 + destinationAddress.length() + initialDestinationRoutingAddress.length(); - routingAddress = messageHeader.getTo().substring(prefixLength); - } - else - { - routingAddress = initialDestinationRoutingAddress; - } - } - return routingAddress; - } - private boolean resourceSupportsDeliveryDelay(final TransactionLogResource resource) { return resource instanceof Queue && ((Queue<?>)resource).isHoldOnPublishEnabled(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index 88908c5..ac598d6 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -80,7 +80,7 @@ public class NodeReceivingDestination implements ReceivingDestination public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken) { - final String routingAddress = message.getRoutingAddress(_destination.getName(), null); + final String routingAddress = ReceivingDestination.getRoutingAddress(message, _address); _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress)); final InstanceProperties instanceProperties = http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79e5b6a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java index 502cc47..debf559 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v1_0.type.Outcome; - import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.security.SecurityToken; import org.apache.qpid.server.txn.ServerTransaction; @@ -44,4 +43,14 @@ public interface ReceivingDestination extends Destination String getAddress(); MessageDestination getMessageDestination(); + + static String getRoutingAddress(final ServerMessage<?> message, final String destinationName) + { + String initialRoutingAddress = message.getInitialRoutingAddress(); + if (destinationName != null && initialRoutingAddress.startsWith(destinationName + "/")) + { + initialRoutingAddress = initialRoutingAddress.substring(destinationName.length() + 1); + } + return initialRoutingAddress; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org