Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Mar  6 15:06:57 2017
@@ -46,6 +46,9 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,11 +62,8 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -71,8 +71,7 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -89,7 +88,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -98,7 +96,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -110,10 +107,10 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AbstractAMQPSession;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -138,14 +135,11 @@ public class Session_1_0 extends Abstrac
 
     private SessionState _sessionState;
 
-    private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
-    private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
-    private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
-    private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
+    private final Map<LinkEndpoint<?>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
+    private final Map<UnsignedInteger, LinkEndpoint<?>> _inputHandleToEndpoint = new HashMap<>();
+    private final Set<LinkEndpoint<?>> _associatedLinkEndpoints = new HashSet<>();
 
-    private final List<TxnCoordinatorReceivingLink_1_0> _txnCoordinatorLinks = new ArrayList<>();
-
-    private short _receivingChannel;
+    private final short _receivingChannel;
     private final short _sendingChannel;
 
 
@@ -213,73 +207,37 @@ public class Session_1_0 extends Abstrac
     {
         if(_sessionState == SessionState.ACTIVE)
         {
-            UnsignedInteger handle = attach.getHandle();
-            if(_remoteLinkEndpoints.containsKey(handle))
+            UnsignedInteger inputHandle = attach.getHandle();
+            if (_inputHandleToEndpoint.containsKey(inputHandle))
             {
-                // TODO - Error - handle busy?
+                getConnection().close(new Error(SessionError.HANDLE_IN_USE, "inputHandle of Attach already in use: " + attach.toString()));
+                throw new ConnectionScopedRuntimeException(String.format("Input Handle '%d' already in use", inputHandle.intValue()));
             }
             else
             {
-                Map<String, ? extends LinkEndpoint> linkMap =
-                        attach.getRole() == Role.RECEIVER ? _sendingLinkMap : _receivingLinkMap;
-                LinkEndpoint endpoint = linkMap.get(attach.getName());
-                if(endpoint == null)
+                final Class<? extends LinkModel> linkType;
+                if (attach.getRole() == Role.RECEIVER)
                 {
-
-                    if (attach.getRole() == Role.RECEIVER)
+                    linkType = SendingLink_1_0.class;
+                }
+                else
+                {
+                    if (attach.getTarget() instanceof Coordinator)
                     {
-                        endpoint = new SendingLinkEndpoint(this, attach);
-                    }
-                    else if (attach.getRole() == Role.SENDER)
-                    {
-                        if (attach.getTarget() instanceof Coordinator)
-                        {
-                            endpoint = new TxnCoordinatorReceivingLinkEndpoint(this, attach);
-                        }
-                        else
-                        {
-                            endpoint = new StandardReceivingLinkEndpoint(this, attach);
-                        }
+                        linkType = TxnCoordinatorReceivingLink_1_0.class;
                     }
                     else
                     {
-                        // TODO error handling
-                    }
-
-                    if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER)
-                    {
-                        endpoint.setStopped(true);
+                        linkType = StandardReceivingLink_1_0.class;
                     }
-
-                    // TODO : fix below - distinguish between local and remote owned
-                    endpoint.setSource(attach.getSource());
-                    endpoint.setTarget(attach.getTarget());
-                    ((Map<String,LinkEndpoint>)linkMap).put(attach.getName(), endpoint);
                 }
-                else
-                {
-                    endpoint.receiveAttach(attach);
-                }
-
-                if(attach.getRole() == Role.SENDER)
-                {
-                    endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
-                }
-
-                _remoteLinkEndpoints.put(handle, endpoint);
 
-                if(!_localLinkEndpoints.containsKey(endpoint))
-                {
-                    UnsignedInteger localHandle = findNextAvailableHandle();
-                    endpoint.setLocalHandle(localHandle);
-                    _localLinkEndpoints.put(endpoint, localHandle);
+                final Link_1_0 link = (Link_1_0) getAddressSpace().getLink(getConnection().getRemoteContainerId(),
+                                                                           attach.getName(),
+                                                                           linkType);
+                final ListenableFuture<? extends LinkEndpoint<?>> future = link.attach(this, attach);
 
-                    remoteLinkCreation(endpoint, attach);
-                }
-                else
-                {
-                    // TODO - error already attached
-                }
+                addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
             }
         }
     }
@@ -419,18 +377,16 @@ public class Session_1_0 extends Abstrac
             case ACTIVE:
                 detachLinks();
                 remoteEnd(end);
-                short sendChannel = _sendingChannel;
-                _connection.sendEnd(sendChannel, new End(), true);
+                _connection.sendEnd(_sendingChannel, new End(), true);
                 _sessionState = SessionState.ENDED;
                 break;
             default:
-                sendChannel = _sendingChannel;
                 End reply = new End();
                 Error error = new Error();
                 error.setCondition(AmqpError.ILLEGAL_STATE);
                 error.setDescription("END called on Session which has not been opened");
                 reply.setError(error);
-                _connection.sendEnd(sendChannel, reply, true);
+                _connection.sendEnd(_sendingChannel, reply, true);
                 break;
         }
     }
@@ -463,7 +419,7 @@ public class Session_1_0 extends Abstrac
     public void receiveFlow(final Flow flow)
     {
         UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+        final LinkEndpoint<?> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
 
         final UnsignedInteger nextOutgoingId =
                 flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
@@ -476,8 +432,8 @@ public class Session_1_0 extends Abstrac
         }
         else
         {
-            final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
-            for (LinkEndpoint le : allLinkEndpoints)
+            final Collection<LinkEndpoint<?>> allLinkEndpoints = _inputHandleToEndpoint.values();
+            for (LinkEndpoint<?> le : allLinkEndpoints)
             {
                 le.flowStateChanged();
             }
@@ -620,16 +576,14 @@ public class Session_1_0 extends Abstrac
     {
         _nextIncomingTransferId.incr();
 
-        UnsignedInteger handle = transfer.getHandle();
-
-
-        LinkEndpoint linkEndpoint = _remoteLinkEndpoints.get(handle);
+        UnsignedInteger inputHandle = transfer.getHandle();
+        LinkEndpoint<?> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
 
         if (linkEndpoint == null)
         {
             Error error = new Error();
             error.setCondition(AmqpError.ILLEGAL_STATE);
-            error.setDescription("TRANSFER called on Session for link handle " + handle + " which is not attached");
+            error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is not attached");
             _connection.close(error);
 
         }
@@ -638,7 +592,7 @@ public class Session_1_0 extends Abstrac
 
             Error error = new Error();
             error.setCondition(ConnectionError.FRAMING_ERROR);
-            error.setDescription("TRANSFER called on Session for link handle " + handle + " which is a sending ink not a receiving link");
+            error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is a sending ink not a receiving link");
             _connection.close(error);
 
         }
@@ -681,7 +635,7 @@ public class Session_1_0 extends Abstrac
                     Error error = new Error();
                     error.setCondition(AmqpError.ILLEGAL_STATE);
                     error.setDescription("TRANSFER called on Session for link handle "
-                                         + handle
+                                         + inputHandle
                                          + " with incorrect delivery id "
                                          + transfer.getDeliveryId());
                     reply.setError(error);
@@ -704,11 +658,6 @@ public class Session_1_0 extends Abstrac
         }
     }
 
