Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 4e0acb2d8 -> de42f5304


QPID-7865 : Tidy-up AMQP 1.0 'Destination' classes


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f6b59251
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f6b59251
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f6b59251

Branch: refs/heads/master
Commit: f6b5925186a1f40a790b398cee2cde5336633f00
Parents: ec1bd82
Author: rgodfrey <rgodf...@apache.org>
Authored: Tue Jul 18 22:02:36 2017 +0200
Committer: rgodfrey <rgodf...@apache.org>
Committed: Tue Jul 18 22:02:36 2017 +0200

----------------------------------------------------------------------
 .../v1_0/AnonymousRelayDestination.java         |  38 +-
 .../qpid/server/protocol/v1_0/Destination.java  |  30 --
 .../protocol/v1_0/ExchangeDestination.java      | 231 ----------
 .../v1_0/ExchangeSendingDestination.java        | 435 +++++++++++++++++++
 .../protocol/v1_0/MessageSourceDestination.java |  85 ----
 .../protocol/v1_0/NodeReceivingDestination.java |  10 +-
 .../server/protocol/v1_0/QueueDestination.java  | 110 -----
 .../protocol/v1_0/ReceivingDestination.java     |   3 +-
 .../protocol/v1_0/SendingDestination.java       |   5 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      |  19 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 380 ++--------------
 .../v1_0/StandardSendingDestination.java        |  79 ++++
 12 files changed, 604 insertions(+), 821 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index 9d8c819..4ae80f3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;/*
 import static 
org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
 
 import java.util.Arrays;
-import java.util.Collections;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -83,14 +82,12 @@ public class AnonymousRelayDestination implements 
ReceivingDestination
             MessageDestination exchangeDestination = 
_addressSpace.getAttainedMessageDestination(parts[0]);
             if (exchangeDestination instanceof Exchange)
             {
-                Symbol[] capabilities = _target.getCapabilities();
-                destination = new ExchangeDestination(((Exchange<?>) 
exchangeDestination),
-                                                      null,
-                                                      _target.getDurable(),
-                                                      
_target.getExpiryPolicy(),
-                                                      parts[0],
-                                                      parts[1],
-                                                      capabilities != null ? 
Arrays.asList(capabilities) : Collections.emptyList());
+                destination = new NodeReceivingDestination(exchangeDestination,
+                                                           
_target.getDurable(),
+                                                           
_target.getExpiryPolicy(),
+                                                           parts[0],
+                                                           
_target.getCapabilities(),
+                                                           _eventLogger);
             }
             else
             {
@@ -100,6 +97,19 @@ public class AnonymousRelayDestination implements 
ReceivingDestination
         else
         {
             MessageDestination messageDestination = 
_addressSpace.getAttainedMessageDestination(routingAddress);
+            if(messageDestination == null)
+            {
+                // TODO - should we do this... if the queue is not being advertised as a destination, shouldn't we
+                //        respect that?
+
+                // Covers the unlikely case where there is no attained destination with the given address, but there is
+                // a queue with that address
+                MessageSource source = 
_addressSpace.getAttainedMessageSource(routingAddress);
+                if (source instanceof Queue)
+                {
+                    messageDestination = (Queue<?>) source;
+                }
+            }
             if (messageDestination != null)
             {
                 destination = new NodeReceivingDestination(messageDestination,
@@ -111,15 +121,7 @@ public class AnonymousRelayDestination implements 
ReceivingDestination
             }
             else
             {
-                MessageSource source = 
_addressSpace.getAttainedMessageSource(routingAddress);
-                if (source instanceof Queue)
-                {
-                    destination = new QueueDestination((Queue<?>) source, 
routingAddress);
-                }
-                else
-                {
-                    destination = null;
-                }
+                destination = null;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
deleted file mode 100644
index 1eadc23..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-
-public interface Destination
-{
-    Symbol[] getCapabilities();
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
deleted file mode 100644
index edffe0d..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import static 
org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.RejectType;
-import org.apache.qpid.server.message.RoutingResult;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
-import 
org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.security.SecurityToken;
-import org.apache.qpid.server.txn.ServerTransaction;
-
-public class ExchangeDestination extends QueueDestination
-{
-    private static final Accepted ACCEPTED = new Accepted();
-    private static final Rejected REJECTED = new Rejected();
-    private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
-    public static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
-    public static final Symbol SHARED_CAPABILITY = Symbol.getSymbol("shared");
-    public static final Symbol GLOBAL_CAPABILITY = Symbol.getSymbol("global");
-
-    private final Exchange<?> _exchange;
-    private final TerminusDurability _durability;
-    private final TerminusExpiryPolicy _expiryPolicy;
-    private final String _initialRoutingAddress;
-    private final boolean _discardUnroutable;
-    private final Symbol[] _capabilities;
-
-    public ExchangeDestination(Exchange<?> exchange,
-                               final Queue<?> queue,
-                               TerminusDurability durable,
-                               TerminusExpiryPolicy expiryPolicy,
-                               String address,
-                               final String initialRoutingAddress,
-                               final List<Symbol> capabilities)
-    {
-        super(queue, address);
-        _exchange = exchange;
-        _durability = durable;
-        _expiryPolicy = expiryPolicy;
-        _discardUnroutable = (capabilities != null && 
capabilities.contains(DISCARD_UNROUTABLE)) || 
exchange.getUnroutableMessageBehaviour() == 
Exchange.UnroutableMessageBehaviour.DISCARD;
-        _initialRoutingAddress = initialRoutingAddress;
-
-        List<Symbol> destinationCapabilities = new ArrayList<>(capabilities);
-        if (_discardUnroutable)
-        {
-            destinationCapabilities.add(DISCARD_UNROUTABLE);
-        }
-        else
-        {
-            destinationCapabilities.add(REJECT_UNROUTABLE);
-        }
-        destinationCapabilities.add(TOPIC_CAPABILITY);
-        destinationCapabilities.add(DELAYED_DELIVERY);
-
-        _capabilities = destinationCapabilities.toArray(new 
Symbol[destinationCapabilities.size()]);
-    }
-
-    @Override
-    public Outcome[] getOutcomes()
-    {
-        return OUTCOMES;
-    }
-
-    @Override
-    public Outcome send(final ServerMessage<?> message, final 
ServerTransaction txn, final SecurityToken securityToken)
-    {
-        final String routingAddress = getRoutingAddress(message);
-        _exchange.authorisePublish(securityToken, 
Collections.singletonMap("routingKey", routingAddress));
-
-        final InstanceProperties instanceProperties =
-            new InstanceProperties()
-            {
-
-                @Override
-                public Object getProperty(final Property prop)
-                {
-                    switch(prop)
-                    {
-                        case MANDATORY:
-                            return false;
-                        case REDELIVERED:
-                            return false;
-                        case PERSISTENT:
-                            return message.isPersistent();
-                        case IMMEDIATE:
-                            return false;
-                        case EXPIRATION:
-                            return message.getExpiration();
-                    }
-                    return null;
-                }};
-
-        final RoutingResult result = _exchange.route(message, routingAddress, 
instanceProperties);
-        final int enqueues = result.send(txn, null);
-        if (enqueues == 0)
-        {
-            if (!_discardUnroutable)
-            {
-                if (result.isRejected())
-                {
-                    AmqpError error;
-                    if (result.containsReject(RejectType.LIMIT_EXCEEDED))
-                    {
-                        error = AmqpError.RESOURCE_LIMIT_EXCEEDED;
-                    }
-                    else if 
(result.containsReject(RejectType.PRECONDITION_FAILED))
-                    {
-                        error = AmqpError.PRECONDITION_FAILED;
-                    }
-                    else
-                    {
-                        error = AmqpError.ILLEGAL_STATE;
-                    }
-                    return createdRejectedOutcome(error, 
result.getRejectReason());
-                }
-                else
-                {
-                    return createdRejectedOutcome(AmqpError.NOT_FOUND,
-                                                  String.format("Unknown 
destination '%s'", routingAddress));
-                }
-            }
-            else
-            {
-                
_exchange.getEventLogger().message(ExchangeMessages.DISCARDMSG(_exchange.getName(),
 routingAddress));
-            }
-        }
-        return ACCEPTED;
-    }
-
-    private Outcome createdRejectedOutcome(AmqpError errorCode, String 
errorMessage)
-    {
-        Rejected rejected = new Rejected();
-        final Error notFoundError = new Error(errorCode, errorMessage);
-        rejected.setError(notFoundError);
-        return rejected;
-    }
-
-    @Override
-    public MessageDestination getMessageDestination()
-    {
-        return _exchange;
-    }
-
-    private String getRoutingAddress(final ServerMessage<?> message)
-    {
-        String routingAddress;
-        if (_initialRoutingAddress == null)
-        {
-            return ReceivingDestination.getRoutingAddress(message, 
_exchange.getName());
-        }
-        else
-        {
-            String initialRoutingAddress = message.getInitialRoutingAddress();
-            if (initialRoutingAddress.startsWith(_exchange.getName() + "/" + 
_initialRoutingAddress + "/"))
-            {
-                routingAddress = initialRoutingAddress.substring(2
-                                                                 + 
_exchange.getName().length()
-                                                                 + 
_initialRoutingAddress.length());
-            }
-            else
-            {
-                routingAddress = _initialRoutingAddress;
-            }
-        }
-        return routingAddress;
-    }
-
-    TerminusDurability getDurability()
-    {
-        return _durability;
-    }
-
-    TerminusExpiryPolicy getExpiryPolicy()
-    {
-        return _expiryPolicy;
-    }
-
-    @Override
-    public int getCredit()
-    {
-        // TODO - fix
-        return 20000;
-    }
-
-    public Exchange<?> getExchange()
-    {
-        return _exchange;
-    }
-
-    @Override
-    public Symbol[] getCapabilities()
-    {
-        return _capabilities;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
new file mode 100644
index 0000000..601dc9f
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static 
org.apache.qpid.server.protocol.v1_0.Session_1_0.GLOBAL_CAPABILITY;
+import static 
org.apache.qpid.server.protocol.v1_0.Session_1_0.SHARED_CAPABILITY;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NotFoundException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import 
org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import 
org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+public class ExchangeSendingDestination extends StandardSendingDestination
+{
+    private static final Accepted ACCEPTED = new Accepted();
+    private static final Rejected REJECTED = new Rejected();
+    private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+    private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
+
+    private final Exchange<?> _exchange;
+    private final Symbol[] _capabilities;
+    private Map<Symbol, Filter> _filters;
+
+    ExchangeSendingDestination(Exchange<?> exchange,
+                               String linkName,
+                               String bindingKey,
+                               String remoteContainerId,
+                               Source source) throws AmqpErrorException
+    {
+        this(exchange, bindingKey, source, 
getMangledSubscriptionName(linkName, remoteContainerId, source));
+    }
+
+    private ExchangeSendingDestination(final Exchange<?> exchange,
+                                       final String bindingKey,
+                                       final Source source,
+                                       final String subscriptionName) throws 
AmqpErrorException
+    {
+        this(exchange, source, subscriptionName, createBindingInfo(exchange, 
subscriptionName, bindingKey, source));
+    }
+
+    private ExchangeSendingDestination(final Exchange<?> exchange,
+                                       final Source source,
+                                       final String subscriptionName,
+                                       final BindingInfo bindingInfo)
+            throws AmqpErrorException
+    {
+        this(exchange, getQueue(exchange, source, subscriptionName, 
bindingInfo), bindingInfo, source.getCapabilities());
+    }
+
+    private ExchangeSendingDestination(final Exchange<?> exchange,
+                                       final Queue<?> queue,
+                                       final BindingInfo bindingInfo,
+                                       final Symbol[] capabilities)
+    {
+        super(queue);
+        _exchange = exchange;
+        _filters = bindingInfo.getActualFilters().isEmpty() ? null : 
bindingInfo.getActualFilters();
+        List<Symbol> sourceCapabilities = new ArrayList<>();
+
+        if (hasCapability(capabilities, GLOBAL_CAPABILITY))
+        {
+            sourceCapabilities.add(GLOBAL_CAPABILITY);
+        }
+        if (hasCapability(capabilities, SHARED_CAPABILITY))
+        {
+            sourceCapabilities.add(SHARED_CAPABILITY);
+        }
+
+        sourceCapabilities.add(TOPIC_CAPABILITY);
+
+        _capabilities = sourceCapabilities.toArray(new 
Symbol[sourceCapabilities.size()]);
+    }
+
+    private static BindingInfo createBindingInfo(final Exchange<?> exchange,
+                                                 final String subscriptionName,
+                                                 final String bindingKey, 
final Source source)
+            throws AmqpErrorException
+    {
+        return new BindingInfo(exchange, subscriptionName,
+                               bindingKey, source.getFilter());
+    }
+
+    private static String getMangledSubscriptionName(final String linkName,
+                                                     final String 
remoteContainerId,
+                                                     final Source source)
+    {
+        boolean isDurable = source.getExpiryPolicy() == 
TerminusExpiryPolicy.NEVER;
+        boolean isShared = hasCapability(source.getCapabilities(), 
SHARED_CAPABILITY);
+        boolean isGlobal = hasCapability(source.getCapabilities(), 
GLOBAL_CAPABILITY);
+
+        return getMangledSubscriptionName(linkName, isDurable, isShared, 
isGlobal, remoteContainerId);
+
+    }
+
+
+    private static Queue<?> getQueue(Exchange<?> exchange, Source source, 
String subscriptionName, BindingInfo bindingInfo)
+            throws AmqpErrorException
+    {
+        Queue<?> queue;
+        final Map<String, Object> attributes = new HashMap<>();
+        boolean isDurable = source.getExpiryPolicy() == 
TerminusExpiryPolicy.NEVER;
+        boolean isShared = hasCapability(source.getCapabilities(), 
SHARED_CAPABILITY);
+
+        ExclusivityPolicy exclusivityPolicy;
+        if (isShared)
+        {
+            exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
+        }
+        else
+        {
+            exclusivityPolicy = ExclusivityPolicy.LINK;
+        }
+
+        org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = 
getLifetimePolicy(source.getExpiryPolicy());
+
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, subscriptionName);
+        attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+        attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+        attributes.put(Queue.DURABLE, isDurable);
+
+        Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
+        try
+        {
+            if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
+            {
+                try
+                {
+                    queue = ((QueueManagingVirtualHost) 
exchange.getAddressSpace()).getSubscriptionQueue(exchange.getName(), 
attributes, bindings);
+                }
+                catch (NotFoundException e)
+                {
+                    throw new AmqpErrorException(new 
Error(AmqpError.NOT_FOUND, e.getMessage()));
+                }
+            }
+            else
+            {
+                throw new AmqpErrorException(new 
Error(AmqpError.INTERNAL_ERROR,
+                                                       "Address space of 
unexpected type"));
+            }
+        }
+        catch(IllegalStateException e)
+        {
+            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED,
+                                                   "Subscription is already in 
use"));
+        }
+        return queue;
+    }
+
+
+
+    private static boolean hasCapability(final Symbol[] capabilities,
+                                  final Symbol expectedCapability)
+    {
+        return (capabilities != null && 
Arrays.asList(capabilities).contains(expectedCapability));
+    }
+
+    private static LifetimePolicy getLifetimePolicy(final TerminusExpiryPolicy 
expiryPolicy) throws AmqpErrorException
+    {
+        LifetimePolicy lifetimePolicy;
+        if (expiryPolicy == null || expiryPolicy == 
TerminusExpiryPolicy.SESSION_END)
+        {
+            lifetimePolicy = LifetimePolicy.DELETE_ON_SESSION_END;
+        }
+        else if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
+        {
+            lifetimePolicy = LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+        }
+        else if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
+        {
+            lifetimePolicy = LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+        }
+        else if (expiryPolicy == TerminusExpiryPolicy.NEVER)
+        {
+            lifetimePolicy = LifetimePolicy.PERMANENT;
+        }
+        else
+        {
+            Error error = new Error(AmqpError.NOT_IMPLEMENTED,
+                                    String.format("unknown ExpiryPolicy '%s'", 
expiryPolicy.getValue()));
+            throw new AmqpErrorException(error);
+        }
+        return lifetimePolicy;
+    }
+
+    private static String getMangledSubscriptionName(final String linkName,
+                                                     final boolean isDurable,
+                                                     final boolean isShared,
+                                                     final boolean isGlobal,
+                                                     String remoteContainerId)
+    {
+        if (isGlobal)
+        {
+            remoteContainerId = "_global_";
+        }
+        else
+        {
+            remoteContainerId = sanitizeName(remoteContainerId);
+        }
+
+        String subscriptionName;
+        if (!isDurable && !isShared)
+        {
+            subscriptionName = UUID.randomUUID().toString();
+        }
+        else
+        {
+            subscriptionName = linkName;
+            if (isShared)
+            {
+                int separator = subscriptionName.indexOf("|");
+                if (separator > 0)
+                {
+                    subscriptionName = subscriptionName.substring(0, 
separator);
+                }
+            }
+            subscriptionName = sanitizeName(subscriptionName);
+        }
+        return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + 
"_/" + (isDurable
+                ? "durable"
+                : "nondurable");
+    }
+
+    private static String sanitizeName(String name)
+    {
+        return name.replace("_", "__")
+                   .replace(".", "_:")
+                   .replace("(", "_O")
+                   .replace(")", "_C")
+                   .replace("<", "_L")
+                   .replace(">", "_R");
+    }
+
+    @Override
+    public Outcome[] getOutcomes()
+    {
+        return OUTCOMES;
+    }
+
+    public Exchange<?> getExchange()
+    {
+        return _exchange;
+    }
+
+    Map<Symbol, Filter> getFilters()
+    {
+        return _filters == null ? null : Collections.unmodifiableMap(_filters);
+    }
+
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        return _capabilities;
+    }
+
+    public Queue<?> getQueue()
+    {
+        return (Queue<?>) getMessageSource();
+    }
+
+    private static final class BindingInfo
+    {
+        private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
+        private final Map<String, Map<String, Object>> _bindings = new 
HashMap<>();
+
+        BindingInfo(Exchange<?> exchange,
+                            final String queueName,
+                            String bindingKey,
+                            Map<Symbol, Filter> filters) throws 
AmqpErrorException
+        {
+            String binding = null;
+            final Map<String, Object> arguments = new HashMap<>();
+            if (filters != null && !filters.isEmpty())
+            {
+                boolean hasBindingFilter = false;
+                boolean hasMessageFilter = false;
+                for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+                {
+                    if(!hasBindingFilter
+                       && entry.getValue() instanceof ExactSubjectFilter
+                       && 
exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+                    {
+                        ExactSubjectFilter filter = (ExactSubjectFilter) 
entry.getValue();
+                        binding = filter.getValue();
+                        _actualFilters.put(entry.getKey(), filter);
+                        hasBindingFilter = true;
+                    }
+                    else if(!hasBindingFilter
+                            && entry.getValue() instanceof 
MatchingSubjectFilter
+                            && 
exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+                    {
+                        MatchingSubjectFilter filter = (MatchingSubjectFilter) 
entry.getValue();
+                        binding = filter.getValue();
+                        _actualFilters.put(entry.getKey(), filter);
+                        hasBindingFilter = true;
+                    }
+                    else if(entry.getValue() instanceof NoLocalFilter)
+                    {
+                        _actualFilters.put(entry.getKey(), entry.getValue());
+                        arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), 
true);
+                    }
+                    else if (!hasMessageFilter
+                             && entry.getValue() instanceof 
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+                    {
+                        
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter 
selectorFilter =
+                                
(org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) 
entry.getValue();
+
+                        // TODO: QPID-7642 - due to inconsistent handling of invalid filters
+                        // by different exchange implementations
+                        // we need to validate filter before creation of binding
+                        try
+                        {
+                            new 
org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
+                        }
+                        catch (ParseException | SelectorParsingException | 
TokenMgrError e)
+                        {
+                            Error error = new Error();
+                            error.setCondition(AmqpError.INVALID_FIELD);
+                            error.setDescription("Invalid JMS Selector: " + 
selectorFilter.getValue());
+                            
error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), 
Symbol.valueOf("filter")));
+                            throw new AmqpErrorException(error);
+                        }
+
+                        arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), 
selectorFilter.getValue());
+                        _actualFilters.put(entry.getKey(), selectorFilter);
+                        hasMessageFilter = true;
+                    }
+                }
+            }
+
+            if(binding != null)
+            {
+                _bindings.put(binding, arguments);
+            }
+            if(bindingKey != null)
+            {
+                _bindings.put(bindingKey, arguments);
+            }
+            if(binding == null
+               && bindingKey == null
+               && 
exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+            {
+                _bindings.put(queueName, arguments);
+            }
+            else if(binding == null
+                    && bindingKey == null
+                    && 
exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+            {
+                _bindings.put("#", arguments);
+            }
+        }
+
+        Map<Symbol, Filter> getActualFilters()
+        {
+            return _actualFilters;
+        }
+
+        Map<String, Map<String, Object>> getBindings()
+        {
+            return _bindings;
+        }
+
+
+        @Override
+        public boolean equals(final Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            final BindingInfo that = (BindingInfo) o;
+
+            return _actualFilters.equals(that._actualFilters) && 
_bindings.equals(that._bindings);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = _actualFilters.hashCode();
+            result = 31 * result + _bindings.hashCode();
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
deleted file mode 100644
index 13433b0..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import static org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-
-public class MessageSourceDestination implements SendingDestination
-{
-    private static final Accepted ACCEPTED = new Accepted();
-    private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
-
-
-    private final MessageSource _messageSource;
-    private final Symbol[] _capabilities;
-
-    public MessageSourceDestination(MessageSource messageSource)
-    {
-        _messageSource = messageSource;
-        List<Symbol> capabilities = new ArrayList<>();
-        if (_messageSource instanceof Queue)
-        {
-            LifetimePolicy queueLifetimePolicy = ((Queue<?>) 
_messageSource).getLifetimePolicy();
-            if (PERMANENT == queueLifetimePolicy)
-            {
-                capabilities.add(Symbol.getSymbol("queue"));
-            }
-            else
-            {
-                capabilities.add(Symbol.getSymbol("temporary-queue"));
-            }
-        }
-        _capabilities = capabilities.toArray(new Symbol[capabilities.size()]);
-    }
-
-    public Outcome[] getOutcomes()
-    {
-        return OUTCOMES;
-    }
-
-    public int getCredit()
-    {
-        // TODO - fix
-        return 100;
-    }
-
-    @Override
-    public MessageSource getMessageSource()
-    {
-        return _messageSource;
-    }
-
-    @Override
-    public Symbol[] getCapabilities()
-    {
-        return _capabilities;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index f254613..ae161aa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -56,6 +56,7 @@ public class NodeReceivingDestination implements 
ReceivingDestination
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
     private final String _address;
+    private String _routingAddress;
 
     public NodeReceivingDestination(MessageDestination destination,
                                     TerminusDurability durable,
@@ -83,7 +84,9 @@ public class NodeReceivingDestination implements 
ReceivingDestination
     @Override
     public Outcome send(final ServerMessage<?> message, final 
ServerTransaction txn, final SecurityToken securityToken)
     {
-        final String routingAddress = 
ReceivingDestination.getRoutingAddress(message, _address);
+        final String routingAddress = _routingAddress == null
+                ? ReceivingDestination.getRoutingAddress(message, _address)
+                : _routingAddress;
         _destination.authorisePublish(securityToken, 
Collections.singletonMap("routingKey", routingAddress));
 
         final InstanceProperties instanceProperties =
@@ -197,4 +200,9 @@ public class NodeReceivingDestination implements 
ReceivingDestination
         capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
+
+    public void setRoutingAddress(final String routingAddress)
+    {
+        _routingAddress = routingAddress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
deleted file mode 100644
index 6c56625..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.util.Collections;
-
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.security.SecurityToken;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
-import org.apache.qpid.server.txn.ServerTransaction;
-
-public class QueueDestination extends MessageSourceDestination implements 
ReceivingDestination
-{
-    private static final Accepted ACCEPTED = new Accepted();
-    private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
-    private final String _address;
-
-    public QueueDestination(Queue<?> queue, final String address)
-    {
-        super(queue);
-        _address = address;
-    }
-
-    @Override
-    public Outcome[] getOutcomes()
-    {
-        return OUTCOMES;
-    }
-
-    @Override
-    public Outcome send(final ServerMessage<?> message, final 
ServerTransaction txn, final SecurityToken securityToken)
-    {
-        getQueue().authorisePublish(securityToken, Collections.emptyMap());
-
-        txn.enqueue(getMessageSource(), message, new 
ServerTransaction.EnqueueAction()
-        {
-            MessageReference _reference = message.newReference();
-
-
-            @Override
-            public void postCommit(MessageEnqueueRecord... records)
-            {
-                try
-                {
-                    getQueue().enqueue(message, null, records[0]);
-                }
-                finally
-                {
-                    _reference.release();
-                }
-            }
-
-            @Override
-            public void onRollback()
-            {
-                _reference.release();
-            }
-        });
-
-
-        return ACCEPTED;
-    }
-
-    @Override
-    public int getCredit()
-    {
-        // TODO - fix
-        return 100;
-    }
-
-    public Queue<?> getQueue()
-    {
-        return (Queue<?>) getMessageSource();
-    }
-
-    @Override
-    public String getAddress()
-    {
-        return _address;
-    }
-
-    @Override
-    public MessageDestination getMessageDestination()
-    {
-        return getQueue();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index debf559..fc82f49 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -27,12 +27,13 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-public interface ReceivingDestination extends Destination
+public interface ReceivingDestination
 {
 
     Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
     Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
 
+    Symbol[] getCapabilities();
 
     Outcome[] getOutcomes();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
index 6d41bde..c03d698 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v1_0;
 
 
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-public interface SendingDestination extends Destination
+public interface SendingDestination
 {
+    Symbol[] getCapabilities();
+
     MessageSource getMessageSource();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index cbcccf5..cac4c8a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -117,12 +117,12 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
         boolean noLocal = false;
         JMSSelectorFilter messageFilter = null;
 
-        if(destination instanceof ExchangeDestination)
+        if(destination instanceof ExchangeSendingDestination)
         {
             options.add(ConsumerOption.ACQUIRES);
             options.add(ConsumerOption.SEES_REQUEUES);
         }
-        else if(destination instanceof MessageSourceDestination)
+        else if(destination instanceof StandardSendingDestination)
         {
             MessageSource messageSource = _destination.getMessageSource();
 
@@ -198,7 +198,8 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
     {
         final Source source = getSource();
         _consumerTarget = new ConsumerTarget_1_0(this,
-                                         _destination instanceof 
ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
+                                         _destination instanceof 
ExchangeSendingDestination
+                                                 ? true : 
source.getDistributionMode() != StdDistMode.COPY);
         try
         {
             final String name = getTarget().getAddress() == null ? 
getLinkName() : getTarget().getAddress();
@@ -291,7 +292,7 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
         final SendingDestination destination = 
getSession().getSendingDestination(getLink(), oldSource);
         prepareConsumerOptionsAndFilters(destination);
 
-        if (getDestination() instanceof ExchangeDestination && 
!Boolean.TRUE.equals(newSource.getDynamic()))
+        if (getDestination() instanceof ExchangeSendingDestination && 
!Boolean.TRUE.equals(newSource.getDynamic()))
         {
             final SendingDestination newDestination =
                     getSession().getSendingDestination(getLink(), newSource);
@@ -322,7 +323,7 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
         final SendingDestination destination = 
getSession().getSendingDestination(getLink(), oldSource);
         prepareConsumerOptionsAndFilters(destination);
 
-        if (getDestination() instanceof ExchangeDestination && 
!Boolean.TRUE.equals(newSource.getDynamic()))
+        if (getDestination() instanceof ExchangeSendingDestination && 
!Boolean.TRUE.equals(newSource.getDynamic()))
         {
             final SendingDestination newDestination =
                     getSession().getSendingDestination(getLink(), newSource);
@@ -489,13 +490,13 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
 
 
             Error closingError = null;
-            if (getDestination() instanceof ExchangeDestination
+            if (getDestination() instanceof ExchangeSendingDestination
                 && getSession().getConnection().getAddressSpace() instanceof 
QueueManagingVirtualHost)
             {
                 try
                 {
                     ((QueueManagingVirtualHost) 
getSession().getConnection().getAddressSpace()).removeSubscriptionQueue(
-                            ((ExchangeDestination) 
getDestination()).getQueue().getName());
+                            ((ExchangeSendingDestination) 
getDestination()).getQueue().getName());
                 }
                 catch (AccessControlException e)
                 {
@@ -633,9 +634,9 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
             source.setCapabilities(attachSource.getCapabilities());
             final SendingDestination destination = 
getSession().getSendingDestination(getLink(), source);
             source.setCapabilities(destination.getCapabilities());
-            if (destination instanceof ExchangeDestination)
+            if (destination instanceof ExchangeSendingDestination)
             {
-                ExchangeDestination exchangeDestination = 
(ExchangeDestination) destination;
+                ExchangeSendingDestination exchangeDestination = 
(ExchangeSendingDestination) destination;
                 exchangeDestination.getQueue()
                                    .setAttributes(Collections.<String, 
Object>singletonMap(Queue.DESIRED_STATE,
                                                                                
            org.apache.qpid.server.model.State.ACTIVE));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ba06376..a5fe374 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import static 
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static 
org.apache.qpid.server.protocol.v1_0.ExchangeDestination.SHARED_CAPABILITY;
 
 import java.security.AccessControlContext;
 import java.security.AccessControlException;
@@ -29,7 +28,6 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -54,10 +52,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.filter.SelectorParsingException;
-import org.apache.qpid.server.filter.selector.ParseException;
-import org.apache.qpid.server.filter.selector.TokenMgrError;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -66,9 +60,7 @@ import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
@@ -89,14 +81,9 @@ import 
org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
 import 
org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
-import 
org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import 
org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -118,12 +105,13 @@ import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class Session_1_0 extends AbstractAMQPSession<Session_1_0, 
ConsumerTarget_1_0>
         implements LogSubject, 
org.apache.qpid.server.util.Deletable<Session_1_0>
 {
-    public static final Symbol DELAYED_DELIVERY = 
Symbol.valueOf("DELAYED_DELIVERY");
+    static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+    static final Symbol SHARED_CAPABILITY = Symbol.getSymbol("shared");
+    static final Symbol GLOBAL_CAPABILITY = Symbol.getSymbol("global");
     private static final Logger _logger = 
LoggerFactory.getLogger(Session_1_0.class);
     public static final Symbol LIFETIME_POLICY = 
Symbol.valueOf("lifetime-policy");
     private static final EnumSet<SessionState> END_STATES =
@@ -644,15 +632,17 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
                 Exchange<?> exchange = getExchange(parts[0]);
                 if (exchange != null)
                 {
-                    Symbol[] capabilities1 = target.getCapabilities();
-                    ExchangeDestination exchangeDestination = new 
ExchangeDestination(exchange,
-                                                                               
       null,
-                                                                               
       target.getDurable(),
-                                                                               
       target.getExpiryPolicy(),
-                                                                               
       parts[0],
-                                                                               
       parts[1],
-                                                                               
       capabilities1 != null ? Arrays.asList(capabilities1) : 
Collections.<Symbol>emptyList());
-                    destination = exchangeDestination;
+
+
+                    destination =
+                            new NodeReceivingDestination(exchange,
+                                                         target.getDurable(),
+                                                         
target.getExpiryPolicy(),
+                                                         parts[0],
+                                                         
target.getCapabilities(),
+                                                         
_connection.getEventLogger());
+                    
((NodeReceivingDestination)destination).setRoutingAddress(parts[1]);
+                    
                 }
                 else
                 {
@@ -663,6 +653,21 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 MessageDestination messageDestination =
                         getAddressSpace().getAttainedMessageDestination(addr);
+
+                if(messageDestination == null)
+                {
+                    // TODO - should we do this... if the queue is not being advertised as a destination, shouldn't we
+                    //        respect that?
+
+                    // Covers the unlikely case where there is no attained destination with the given address, but there is
+                    // a queue with that address
+                    MessageSource source = 
getAddressSpace().getAttainedMessageSource(addr);
+                    if (source instanceof Queue)
+                    {
+                        messageDestination = (Queue<?>) source;
+                    }
+                }
+
                 if (messageDestination != null)
                 {
                     destination =
@@ -675,15 +680,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
                 }
                 else
                 {
-                    Queue<?> queue = getQueue(addr);
-                    if (queue != null)
-                    {
-                        destination = new QueueDestination(queue, addr);
-                    }
-                    else
-                    {
-                        destination = null;
-                    }
+                    destination = null;
                 }
             }
         }
@@ -705,13 +702,13 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
                                                 final SendingDestination 
newDestination)
     {
         SendingDestination oldDestination = linkEndpoint.getDestination();
-        if (oldDestination instanceof ExchangeDestination)
+        if (oldDestination instanceof ExchangeSendingDestination)
         {
-            ExchangeDestination oldExchangeDestination = (ExchangeDestination) 
oldDestination;
+            ExchangeSendingDestination oldExchangeDestination = 
(ExchangeSendingDestination) oldDestination;
             String newAddress = newSource.getAddress();
-            if (newDestination instanceof ExchangeDestination)
+            if (newDestination instanceof ExchangeSendingDestination)
             {
-                ExchangeDestination newExchangeDestination = 
(ExchangeDestination) newDestination;
+                ExchangeSendingDestination newExchangeDestination = 
(ExchangeSendingDestination) newDestination;
                 if (oldExchangeDestination.getQueue() != 
newExchangeDestination.getQueue())
                 {
                     Source oldSource = linkEndpoint.getSource();
@@ -747,7 +744,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
                 MessageSource queue = 
getAddressSpace().getAttainedMessageSource(address);
                 if (queue != null)
                 {
-                    destination = new MessageSourceDestination(queue);
+                    destination = new StandardSendingDestination(queue);
                 }
                 else
                 {
@@ -764,7 +761,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
         return destination;
     }
 
-    private ExchangeDestination createExchangeDestination(String address, 
final String linkName, final Source source)
+    private ExchangeSendingDestination createExchangeDestination(String 
address, final String linkName, final Source source)
             throws AmqpErrorException
     {
         String[] parts = address.split("/", 2);
@@ -773,86 +770,21 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         return createExchangeDestination(exchangeName, bindingKey, linkName, source);
     }
 
-    private ExchangeDestination createExchangeDestination(final String exchangeName,
-                                                          final String bindingKey,
-                                                          final String linkName,
-                                                          final Source source) throws AmqpErrorException
+    private ExchangeSendingDestination createExchangeDestination(final String exchangeName,
+                                                                 final String bindingKey,
+                                                                 final String linkName,
+                                                                 final Source source) throws AmqpErrorException
     {
-        ExchangeDestination exchangeDestination = null;
+        ExchangeSendingDestination exchangeDestination = null;
         Exchange<?> exchange = getExchange(exchangeName);
-        List<Symbol> sourceCapabilities = new ArrayList<>();
         if (exchange != null)
         {
-            Queue queue = null;
             if (!Boolean.TRUE.equals(source.getDynamic()))
             {
-                final Map<String, Object> attributes = new HashMap<>();
-                boolean isDurable = source.getExpiryPolicy() == 
TerminusExpiryPolicy.NEVER;
-                boolean isShared = hasCapability(source.getCapabilities(), 
SHARED_CAPABILITY);
-                boolean isGlobal = hasCapability(source.getCapabilities(), 
ExchangeDestination.GLOBAL_CAPABILITY);
-
-                final String name = getMangledSubscriptionName(linkName, 
isDurable, isShared, isGlobal);
-
-                if (isGlobal)
-                {
-                    
sourceCapabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
-                }
-
-                ExclusivityPolicy exclusivityPolicy;
-                if (isShared)
-                {
-                    exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
-                    sourceCapabilities.add(SHARED_CAPABILITY);
-                }
-                else
-                {
-                    exclusivityPolicy = ExclusivityPolicy.LINK;
-                }
-
-                org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = 
getLifetimePolicy(source.getExpiryPolicy());
-
-                attributes.put(Queue.ID, UUID.randomUUID());
-                attributes.put(Queue.NAME, name);
-                attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
-                attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
-                attributes.put(Queue.DURABLE, isDurable);
-
-                BindingInfo bindingInfo = new BindingInfo(exchange, name,
-                                                          bindingKey, 
source.getFilter());
-                Map<String, Map<String, Object>> bindings = 
bindingInfo.getBindings();
-                try
-                {
-                    if (getAddressSpace() instanceof QueueManagingVirtualHost)
-                    {
-                        try
-                        {
-                            queue = ((QueueManagingVirtualHost) 
getAddressSpace()).getSubscriptionQueue(exchangeName, attributes, bindings);
-                        }
-                        catch (NotFoundException e)
-                        {
-                            throw new AmqpErrorException(new 
Error(AmqpError.NOT_FOUND, e.getMessage()));
-                        }
-                    }
-                    else
-                    {
-                        throw new AmqpErrorException(new 
Error(AmqpError.INTERNAL_ERROR,
-                                                               "Address space 
of unexpected type"));
-                    }
-                }
-                catch(IllegalStateException e)
-                {
-                    throw new AmqpErrorException(new 
Error(AmqpError.RESOURCE_LOCKED,
-                                                           "Subscription is 
already in use"));
-                }
-                source.setFilter(bindingInfo.getActualFilters().isEmpty() ? 
null : bindingInfo.getActualFilters());
+                String remoteContainerId = getConnection().getRemoteContainerId();
+                exchangeDestination = new ExchangeSendingDestination(exchange, linkName, bindingKey, remoteContainerId, source);
+                source.setFilter(exchangeDestination.getFilters());
                 source.setDistributionMode(StdDistMode.COPY);
-                exchangeDestination = new ExchangeDestination(exchange,
-                                                              queue,
-                                                              
source.getDurable(),
-                                                              
source.getExpiryPolicy(),
-                                                              exchangeName,
-                                                              bindingKey,
-                                                              
sourceCapabilities);
             }
             else
             {
@@ -863,98 +795,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         return exchangeDestination;
     }
 
-    private org.apache.qpid.server.model.LifetimePolicy 
getLifetimePolicy(final TerminusExpiryPolicy expiryPolicy) throws 
AmqpErrorException
-    {
-        org.apache.qpid.server.model.LifetimePolicy lifetimePolicy;
-        if (expiryPolicy == null || expiryPolicy == 
TerminusExpiryPolicy.SESSION_END)
-        {
-            lifetimePolicy = 
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_SESSION_END;
-        }
-        else if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
-        {
-            lifetimePolicy = 
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
-        }
-        else if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
-        {
-            lifetimePolicy = 
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
-        }
-        else if (expiryPolicy == TerminusExpiryPolicy.NEVER)
-        {
-            lifetimePolicy = 
org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
-        }
-        else
-        {
-            Error error = new Error(AmqpError.NOT_IMPLEMENTED,
-                                    String.format("unknown ExpiryPolicy '%s'", 
expiryPolicy.getValue()));
-            throw new AmqpErrorException(error);
-        }
-        return lifetimePolicy;
-    }
-
-    private String getMangledSubscriptionName(final String linkName,
-                                              final boolean isDurable,
-                                              final boolean isShared,
-                                              final boolean isGlobal)
-    {
-        String remoteContainerId = getConnection().getRemoteContainerId();
-        if (isGlobal)
-        {
-            remoteContainerId = "_global_";
-        }
-        else
-        {
-            remoteContainerId = sanitizeName(remoteContainerId);
-        }
-
-        String subscriptionName;
-        if (!isDurable && !isShared)
-        {
-            subscriptionName = UUID.randomUUID().toString();
-        }
-        else
-        {
-            subscriptionName = linkName;
-            if (isShared)
-            {
-                int separator = subscriptionName.indexOf("|");
-                if (separator > 0)
-                {
-                    subscriptionName = subscriptionName.substring(0, 
separator);
-                }
-            }
-            subscriptionName = sanitizeName(subscriptionName);
-        }
-        return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + 
"_/" + (isDurable
-                ? "durable"
-                : "nondurable");
-    }
-
-    private String sanitizeName(String name)
-    {
-        return name.replace("_", "__")
-                   .replace(".", "_:")
-                   .replace("(", "_O")
-                   .replace(")", "_C")
-                   .replace("<", "_L")
-                   .replace(">", "_R");
-    }
-
-    private boolean hasCapability(final Symbol[] capabilities,
-                                  final Symbol expectedCapability)
-    {
-        if (capabilities != null)
-        {
-            for (Symbol capability : capabilities)
-            {
-                if (expectedCapability.equals(capability))
-                {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
     private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
                                               Map properties)
     {
@@ -1214,8 +1054,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
     {
         return (recvDest instanceof NodeReceivingDestination
-                && queue == ((NodeReceivingDestination) recvDest).getDestination())
-               || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
+                && queue == ((NodeReceivingDestination) recvDest).getDestination());
     }
 
     @Override
@@ -1629,133 +1468,4 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
             throw new ConnectionScopedRuntimeException(errorMessage, t);
         }
     }
-
-    private final class BindingInfo
-    {
-        private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
-        private final Map<String, Map<String, Object>> _bindings = new 
HashMap<>();
-
-        private BindingInfo(Exchange<?> exchange,
-                            final String queueName,
-                            String bindingKey,
-                            Map<Symbol, Filter> filters) throws 
AmqpErrorException
-        {
-            String binding = null;
-            final Map<String, Object> arguments = new HashMap<>();
-            if (filters != null && !filters.isEmpty())
-            {
-                boolean hasBindingFilter = false;
-                boolean hasMessageFilter = false;
-                for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
-                {
-                    if(!hasBindingFilter
-                       && entry.getValue() instanceof ExactSubjectFilter
-                       && 
exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
-                    {
-                        ExactSubjectFilter filter = (ExactSubjectFilter) 
entry.getValue();
-                        binding = filter.getValue();
-                        _actualFilters.put(entry.getKey(), filter);
-                        hasBindingFilter = true;
-                    }
-                    else if(!hasBindingFilter
-                            && entry.getValue() instanceof 
MatchingSubjectFilter
-                            && 
exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
-                    {
-                        MatchingSubjectFilter filter = (MatchingSubjectFilter) 
entry.getValue();
-                        binding = filter.getValue();
-                        _actualFilters.put(entry.getKey(), filter);
-                        hasBindingFilter = true;
-                    }
-                    else if(entry.getValue() instanceof NoLocalFilter)
-                    {
-                        _actualFilters.put(entry.getKey(), entry.getValue());
-                        arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), 
true);
-                    }
-                    else if (!hasMessageFilter
-                             && entry.getValue() instanceof 
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
-                    {
-                        
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter 
selectorFilter =
-                                
(org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) 
entry.getValue();
-
-                        // TODO: QPID-7642 - due to inconsistent handling of 
invalid filters
-                        // by different exchange implementations
-                        // we need to validate filter before creation of 
binding
-                        try
-                        {
-                            new 
org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
-                        }
-                        catch (ParseException | SelectorParsingException | 
TokenMgrError e)
-                        {
-                            Error error = new Error();
-                            error.setCondition(AmqpError.INVALID_FIELD);
-                            error.setDescription("Invalid JMS Selector: " + 
selectorFilter.getValue());
-                            
error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), 
Symbol.valueOf("filter")));
-                            throw new AmqpErrorException(error);
-                        }
-
-                        arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), 
selectorFilter.getValue());
-                        _actualFilters.put(entry.getKey(), selectorFilter);
-                        hasMessageFilter = true;
-                    }
-                }
-            }
-
-            if(binding != null)
-            {
-                _bindings.put(binding, arguments);
-            }
-            if(bindingKey != null)
-            {
-                _bindings.put(bindingKey, arguments);
-            }
-            if(binding == null
-               && bindingKey == null
-               && 
exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
-            {
-                _bindings.put(queueName, arguments);
-            }
-            else if(binding == null
-                    && bindingKey == null
-                    && 
exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
-            {
-                _bindings.put("#", arguments);
-            }
-        }
-
-        private Map<Symbol, Filter> getActualFilters()
-        {
-            return _actualFilters;
-        }
-
-        private Map<String, Map<String, Object>> getBindings()
-        {
-            return _bindings;
-        }
-
-
-        @Override
-        public boolean equals(final Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-
-            final BindingInfo that = (BindingInfo) o;
-
-            return _actualFilters.equals(that._actualFilters) && 
_bindings.equals(that._bindings);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            int result = _actualFilters.hashCode();
-            result = 31 * result + _bindings.hashCode();
-            return result;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f6b59251/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
new file mode 100644
index 0000000..7940c8d
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+
+public class StandardSendingDestination implements SendingDestination
+{
+    private static final Accepted ACCEPTED = new Accepted();
+    private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+
+
+    private final MessageSource _messageSource;
+    private final Symbol[] _capabilities;
+
+    public StandardSendingDestination(MessageSource messageSource)
+    {
+        _messageSource = messageSource;
+        List<Symbol> capabilities = new ArrayList<>();
+        if (_messageSource instanceof Queue)
+        {
+            LifetimePolicy queueLifetimePolicy = ((Queue<?>) _messageSource).getLifetimePolicy();
+            if (PERMANENT == queueLifetimePolicy)
+            {
+                capabilities.add(Symbol.getSymbol("queue"));
+            }
+            else
+            {
+                capabilities.add(Symbol.getSymbol("temporary-queue"));
+            }
+        }
+        _capabilities = capabilities.toArray(new Symbol[capabilities.size()]);
+    }
+
+    public Outcome[] getOutcomes()
+    {
+        return OUTCOMES;
+    }
+
+    @Override
+    public MessageSource getMessageSource()
+    {
+        return _messageSource;
+    }
+
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        return _capabilities;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to