-    private Collection<LinkEndpoint> getLocalLinkEndpoints()
-    {
-        return new ArrayList<>(_localLinkEndpoints.keySet());
-    }
-
     boolean isEnded()
     {
         return _sessionState == SessionState.ENDED || _connection.isClosed();
@@ -724,413 +673,94 @@ public class Session_1_0 extends Abstrac
         return _accessControllerContext;
     }
 
-    public void remoteLinkCreation(final LinkEndpoint endpoint, Attach attach)
+    public ReceivingDestination getReceivingDestination(final Target target) throws AmqpErrorException
     {
-        Link_1_0 link = null;
-        Error error = null;
-        Set<Symbol> capabilities = new HashSet<>();
-        try
+        final ReceivingDestination destination;
+        if (target != null)
         {
-            if (endpoint.getRole() == Role.SENDER)
+            if (Boolean.TRUE.equals(target.getDynamic()))
             {
-                link = createSendingLink(endpoint, attach);
-                if (link != null)
-                {
-                    capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
-                }
-            }
-            else if (endpoint.getTarget() instanceof Coordinator)
-            {
-                link = createCoordinatorLink(endpoint);
+                MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
+                target.setAddress(tempQueue.getName());
             }
-            else // standard  (non-Coordinator) receiver
-            {
-                link = createReceivingLink(endpoint, capabilities);
-            }
-        }
-        catch (AmqpErrorException e)
-        {
 
-            if (e.getError() == null || e.getError().getCondition() == AmqpError.INTERNAL_ERROR)
-            {
-                _logger.error("Could not create link", e);
-            }
-            else
-            {
-                _logger.debug("Could not create link", e);
-            }
-            if (endpoint.getRole() == Role.SENDER)
+            String addr = target.getAddress();
+            if (addr == null || "".equals(addr.trim()))
             {
-                endpoint.setSource(null);
+                MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
+                destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                           target.getExpiryPolicy(), "",
+                                                           target.getCapabilities(),
+                                                           _connection.getEventLogger());
             }
-            else
-            {
-                endpoint.setTarget(null);
-            }
-            error = e.getError();
-        }
-
-        endpoint.setCapabilities(capabilities);
-        endpoint.attach();
-
-        if (link == null)
-        {
-            if (error == null)
-            {
-                error = new Error();
-                error.setCondition(AmqpError.NOT_FOUND);
-            }
-            endpoint.close(error);
-        }
-        else
-        {
-            endpoint.start();
-        }
-    }
-
-    private Link_1_0 createReceivingLink(final LinkEndpoint endpoint,
-                                         final Set<Symbol> capabilities)
-    {
-        Link_1_0 link = null;
-        Destination destination;
-        final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-        StandardReceivingLink_1_0 previousLink =
-                (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
-
-        if (previousLink == null)
-        {
-
-            Target target = (Target) endpoint.getTarget();
-
-            if (target != null)
+            else if (!addr.startsWith("/") && addr.contains("/"))
             {
-                if (Boolean.TRUE.equals(target.getDynamic()))
+                String[] parts = addr.split("/", 2);
+                Exchange<?> exchange = getExchange(parts[0]);
+                if (exchange != null)
                 {
-
-                    MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
-                    target.setAddress(tempQueue.getName());
+                    Symbol[] capabilities1 = target.getCapabilities();
+                    ExchangeDestination exchangeDestination = new ExchangeDestination(exchange,
+                                                                                      null,
+                                                                                      target.getDurable(),
+                                                                                      target.getExpiryPolicy(),
+                                                                                      parts[0],
+                                                                                      parts[1],
+                                                                                      capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList());
+                    destination = exchangeDestination;
                 }
-
-                String addr = target.getAddress();
-                if (addr == null || "".equals(addr.trim()))
+                else
                 {
-                    MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
-                    destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                               target.getExpiryPolicy(), "",
-                                                               target.getCapabilities(),
-                                                               _connection.getEventLogger());
-                    target.setCapabilities(destination.getCapabilities());
-
-                    if (_blockingEntities.contains(messageDestination))
-                    {
-                        endpoint.setStopped(true);
-                    }
+                    destination = null;
                 }
-                else if (!addr.startsWith("/") && addr.contains("/"))
-                {
-                    String[] parts = addr.split("/", 2);
-                    Exchange<?> exchange = getExchange(parts[0]);
-                    if (exchange != null)
-                    {
-                        Symbol[] capabilities1 = target.getCapabilities();
-                        ExchangeDestination exchangeDestination = new ExchangeDestination(exchange,
-                                                                                          null,
-                                                                                          target.getDurable(),
-                                                                                          target.getExpiryPolicy(),
-                                                                                          parts[0],
-                                                                                          parts[1],
-                                                                                          capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList());
-                        target.setCapabilities(exchangeDestination.getCapabilities());
-                        destination = exchangeDestination;
-                    }
-                    else
-                    {
-                        endpoint.setTarget(null);
-                        destination = null;
-                    }
+            }
+            else
+            {
+                MessageDestination messageDestination =
+                        getAddressSpace().getAttainedMessageDestination(addr);
+                if (messageDestination != null)
+                {
+                    destination =
+                            new NodeReceivingDestination(messageDestination,
+                                                         target.getDurable(),
+                                                         target.getExpiryPolicy(),
+                                                         addr,
+                                                         target.getCapabilities(),
+                                                         _connection.getEventLogger());
                 }
                 else
                 {
-                    MessageDestination messageDestination =
-                            getAddressSpace().getAttainedMessageDestination(addr);
-                    if (messageDestination != null)
+                    Queue<?> queue = getQueue(addr);
+                    if (queue != null)
                     {
-                        destination =
-                                new NodeReceivingDestination(messageDestination,
-                                                             target.getDurable(),
-                                                             target.getExpiryPolicy(),
-                                                             addr,
-                                                             target.getCapabilities(),
-                                                             _connection.getEventLogger());
-                        target.setCapabilities(destination.getCapabilities());
+                        destination = new QueueDestination(queue, addr);
                     }
                     else
                     {
-                        Queue<?> queue = getQueue(addr);
-                        if (queue != null)
-                        {
-
-                            destination = new QueueDestination(queue, addr);
-                        }
-                        else
-                        {
-                            endpoint.setTarget(null);
-                            destination = null;
-                        }
+                        destination = null;
                     }
                 }
             }
-            else
-            {
-                destination = null;
-            }
-            if (destination != null)
-            {
-                final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
-
-                MessageDestination messageDestination = receivingDestination.getMessageDestination();
-                if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
-                {
-                    capabilities.add(DELAYED_DELIVERY);
-                }
-                final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                final StandardReceivingLink_1_0 receivingLink = new StandardReceivingLink_1_0(receivingLinkEndpoint);
-                receivingLinkEndpoint.setDestination(receivingDestination);
-                receivingLinkEndpoint.setLink(receivingLink);
-                link = receivingLink;
-                if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
-                    || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
-                {
-                    linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
-                }
-            }
         }
         else
         {
-            StandardReceivingLinkEndpoint receivingLinkEndpoint = (StandardReceivingLinkEndpoint) endpoint;
-            previousLink.setLinkAttachment(receivingLinkEndpoint);
-            receivingLinkEndpoint.setLink(previousLink);
-            link = previousLink;
-            receivingLinkEndpoint.setLocalUnsettled(receivingLinkEndpoint.getUnsettledOutcomeMap());
+            destination = null;
         }
-        return link;
-    }
 
-    private TxnCoordinatorReceivingLink_1_0 createCoordinatorLink(final LinkEndpoint endpoint) throws AmqpErrorException
-    {
-        Coordinator coordinator = (Coordinator) endpoint.getTarget();
-        TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
-        boolean localTxn = false;
-        boolean multiplePerSession = false;
-        if (coordinatorCapabilities != null)
+        if (destination != null)
         {
-            for (TxnCapability capability : coordinatorCapabilities)
-            {
-                if (capability.equals(TxnCapability.LOCAL_TXN))
-                {
-                    localTxn = true;
-                }
-                else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
-                {
-                    multiplePerSession = true;
-                }
-                else
-                {
-                    Error error = new Error();
-                    error.setCondition(AmqpError.NOT_IMPLEMENTED);
-                    error.setDescription("Unsupported capability: " + capability);
-                    throw new AmqpErrorException(error);
-                }
-            }
-        }
-
-        final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-        final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
-                new TxnCoordinatorReceivingLink_1_0(
-                        receivingLinkEndpoint
-                );
-        receivingLinkEndpoint.setLink(coordinatorLink);
-        return coordinatorLink;
-    }
-
-    private SendingLink_1_0 createSendingLink(final LinkEndpoint endpoint, Attach attach) throws AmqpErrorException
-    {
-        final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
-        SendingLink_1_0 link = null;
-        final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-        final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
-
-        if (previousLink == null)
-        {
-            Source source = (Source) sendingLinkEndpoint.getSource();
-            SendingDestination destination = null;
-            if (source != null)
-            {
-                destination = getSendingDestination(sendingLinkEndpoint.getName(), source);
-                if (destination == null)
-                {
-                    sendingLinkEndpoint.setSource(null);
-                }
-                else
-                {
-                    source.setCapabilities(destination.getCapabilities());
-                }
-            }
-            else
-            {
-                final Symbol[] linkCapabilities = attach.getDesiredCapabilities();
-                boolean isGlobal = hasCapability(linkCapabilities, ExchangeDestination.GLOBAL_CAPABILITY);
-                final String queueName = getMangledSubscriptionName(endpoint.getName(), true, true, isGlobal);
-                final MessageSource messageSource = getAddressSpace().getAttainedMessageSource(queueName);
-                // TODO START The Source should be persisted on the LinkEndpoint
-                if (messageSource instanceof Queue)
-                {
-                    Queue<?> queue = (Queue<?>) messageSource;
-                    source = new Source();
-                    List<Symbol> capabilities = new ArrayList<>();
-                    if (queue.getExclusive() == ExclusivityPolicy.SHARED_SUBSCRIPTION)
-                    {
-                        capabilities.add(ExchangeDestination.SHARED_CAPABILITY);
-                    }
-                    if (isGlobal)
-                    {
-                        capabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
-                    }
-                    capabilities.add(ExchangeDestination.TOPIC_CAPABILITY);
-                    source.setCapabilities(capabilities.toArray(new Symbol[capabilities.size()]));
-
-                    final Collection<Exchange> exchanges = queue.getVirtualHost().getChildren(Exchange.class);
-                    String bindingKey = null;
-                    Exchange<?> foundExchange = null;
-                    for (Exchange<?> exchange : exchanges)
-                    {
-                        for (Binding binding : exchange.getPublishingLinks(queue))
-                        {
-                            String exchangeName = exchange.getName();
-                            bindingKey = binding.getName();
-                            final Map<String, Object> bindingArguments = binding.getArguments();
-                            Map<Symbol, Filter> filter = new HashMap<>();
-                            if (bindingArguments.containsKey(AMQPFilterTypes.JMS_SELECTOR))
-                            {
-                                filter.put(Symbol.getSymbol("jms-selector"), new JMSSelectorFilter((String) bindingArguments.get(AMQPFilterTypes.JMS_SELECTOR)));
-                            }
-                            if (bindingArguments.containsKey(AMQPFilterTypes.NO_LOCAL))
-                            {
-                                filter.put(Symbol.getSymbol("no-local"), NoLocalFilter.INSTANCE);
-                            }
-                            foundExchange = exchange;
-                            source.setAddress(exchangeName + "/" + bindingKey);
-                            source.setFilter(filter);
-                            break;
-                        }
-                        if (foundExchange != null)
-                        {
-                            break;
-                        }
-                    }
-                    if (foundExchange != null)
-                    {
-                        source.setDurable(TerminusDurability.CONFIGURATION);
-                        TerminusExpiryPolicy terminusExpiryPolicy;
-                        switch (queue.getLifetimePolicy())
-                        {
-                            case PERMANENT:
-                                terminusExpiryPolicy = TerminusExpiryPolicy.NEVER;
-                                break;
-                            case DELETE_ON_NO_LINKS:
-                            case DELETE_ON_NO_OUTBOUND_LINKS:
-                                terminusExpiryPolicy = TerminusExpiryPolicy.LINK_DETACH;
-                                break;
-                            case DELETE_ON_CONNECTION_CLOSE:
-                                terminusExpiryPolicy = TerminusExpiryPolicy.CONNECTION_CLOSE;
-                                break;
-                            case DELETE_ON_SESSION_END:
-                                terminusExpiryPolicy = TerminusExpiryPolicy.SESSION_END;
-                                break;
-                            default:
-                                throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "unexpected liftetime policy " + queue.getLifetimePolicy()));
-                        }
-                        sendingLinkEndpoint.setSource(source);
-                        destination = new ExchangeDestination(foundExchange,
-                                                              queue,
-                                                              
TerminusDurability.CONFIGURATION,
-                                                              
terminusExpiryPolicy,
-                                                              
foundExchange.getName(),
-                                                              bindingKey,
-                                                              capabilities);
-                    }
-
-                }
-                // TODO END
-            }
-
-            if (destination != null)
-            {
-                final SendingLink_1_0 sendingLink =
-                        new SendingLink_1_0(
-                                sendingLinkEndpoint);
-                //sendingLink.createConsumerTarget();
-
-                sendingLinkEndpoint.doStuff(destination);
-                sendingLinkEndpoint.createConsumerTarget();
-
-                sendingLinkEndpoint.setLink(sendingLink);
-                sendingLinkEndpoint.setDurability(((Source) 
attach.getSource()).getDurable());
-                registerConsumer(sendingLinkEndpoint);
-
-                if (destination instanceof ExchangeDestination)
-                {
-                    ExchangeDestination exchangeDestination = 
(ExchangeDestination) destination;
-                    
exchangeDestination.getQueue().setAttributes(Collections.<String, 
Object>singletonMap(Queue.DESIRED_STATE, State.ACTIVE));
-                }
-
-                link = sendingLink;
-                if 
(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
-                    || 
TerminusDurability.CONFIGURATION.equals(source.getDurable()))
-                {
-                    linkRegistry.registerSendingLink(endpoint.getName(), 
sendingLink);
-                }
-
-            }
+            target.setCapabilities(destination.getCapabilities());
         }
         else
         {
-            Source newSource = (Source) attach.getSource();
-            Source oldSource = (Source) previousLink.getEndpoint().getSource();
-
-            if (sendingLinkEndpoint.getDestination() == null)
-            {
-                final SendingDestination oldDestination =
-                        
getSendingDestination(previousLink.getEndpoint().getName(), oldSource);
-                sendingLinkEndpoint.doStuff(oldDestination);
-                sendingLinkEndpoint.setDurability(oldSource.getDurable());
-
-            }
-
-            if (sendingLinkEndpoint.getDestination() instanceof 
ExchangeDestination
-                && newSource != null
-                && !Boolean.TRUE.equals(newSource.getDynamic()))
-            {
-                final SendingDestination newDestination =
-                        
getSendingDestination(previousLink.getEndpoint().getName(), newSource);
-                if (updateSourceForSubscription(sendingLinkEndpoint, 
newSource, newDestination))
-                {
-                    sendingLinkEndpoint.setDestination(newDestination);
-                }
-            }
-
-            sendingLinkEndpoint.setSource(oldSource);
-            previousLink.setLinkAttachment(this, sendingLinkEndpoint);
-            sendingLinkEndpoint.setLink(previousLink);
-            link = previousLink;
-            
sendingLinkEndpoint.setLocalUnsettled(sendingLinkEndpoint.getUnsettledOutcomeMap());
-            registerConsumer(sendingLinkEndpoint);
-
+            throw new AmqpErrorException(AmqpError.NOT_FOUND,
+                                         String.format("Could not find 
destination for target '%s'", target));
         }
-        return link;
+
+        return destination;
     }
 
-    private boolean updateSourceForSubscription(final SendingLinkEndpoint 
linkEndpoint, final Source newSource,
+    public boolean updateSourceForSubscription(final SendingLinkEndpoint 
linkEndpoint, final Source newSource,
                                                 final SendingDestination 
newDestination)
     {
         SendingDestination oldDestination = linkEndpoint.getDestination();
@@ -1153,7 +783,7 @@ public class Session_1_0 extends Abstrac
         return false;
     }
 
-    private SendingDestination getSendingDestination(final String linkName, 
final Source source) throws AmqpErrorException
+    public SendingDestination getSendingDestination(final String linkName, 
final Source source) throws AmqpErrorException
     {
         SendingDestination destination = null;
 
@@ -1184,6 +814,11 @@ public class Session_1_0 extends Abstrac
             }
         }
 
+        if (destination == null)
+        {
+            throw new AmqpErrorException(AmqpError.NOT_FOUND,
+                                         String.format("Could not find 
destination for source '%s'", source));
+        }
         return destination;
     }
 
@@ -1378,18 +1013,6 @@ public class Session_1_0 extends Abstrac
         return false;
     }
 
-
-    private void registerConsumer(final SendingLinkEndpoint linkEndpoint)
-    {
-        MessageInstanceConsumer consumer = linkEndpoint.getConsumer();
-        if(consumer instanceof Consumer<?,?>)
-        {
-            Consumer<?,ConsumerTarget_1_0> modelConsumer = 
(Consumer<?,ConsumerTarget_1_0>) consumer;
-            _consumers.add(modelConsumer);
-        }
-    }
-
-
     private MessageSource createDynamicSource(Map properties)
     {
         final String queueName = _primaryDomain + "TempQueue" + 
UUID.randomUUID().toString();
@@ -1496,11 +1119,17 @@ public class Session_1_0 extends Abstrac
 
     void remoteEnd(End end)
     {
-
-        for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints())
+        List<LinkEndpoint<?>> linkEndpoints = new 
ArrayList<>(_endpointToOutputHandle.keySet());
+        for(LinkEndpoint linkEndpoint : linkEndpoints)
         {
             linkEndpoint.remoteDetached(new Detach());
+            linkEndpoint.dissociateSession();
         }
+        for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
+        {
+            linkEndpoint.dissociateSession();
+        }
+        _associatedLinkEndpoints.clear();
 
         _connection.sessionEnded(this);
         performCloseTasks();
@@ -1586,10 +1215,13 @@ public class Session_1_0 extends Abstrac
     @Override
     public void transportStateChanged()
     {
-        for(SendingLinkEndpoint endpoint : _sendingLinkMap.values())
+        for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
         {
-            ConsumerTarget_1_0 target = endpoint.getConsumerTarget();
-            target.flowStateChanged();
+            if (linkEndpoint instanceof SendingLinkEndpoint)
+            {
+                ConsumerTarget_1_0 target = ((SendingLinkEndpoint) 
linkEndpoint).getConsumerTarget();
+                target.flowStateChanged();
+            }
         }
 
         if (!_consumersWithPendingWork.isEmpty() && 
!getAMQPConnection().isTransportBlockedForWriting())
@@ -1619,14 +1251,14 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
 
-            for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            for (LinkEndpoint<?> linkEndpoint : 
_endpointToOutputHandle.keySet())
             {
-                if (isQueueDestinationForLink(queue, 
endpoint.getReceivingDestination()))
+                if (linkEndpoint instanceof ReceivingLinkEndpoint
+                    && isQueueDestinationForLink(queue, 
((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
                 {
-                    endpoint.setStopped(true);
+                    linkEndpoint.setStopped(true);
                 }
             }
-
         }
     }
 
@@ -1659,11 +1291,12 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            for (LinkEndpoint<?> linkEndpoint : 
_endpointToOutputHandle.keySet())
             {
-                if (isQueueDestinationForLink(queue, 
endpoint.getReceivingDestination()))
+                if (linkEndpoint instanceof ReceivingLinkEndpoint
+                        && isQueueDestinationForLink(queue, 
((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
                 {
-                    endpoint.setStopped(false);
+                    linkEndpoint.setStopped(false);
                 }
             }
         }
@@ -1689,15 +1322,16 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues 
**"));
 
-            for(LinkEndpoint endpoint : _receivingLinkMap.values())
+            for (LinkEndpoint<?> linkEndpoint : 
_endpointToOutputHandle.keySet())
             {
-                endpoint.setStopped(true);
+                if (linkEndpoint instanceof ReceivingLinkEndpoint)
+                {
+                    linkEndpoint.setStopped(true);
+                }
             }
         }
     }
 
-
-
     @Override
     public void unblock()
     {
@@ -1720,11 +1354,12 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            for (LinkEndpoint<?> linkEndpoint : 
_endpointToOutputHandle.keySet())
             {
-                
if(!_blockingEntities.contains(endpoint.getReceivingDestination()))
+                if (linkEndpoint instanceof ReceivingLinkEndpoint
+                    && !_blockingEntities.contains(((ReceivingLinkEndpoint) 
linkEndpoint).getReceivingDestination()))
                 {
-                    endpoint.setStopped(false);
+                    linkEndpoint.setStopped(false);
                 }
             }
         }
@@ -1774,7 +1409,8 @@ public class Session_1_0 extends Abstrac
     @Override
     public long getConsumerCount()
     {
-        return getConsumers().size();
+        // TODO - fix statistic - need to count consumers
+        return -1;
     }
 
     @Override
@@ -1894,19 +1530,11 @@ public class Session_1_0 extends Abstrac
 
     private void detach(UnsignedInteger handle, Detach detach)
     {
-        if(_remoteLinkEndpoints.containsKey(handle))
+        if(_inputHandleToEndpoint.containsKey(handle))
         {
-            LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
-
+            LinkEndpoint<?> endpoint = _inputHandleToEndpoint.remove(handle);
             endpoint.remoteDetached(detach);
-
-            _localLinkEndpoints.remove(endpoint);
-
-            if (Boolean.TRUE.equals(detach.getClosed()))
-            {
-                Map<String, ? extends LinkEndpoint> linkMap = 
endpoint.getRole() == Role.SENDER ? _sendingLinkMap : _receivingLinkMap;
-                linkMap.remove(endpoint.getName());
-            }
+            _endpointToOutputHandle.remove(endpoint);
         }
         else
         {
@@ -1916,7 +1544,7 @@ public class Session_1_0 extends Abstrac
 
     private void detachLinks()
     {
-        Collection<UnsignedInteger> handles = new 
ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
+        Collection<UnsignedInteger> handles = new 
ArrayList<>(_inputHandleToEndpoint.keySet());
         for(UnsignedInteger handle : handles)
         {
             Detach detach = new Detach();
@@ -1926,56 +1554,19 @@ public class Session_1_0 extends Abstrac
             detach(handle, detach);
         }
 
-        final LinkRegistry linkRegistry = 
getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-
-        for(LinkEndpoint<?> linkEndpoint : _sendingLinkMap.values())
+        for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
         {
-            final SendingLink_1_0 link = (SendingLink_1_0) 
linkRegistry.getDurableSendingLink(linkEndpoint.getName());
-
-            if (link != null)
-            {
-                synchronized (link)
-                {
-                    if (link.getEndpoint() == linkEndpoint)
-                    {
-                        try
-                        {
-                            link.setLinkAttachment(null, (SendingLinkEndpoint) 
linkEndpoint);
-                        }
-                        catch (AmqpErrorException e)
-                        {
-                            throw new ConnectionScopedRuntimeException(e);
-                        }
-                    }
-                }
-            }
-        }
-
-        for(LinkEndpoint<?> linkEndpoint : _receivingLinkMap.values())
-        {
-            final StandardReceivingLink_1_0
-                    link = (StandardReceivingLink_1_0) 
linkRegistry.getDurableReceivingLink(linkEndpoint.getName());
-
-            if (link != null)
-            {
-                synchronized (link)
-                {
-                    if (link.getEndpoint() == linkEndpoint)
-                    {
-                        link.setLinkAttachment((ReceivingLinkEndpoint) 
linkEndpoint);
-                    }
-                }
-            }
+            linkEndpoint.dissociateSession();
         }
     }
 
 
-    private UnsignedInteger findNextAvailableHandle()
+    private UnsignedInteger findNextAvailableOutputHandle()
     {
         int i = 0;
         do
         {
-            if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
+            
if(!_endpointToOutputHandle.containsValue(UnsignedInteger.valueOf(i)))
             {
                 return UnsignedInteger.valueOf(i);
             }
@@ -2018,6 +1609,85 @@ public class Session_1_0 extends Abstrac
         return primaryDomain;
     }
 
+    private class EndpointCreationCallback<T extends LinkEndpoint<?>> 
implements FutureCallback<T>
+    {
+
+        private final Attach _attach;
+
+        EndpointCreationCallback(final Attach attach)
+        {
+            _attach = attach;
+        }
+
+        @Override
+        public void onSuccess(final T endpoint)
+        {
+            doOnIOThreadAsync(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    _associatedLinkEndpoints.add(endpoint);
+                    endpoint.setLocalHandle(findNextAvailableOutputHandle());
+                    if (attachWasUnsuccessful(endpoint))
+                    {
+                        endpoint.attach();
+
+                        Error error = new Error();
+                        error.setCondition(AmqpError.NOT_FOUND);
+                        endpoint.close(error);
+                    }
+                    else
+                    {
+                        if (endpoint.getRole() == Role.RECEIVER
+                            && (_blockingEntities.contains(Session_1_0.this)
+                                || (endpoint instanceof 
StandardReceivingLinkEndpoint
+                                    && 
_blockingEntities.contains(((ReceivingLinkEndpoint) 
endpoint).getReceivingDestination()))))
+                        {
+                            endpoint.setStopped(true);
+                        }
+                        _inputHandleToEndpoint.put(_attach.getHandle(), 
endpoint);
+                        if (!_endpointToOutputHandle.containsKey(endpoint))
+                        {
+                            _endpointToOutputHandle.put(endpoint, 
endpoint.getLocalHandle());
+                            endpoint.attach();
+                            endpoint.start();
+                        }
+                        else
+                        {
+                            // TODO - link stealing???
+                        }
+
+                    }
+                }
+            });
+        }
+
+        @Override
+        public void onFailure(final Throwable t)
+        {
+            String errorMessage = String.format("Failed to create LinkEndpoint 
in response to Attach: %s", _attach);
+            _logger.error(errorMessage, t);
+            throw new ConnectionScopedRuntimeException(errorMessage, t);
+        }
+
+        private boolean attachWasUnsuccessful(final T endpoint)
+        {
+            if (endpoint.getRole().equals(Role.SENDER))
+            {
+                return endpoint.getSource() == null;
+            }
+            else if (endpoint.getRole().equals(Role.RECEIVER))
+            {
+                return endpoint.getTarget() == null;
+            }
+            else
+            {
+                throw new IllegalStateException(String.format("Unknown 
LinkEndpoint role '%s'", endpoint.getRole()));
+            }
+        }
+    }
+
     private final class BindingInfo
     {
         private final Map<Symbol, Filter> _actualFilters = new HashMap<>();

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 Mon Mar  6 15:06:57 2017
@@ -36,7 +36,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -73,17 +73,14 @@ public class StandardReceivingLinkEndpoi
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
 
-    private final SectionDecoderImpl _sectionDecoder;
     private ArrayList<Transfer> _incompleteMessage;
     private boolean _resumedMessage;
     private Binary _messageDeliveryTag;
     private Map<Binary, Outcome> _unsettledMap = 
Collections.synchronizedMap(new HashMap<Binary, Outcome>());
 
-    public StandardReceivingLinkEndpoint(final Session_1_0 session,
-                                         final Attach attach)
+    public StandardReceivingLinkEndpoint(final StandardReceivingLink_1_0 link, 
final SectionDecoder sectionDecoder)
     {
-        super(session, attach);
-        _sectionDecoder = new 
SectionDecoderImpl(getSession().getConnection().getSectionDecoderRegistry());
+        super(link, sectionDecoder);
     }
 
     @Override
@@ -333,9 +330,8 @@ public class StandardReceivingLinkEndpoi
         }
         else if(detach == null || detach.getError() != null)
         {
-            getLink().setLinkAttachmentToNull();
-            // TODO do we have to set an error?
             detach();
+            dissociateSession();
         }
         else
         {
@@ -360,7 +356,7 @@ public class StandardReceivingLinkEndpoi
         List<EncodingRetainingSection<?>> sections;
         try
         {
-            sections = _sectionDecoder.parseAll(fragments);
+            sections = getSectionDecoder().parseAll(fragments);
         }
         catch (AmqpErrorException e)
         {
@@ -455,10 +451,13 @@ public class StandardReceivingLinkEndpoi
                                        contentSize);
     }
 
-    public void doLinkAttachment()
+    @Override
+    public void attachReceived(final Attach attach) throws AmqpErrorException
     {
-        Map initialUnsettledMap = getInitialUnsettledMap();
+        super.attachReceived(attach);
+        setDeliveryCount(attach.getInitialDeliveryCount());
 
+        Map initialUnsettledMap = getInitialUnsettledMap();
         Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, 
Outcome>(_unsettledMap);
         for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
         {

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
 Mon Mar  6 15:06:57 2017
@@ -20,30 +20,194 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
+import java.util.Arrays;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+
+public class StandardReceivingLink_1_0 extends 
AbstractLink<StandardReceivingLinkEndpoint> implements ReceivingLink_1_0
 {
-    private volatile ReceivingLinkEndpoint _linkEndpoint;
+    public StandardReceivingLink_1_0(final String linkName)
+    {
+        super(linkName);
+    }
 
-    public StandardReceivingLink_1_0(final ReceivingLinkEndpoint endpoint)
+    @Override
+    protected ListenableFuture<StandardReceivingLinkEndpoint> stealLink(final 
Session_1_0 session, final Attach attach)
     {
-        _linkEndpoint = endpoint;
+        throw new UnsupportedOperationException("Link stealing not implemented 
yet");
+        /*
+        final SettableFuture<StandardReceivingLinkEndpoint> returnFuture = 
SettableFuture.create();
+        _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                _linkEndpoint.close(new Error(LinkError.STOLEN,
+                                              String.format("Link is being 
stolen by connection '%s'",
+                                                            
session.getConnection())));
+                try
+                {
+                    returnFuture.set(attach(session, attach).get());
+                }
+                catch (InterruptedException e)
+                {
+                    returnFuture.setException(e);
+                    Thread.currentThread().interrupt();
+                }
+                catch (ExecutionException e)
+                {
+                    returnFuture.setException(e.getCause());
+                }
+            }
+        });
+        return returnFuture;
+        */
     }
 
-    public ReceivingLinkEndpoint getEndpoint()
+    @Override
+    protected ListenableFuture<StandardReceivingLinkEndpoint> 
reattachLink(final Session_1_0 session,
+                                                                           
final Attach attach)
     {
-        return _linkEndpoint;
+        if (_linkEndpoint == null)
+        {
+            SectionDecoderImpl sectionDecoder = new 
SectionDecoderImpl(session.getConnection()
+                                                                              
.getDescribedTypeRegistry()
+                                                                              
.getSectionDecoderRegistry());
+            _linkEndpoint = new StandardReceivingLinkEndpoint(this, 
sectionDecoder);
+        }
+
+        _target = new Target();
+        _source = attach.getSource();
+
+        try
+        {
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+
+            Target attachTarget = (Target) attach.getTarget();
+            final ReceivingDestination destination = 
session.getReceivingDestination(attachTarget);
+            ((Target) _target).setAddress(attachTarget.getAddress());
+            ((Target) _target).setDynamic(attachTarget.getDynamic());
+            ((Target) _target).setCapabilities(destination.getCapabilities());
+            
_linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities()));
+            _linkEndpoint.setDestination(destination);
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+
+        return Futures.immediateFuture(_linkEndpoint);    }
+
+    @Override
+    protected ListenableFuture<StandardReceivingLinkEndpoint> resumeLink(final 
Session_1_0 session, final Attach attach)
+    {
+        if (getTarget() == null)
+        {
+            throw new IllegalStateException("Terminus should be set when 
resuming a Link.");
+        }
+        if (attach.getTarget() == null)
+        {
+            throw new IllegalStateException("Attach.getTarget should not be 
null when resuming a Link. That would be recovering the Link.");
+        }
+
+        _source = attach.getSource();
+
+        try
+        {
+
+            if (_linkEndpoint == null)
+            {
+                SectionDecoderImpl sectionDecoder = new 
SectionDecoderImpl(session.getConnection()
+                                                                               
   .getDescribedTypeRegistry()
+                                                                               
   .getSectionDecoderRegistry());
+                _linkEndpoint = new StandardReceivingLinkEndpoint(this, 
sectionDecoder);
+
+                final ReceivingDestination destination = 
session.getReceivingDestination((Target) _target);
+                _linkEndpoint.setDestination(destination);
+                ((Target) 
_target).setCapabilities(destination.getCapabilities());
+                
_linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities()));
+                _linkEndpoint.setDestination(destination);
+            }
+
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+
+            
_linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+
+        return Futures.immediateFuture(_linkEndpoint);
     }
 
-    public void setLinkAttachment(final ReceivingLinkEndpoint linkEndpoint)
+    @Override
+    protected ListenableFuture<StandardReceivingLinkEndpoint> 
recoverLink(final Session_1_0 session,
+                                                                          
final Attach attach)
     {
-        _linkEndpoint = linkEndpoint;
-        ((StandardReceivingLinkEndpoint)getEndpoint()).doLinkAttachment();
+        if (_target == null)
+        {
+            return rejectLink(session, attach);
+        }
+
+        _source = attach.getSource();
+
+        try
+        {
+            if (_linkEndpoint == null)
+            {
+                SectionDecoderImpl sectionDecoder = new 
SectionDecoderImpl(session.getConnection()
+                                                                               
   .getDescribedTypeRegistry()
+                                                                               
   .getSectionDecoderRegistry());
+                _linkEndpoint = new StandardReceivingLinkEndpoint(this, 
sectionDecoder);
+
+                final ReceivingDestination destination = 
session.getReceivingDestination((Target) _target);
+                ((Target) 
_target).setCapabilities(destination.getCapabilities());
+                
_linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities()));
+                _linkEndpoint.setDestination(destination);
+            }
+
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+
+            
_linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+
+        return Futures.immediateFuture(_linkEndpoint);
     }
 
     @Override
-    public void setLinkAttachmentToNull()
+    protected ListenableFuture<StandardReceivingLinkEndpoint> 
establishLink(final Session_1_0 session,
+                                                                            
final Attach attach)
     {
-        _linkEndpoint = null;
+        if (_linkEndpoint != null || getTarget() != null)
+        {
+            throw new IllegalStateException("LinkEndpoint and Target should be 
null when establishing a Link.");
+        }
+
+        return reattachLink(session, attach);
     }
 
+    private ListenableFuture<StandardReceivingLinkEndpoint> rejectLink(final 
Session_1_0 session,
+                                                                       final 
Attach attach)
+    {
+        SectionDecoderImpl sectionDecoder =
+                new 
SectionDecoderImpl(session.getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
+        _linkEndpoint = new StandardReceivingLinkEndpoint(this, 
sectionDecoder);
+        _linkEndpoint.associateSession(session);
+        _target = null;
+        return Futures.immediateFuture(_linkEndpoint);
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 Mon Mar  6 15:06:57 2017
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -48,16 +48,13 @@ import org.apache.qpid.server.util.Conne
 
 public class TxnCoordinatorReceivingLinkEndpoint extends ReceivingLinkEndpoint
 {
-    private final SectionDecoderImpl _sectionDecoder;
     private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
     private ArrayList<Transfer> _incompleteMessage;
 
-    public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session,
-                                               final Attach attach)
+    public TxnCoordinatorReceivingLinkEndpoint(final TxnCoordinatorReceivingLink_1_0 link,
+                                               final SectionDecoder sectionDecoder)
     {
-        super(session, attach);
-        _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
-
+        super(link, sectionDecoder);
     }
 
     @Override
@@ -109,7 +106,7 @@ public class TxnCoordinatorReceivingLink
         // Only interested in the amqp-value section that holds the message to the coordinator
         try
         {
-            List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload);
+            List<EncodingRetainingSection<?>> sections = getSectionDecoder().parseAll(payload);
             boolean amqpValueSectionFound = false;
             for(EncodingRetainingSection section : sections)
             {
@@ -225,5 +222,10 @@ public class TxnCoordinatorReceivingLink
 
     }
 
-
+    @Override
+    public void attachReceived(final Attach attach) throws AmqpErrorException
+    {
+        super.attachReceived(attach);
+        setDeliveryCount(attach.getInitialDeliveryCount());
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
 Mon Mar  6 15:06:57 2017
@@ -20,20 +20,92 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 
-public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
+public class TxnCoordinatorReceivingLink_1_0 extends AbstractLink<TxnCoordinatorReceivingLinkEndpoint> implements ReceivingLink_1_0
 {
-    public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint)
+    public TxnCoordinatorReceivingLink_1_0(final String linkName)
     {
-        ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+        super(linkName);
     }
 
+    @Override
+    protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> stealLink(final Session_1_0 session,
+                                                                              final Attach attach)
+    {
+        return rejectLink(session, attach);
+    }
 
     @Override
-    public void setLinkAttachmentToNull()
+    protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> reattachLink(final Session_1_0 session,
+                                                                                 final Attach attach)
     {
+        return rejectLink(session, attach);
     }
 
+    @Override
+    protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> resumeLink(final Session_1_0 session,
+                                                                               final Attach attach)
+    {
+        return rejectLink(session, attach);
+    }
+
+    @Override
+    protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> recoverLink(final Session_1_0 session,
+                                                                                final Attach attach)
+    {
+        return rejectLink(session, attach);
+    }
+
+    @Override
+    protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> establishLink(final Session_1_0 session,
+                                                                                  final Attach attach)
+    {
+        if (_linkEndpoint != null || getTarget() != null)
+        {
+            throw new IllegalStateException("LinkEndpoint and Target should be null when establishing a Link.");
+        }
+
+        _target = new Coordinator();
+        ((Coordinator) _target).setCapabilities(TxnCapability.LOCAL_TXN,
+                                                TxnCapability.MULTI_SSNS_PER_TXN,
+                                                TxnCapability.MULTI_TXNS_PER_SSN);
+        _source = attach.getSource();
+
+        try
+        {
+            SectionDecoder sectionDecoder = new SectionDecoderImpl(session.getConnection()
+                                                                          .getDescribedTypeRegistry()
+                                                                          .getSectionDecoderRegistry());
+            _linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(this, sectionDecoder);
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+
+        return Futures.immediateFuture(_linkEndpoint);
+    }
+
+    private ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> rejectLink(final Session_1_0 session,
+                                                                             final Attach attach)
+    {
+        SectionDecoder sectionDecoder = new SectionDecoderImpl(session.getConnection()
+                                                                      .getDescribedTypeRegistry()
+                                                                      .getSectionDecoderRegistry());
+        _linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(this, sectionDecoder);
+        _linkEndpoint.associateSession(session);
+        _target = null;
+        return Futures.immediateFuture(_linkEndpoint);
+    }
 }

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java?rev=1785660&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
 Mon Mar  6 15:06:57 2017
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class LinkRegistryTest extends QpidTestCase
+{
+    private QueueManagingVirtualHost _virtualHost;
+    private LinkRegistryImpl _linkRegistry;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _virtualHost = mock(QueueManagingVirtualHost.class);
+        _linkRegistry = new LinkRegistryImpl(_virtualHost);
+    }
+
+    private void doTestGetLink(final Class<? extends LinkModel> type) throws Exception
+    {
+        String remoteContainerId = "testRemoteContainerId";
+        String linkName = "testLinkName";
+        LinkModel link = _linkRegistry.getLink(remoteContainerId, linkName, type);
+        assertNotNull("LinkRegistry#getLink should always return an object", link);
+        LinkModel link2 = _linkRegistry.getLink(remoteContainerId, linkName, type);
+        assertNotNull("LinkRegistry#getLink should always return an object", link2);
+        assertSame("Two calls to LinkRegistry#getLink should return the same object", link, link2);
+    }
+
+    public void testGetSendingLink() throws Exception
+    {
+        doTestGetLink(SendingLink_1_0.class);
+    }
+
+    public void testGetReceivingLink() throws Exception
+    {
+        doTestGetLink(StandardReceivingLink_1_0.class);
+    }
+
+    public void testGetCoordinatingLink() throws Exception
+    {
+        doTestGetLink(TxnCoordinatorReceivingLink_1_0.class);
+    }
+}

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
 Mon Mar  6 15:06:57 2017
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -38,11 +40,13 @@ import javax.security.auth.Subject;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.server.common.AMQPFilterTypes;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Binding;
@@ -56,6 +60,8 @@ import org.apache.qpid.server.model.Publ
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -84,16 +90,26 @@ public class Session_1_0Test extends Qpi
     private VirtualHost<?> _virtualHost;
     private Session_1_0 _session;
     private int _handle;
+    private CurrentThreadTaskExecutor _taskExecutor;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
         _virtualHost = BrokerTestHelper.createVirtualHost("testVH");
+        _taskExecutor = new CurrentThreadTaskExecutor();
+        _taskExecutor.start();
         _connection = createAmqpConnection_1_0("testContainerId");
         this._session = createSession_1_0(_connection, 0);
     }
 
+    @Override
+    protected void tearDown() throws Exception
+    {
+        _taskExecutor.stop();
+        super.tearDown();
+    }
+
     public void testReceiveAttachTopicNonDurableNoContainer() throws Exception
     {
         final String linkName = "testLink";
@@ -308,7 +324,7 @@ public class Session_1_0Test extends Qpi
         assertNotNull("Unexpected source", sentAttach.getSource());
         Source source = (Source)sentAttach.getSource();
         assertEquals("Unexpected address", address, source.getAddress());
-        assertEquals("Unexpected capabilities", ((Source)attach.getSource()).getCapabilities(), source.getCapabilities());
+        assertTrue("Unexpected source capabilities", Arrays.asList(source.getCapabilities()).contains(Symbol.valueOf("topic")));
 
         Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
         assertEquals("Unexpected number of queues after unsubscribe", 1, queues.size());
@@ -497,8 +513,22 @@ public class Session_1_0Test extends Qpi
 
         assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName());
         assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
-        assertEquals("Unexpected source", receivedAttach.getSource(), sentAttach.getSource());
-        assertEquals("Unexpected target", receivedAttach.getTarget(), sentAttach.getTarget());
+
+        Source receivedSource = (Source) receivedAttach.getSource();
+        Source sentSource = (Source) sentAttach.getSource();
+        assertEquals("Unexpected source address", receivedSource.getAddress(), sentSource.getAddress());
+        assertArrayEquals("Unexpected source capabilities", receivedSource.getCapabilities(), sentSource.getCapabilities());
+        assertEquals("Unexpected source durability", receivedSource.getDurable(), sentSource.getDurable());
+        assertEquals("Unexpected source expiry policy", receivedSource.getExpiryPolicy(), sentSource.getExpiryPolicy());
+        assertEquals("Unexpected source dynamic flag", receivedSource.getDynamic(), sentSource.getDynamic());
+
+        Target receivedTarget = (Target) receivedAttach.getTarget();
+        Target sentTarget = (Target) sentAttach.getTarget();
+        assertEquals("Unexpected target address", receivedTarget.getAddress(), sentTarget.getAddress());
+        assertArrayEquals("Unexpected target capabilities", receivedTarget.getCapabilities(), sentTarget.getCapabilities());
+        assertEquals("Unexpected target durability", receivedTarget.getDurable(), sentTarget.getDurable());
+        assertEquals("Unexpected target expiry policy", receivedTarget.getExpiryPolicy(), sentTarget.getExpiryPolicy());
+        assertEquals("Unexpected target dynamic flag", receivedTarget.getDynamic(), sentTarget.getDynamic());
 
         final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
         assertEquals("Unexpected number of queues after attach", 1, queues.size());
@@ -651,7 +681,8 @@ public class Session_1_0Test extends Qpi
         when(connection.getAddressSpace()).thenReturn(_virtualHost);
         when(connection.getEventLogger()).thenReturn(mock(EventLogger.class));
         when(connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L);
-        when(connection.getChildExecutor()).thenReturn(mock(TaskExecutor.class));
+        when(connection.getChildExecutor()).thenReturn(_taskExecutor);
+        when(connection.getTaskExecutor()).thenReturn(_taskExecutor);
         when(connection.getModel()).thenReturn(BrokerModel.getInstance());
         when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);

Modified: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
 Mon Mar  6 15:06:57 2017
@@ -26,6 +26,7 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,9 +53,10 @@ import org.apache.qpid.server.model.Conn
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
 import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.session.AMQPSession;
@@ -66,6 +68,7 @@ import org.apache.qpid.server.txn.DtxNot
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
 import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
 
 public class ManagementAddressSpace implements NamedAddressSpace
@@ -83,11 +86,11 @@ public class ManagementAddressSpace impl
     private final MessageStore _messageStore;
     private final MessageDestination _defaultDestination = new DefaultDestination();
     private final List<AMQPConnection<?>> _connections = new CopyOnWriteArrayList<>();
-    private final LinkRegistry _linkRegistry = new NonDurableLinkRegistry();
     private final Broker<?> _broker;
     private final Principal _principal;
     private final UUID _id;
     private final ConcurrentMap<Object, ConcurrentMap<String, ProxyMessageSource>> _connectionSpecificDestinations = new ConcurrentHashMap<>();
+    private final LinkRegistry _linkRegistry;
 
     public ManagementAddressSpace(final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
     {
@@ -105,6 +108,22 @@ public class ManagementAddressSpace impl
         _messageStore = new MemoryMessageStore();
         _principal = new ManagementAddressSpacePrincipal(this);
         _id = UUID.nameUUIDFromBytes((_broker.getId().toString()+"/"+name).getBytes(StandardCharsets.UTF_8));
+
+        Iterator<LinkRegistryFactory>
+                linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
+        if (linkRegistryFactories.hasNext())
+        {
+            final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
+            if (linkRegistryFactories.hasNext())
+            {
+                throw new RuntimeException("Found multiple implementations of LinkRegistry");
+            }
+            _linkRegistry = linkRegistryFactory.create(this);
+        }
+        else
+        {
+            _linkRegistry = null;
+        }
     }
 
 
@@ -209,9 +228,9 @@ public class ManagementAddressSpace impl
     }
 
     @Override
-    public LinkRegistry getLinkRegistry(final String remoteContainerId)
+    public <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
     {
-        return _linkRegistry;
+        return _linkRegistry.getLink(remoteContainerId, linkName, type);
     }
 
     @Override
@@ -386,37 +405,4 @@ public class ManagementAddressSpace impl
 
         }
     }
-
-    private class NonDurableLinkRegistry implements LinkRegistry
-    {
-        @Override
-        public LinkModel getDurableSendingLink(final String name)
-        {
-            return null;
-        }
-
-        @Override
-        public boolean registerSendingLink(final String name, final LinkModel link)
-        {
-            throw new ConnectionScopedRuntimeException("Durable links are not supported");
-        }
-
-        @Override
-        public boolean unregisterSendingLink(final String name)
-        {
-            return false;
-        }
-
-        @Override
-        public LinkModel getDurableReceivingLink(final String name)
-        {
-            return null;
-        }
-
-        @Override
-        public boolean registerReceivingLink(final String name, final LinkModel link)
-        {
-            throw new ConnectionScopedRuntimeException("Durable links are not supported");
-        }
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to