Author: lquack
Date: Fri Mar 31 13:38:45 2017
New Revision: 1789687

URL: http://svn.apache.org/viewvc?rev=1789687&view=rev
Log:
QPID-7658: [Java Broker] LinkRegistry: Address review comment: Make Link and 
LinkEndpoint generic

Modified:
    
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
 Fri Mar 31 13:38:45 2017
@@ -45,6 +45,8 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.LinkKey;
 import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer;
@@ -64,7 +66,7 @@ public class BDBLinkStore extends Abstra
     }
 
     @Override
-    protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater 
updater) throws StoreException
+    protected Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final 
LinkStoreUpdater updater) throws StoreException
     {
         try
         {
@@ -77,7 +79,7 @@ public class BDBLinkStore extends Abstra
     }
 
     @Override
-    protected void doSaveLink(final LinkDefinition link)
+    protected void doSaveLink(final LinkDefinition<Source, Target> link)
     {
         try
         {
@@ -91,7 +93,7 @@ public class BDBLinkStore extends Abstra
     }
 
     @Override
-    protected void doDeleteLink(final LinkDefinition linkDefinition)
+    protected void doDeleteLink(final LinkDefinition<Source, Target> 
linkDefinition)
     {
         LinkKey linkKey = new LinkKey(linkDefinition);
         try
@@ -145,10 +147,10 @@ public class BDBLinkStore extends Abstra
     }
 
 
-    private Collection<LinkDefinition> getLinkDefinitions(final 
LinkStoreUpdater updater)
+    private Collection<LinkDefinition<Source, Target>> 
getLinkDefinitions(final LinkStoreUpdater updater)
     {
         Database linksDatabase = 
getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
-        Collection<LinkDefinition> links = new HashSet<>();
+        Collection<LinkDefinition<Source, Target>> links = new HashSet<>();
 
         ModelVersion currentVersion =
                 new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, 
BrokerModel.MODEL_MINOR_VERSION);
@@ -168,7 +170,7 @@ public class BDBLinkStore extends Abstra
             {
                 LinkKey linkKey = keyEntryBinding.entryToObject(key);
                 LinkValue linkValue = 
linkValueEntryBinding.entryToObject(value);
-                LinkDefinition link = new 
LinkDefinitionImpl(linkKey.getRemoteContainerId(), linkKey.getLinkName(), 
linkKey.getRole(), linkValue.getSource(), linkValue.getTarget());
+                LinkDefinition<Source, Target> link = new 
LinkDefinitionImpl<>(linkKey.getRemoteContainerId(), linkKey.getLinkName(), 
linkKey.getRole(), linkValue.getSource(), linkValue.getTarget());
                 links.add(link);
             }
         }
@@ -180,7 +182,7 @@ public class BDBLinkStore extends Abstra
             try
             {
                 linksDatabase = getEnvironmentFacade().clearDatabase(txn, 
LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
-                for (LinkDefinition link : links)
+                for (LinkDefinition<Source, Target> link : links)
                 {
                     save(linksDatabase, txn, link);
                 }
@@ -208,7 +210,7 @@ public class BDBLinkStore extends Abstra
         linksVersionDb.put(txn, key, value);
     }
 
-    private void save(Database database, Transaction txn, final LinkDefinition 
link)
+    private void save(Database database, Transaction txn, final 
LinkDefinition<Source, Target> link)
     {
         DatabaseEntry key = new DatabaseEntry();
         DatabaseEntry value = new DatabaseEntry();

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
 Fri Mar 31 13:38:45 2017
@@ -20,34 +20,34 @@
 package org.apache.qpid.server.protocol.v1_0.store.bdb;
 
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 
 public class LinkValue
 {
     static final byte CURRENT_VERSION = 0;
-    private final BaseSource _source;
-    private final BaseTarget _target;
+    private final Source _source;
+    private final Target _target;
     private final byte _version;
 
-    public LinkValue(final BaseSource source, final BaseTarget target, final 
byte version)
+    public LinkValue(final Source source, final Target target, final byte 
version)
     {
         _source = source;
         _target = target;
         _version = version;
     }
 
-    public LinkValue(final LinkDefinition link)
+    public LinkValue(final LinkDefinition<Source, Target> link)
     {
         this(link.getSource(), link.getTarget(), CURRENT_VERSION);
     }
 
-    public BaseSource getSource()
+    public Source getSource()
     {
         return _source;
     }
 
-    public BaseTarget getTarget()
+    public Target getTarget()
     {
         return _target;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
 Fri Mar 31 13:38:45 2017
@@ -52,6 +52,8 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -81,9 +83,9 @@ public class JDBCLinkStore extends Abstr
     }
 
     @Override
-    protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater 
updater) throws StoreException
+    protected Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final 
LinkStoreUpdater updater) throws StoreException
     {
-        Collection<LinkDefinition> linkDefinitions;
+        Collection<LinkDefinition<Source, Target>> linkDefinitions;
         try
         {
             checkTransactionIsolationLevel();
@@ -117,7 +119,7 @@ public class JDBCLinkStore extends Abstr
     }
 
     @Override
-    protected void doSaveLink(final LinkDefinition link) throws StoreException
+    protected void doSaveLink(final LinkDefinition<Source, Target> link) 
throws StoreException
     {
         String linkKey = generateLinkKey(link);
         Connection connection = getConnection();
@@ -164,7 +166,7 @@ public class JDBCLinkStore extends Abstr
     }
 
     @Override
-    protected void doDeleteLink(final LinkDefinition link) throws 
StoreException
+    protected void doDeleteLink(final LinkDefinition<Source, Target> link) 
throws StoreException
     {
 
         try (Connection connection = getConnection();
@@ -276,10 +278,11 @@ public class JDBCLinkStore extends Abstr
         return _tableNamePrefix + VERSION_TABLE_NAME_SUFFIX;
     }
 
-    private Collection<LinkDefinition> performUpdate(final LinkStoreUpdater 
updater,
-                                                     
Collection<LinkDefinition> linkDefinitions,
-                                                     final ModelVersion 
storedVersion,
-                                                     final ModelVersion 
currentVersion) throws SQLException
+    private Collection<LinkDefinition<Source, Target>> performUpdate(final 
LinkStoreUpdater updater,
+                                                                     
Collection<LinkDefinition<Source, Target>> linkDefinitions,
+                                                                     final 
ModelVersion storedVersion,
+                                                                     final 
ModelVersion currentVersion)
+            throws SQLException
     {
         linkDefinitions = updater.update(storedVersion.toString(), 
linkDefinitions);
         Connection connection = getConnection();
@@ -292,7 +295,7 @@ public class JDBCLinkStore extends Abstr
                 statement.execute("DELETE FROM " + getLinksTableName());
             }
 
-            for (LinkDefinition linkDefinition : linkDefinitions)
+            for (LinkDefinition<? extends BaseSource, ? extends BaseTarget> 
linkDefinition : linkDefinitions)
             {
                 insert(connection, generateLinkKey(linkDefinition), 
linkDefinition);
             }
@@ -318,9 +321,9 @@ public class JDBCLinkStore extends Abstr
         return linkDefinitions;
     }
 
-    private Collection<LinkDefinition> getLinks() throws SQLException
+    private Collection<LinkDefinition<Source, Target>> getLinks() throws 
SQLException
     {
-        Collection<LinkDefinition> links = new ArrayList<>();
+        Collection<LinkDefinition<Source, Target>> links = new ArrayList<>();
         try (Connection connection = getConnection();
              Statement statement = connection.createStatement();
              ResultSet resultSet = statement.executeQuery(String.format(
@@ -335,7 +338,7 @@ public class JDBCLinkStore extends Abstr
                 Source source = (Source) getBlobAsAmqpObject(resultSet, 4);
                 Target target = (Target) getBlobAsAmqpObject(resultSet, 5);
 
-                links.add(new LinkDefinitionImpl(remoteContainerId, linkName, 
role, source, target));
+                links.add(new LinkDefinitionImpl<>(remoteContainerId, 
linkName, role, source, target));
             }
         }
         return links;
@@ -431,7 +434,9 @@ public class JDBCLinkStore extends Abstr
     }
 
 
-    private void insert(final Connection connection, final String linkKey, 
final LinkDefinition linkDefinition)
+    private void insert(final Connection connection,
+                        final String linkKey,
+                        final LinkDefinition<? extends BaseSource, ? extends 
BaseTarget> linkDefinition)
             throws SQLException
     {
         try (PreparedStatement statement = 
connection.prepareStatement(String.format(
@@ -451,7 +456,9 @@ public class JDBCLinkStore extends Abstr
         }
     }
 
-    private void update(final Connection connection, final String linkKey, 
final LinkDefinition linkDefinition)
+    private void update(final Connection connection,
+                        final String linkKey,
+                        final LinkDefinition<? extends BaseSource, ? extends 
BaseTarget> linkDefinition)
             throws SQLException
     {
         try (PreparedStatement statement = 
connection.prepareStatement(String.format(
@@ -500,7 +507,7 @@ public class JDBCLinkStore extends Abstr
         saveBytesAsBlob(statement, index, value.getBytes(UTF_8));
     }
 
-    private String generateLinkKey(final LinkDefinition linkDefinition)
+    private String generateLinkKey(final LinkDefinition<?, ?> linkDefinition)
     {
         MessageDigest md;
         try

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -47,10 +47,10 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
-public abstract class AbstractLinkEndpoint implements LinkEndpoint
+public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends 
BaseTarget> implements LinkEndpoint<S, T>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractLinkEndpoint.class);
-    private final Link_1_0 _link;
+    private final Link_1_0<S, T> _link;
     private final Session_1_0 _session;
     private Object _flowTransactionId;
     private SenderSettleMode _sendingSettlementMode;
@@ -82,7 +82,7 @@ public abstract class AbstractLinkEndpoi
     }
 
 
-    AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
+    AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0<S, T> link)
     {
         _session = session;
         _link = link;
@@ -158,13 +158,13 @@ public abstract class AbstractLinkEndpoi
     }
 
     @Override
-    public BaseSource getSource()
+    public S getSource()
     {
         return _link.getSource();
     }
 
     @Override
-    public BaseTarget getTarget()
+    public T getTarget()
     {
         return _link.getTarget();
     }
@@ -468,7 +468,7 @@ public abstract class AbstractLinkEndpoi
         }
     }
 
-    public Link_1_0 getLink()
+    protected Link_1_0<S, T> getLink()
     {
         return _link;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -27,11 +27,13 @@ import java.util.Map;
 
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import 
org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -39,7 +41,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
-public abstract class AbstractReceivingLinkEndpoint extends 
AbstractLinkEndpoint
+public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> 
extends AbstractLinkEndpoint<Source, T>
 {
     private final SectionDecoder _sectionDecoder;
     private UnsignedInteger _lastDeliveryId;
@@ -88,7 +90,7 @@ public abstract class AbstractReceivingL
     }
 
 
-    public AbstractReceivingLinkEndpoint(final Session_1_0 session, final 
Link_1_0 link)
+    public AbstractReceivingLinkEndpoint(final Session_1_0 session, final 
Link_1_0<Source, T> link)
     {
         super(session, link);
         _sectionDecoder = new SectionDecoderImpl(session.getConnection()

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 Fri Mar 31 13:38:45 2017
@@ -56,6 +56,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import 
org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -605,9 +606,7 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     public String getTargetAddress()
     {
-        BaseTarget target = _linkEndpoint.getTarget();
-
-        return target instanceof 
org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? 
((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) 
target).getAddress() : _linkEndpoint.getLinkName();
+        return _linkEndpoint.getTarget().getAddress();
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
 Fri Mar 31 13:38:45 2017
@@ -19,6 +19,8 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -27,12 +29,12 @@ public class Delivery
 {
     private final UnsignedInteger _deliveryId;
     private final Binary _deliveryTag;
-    private final LinkEndpoint _linkEndpoint;
+    private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
_linkEndpoint;
     private boolean _complete;
     private boolean _settled;
     private int _numberOfTransfers = 0;
 
-    public Delivery(Transfer transfer, final LinkEndpoint endpoint)
+    public Delivery(Transfer transfer, final LinkEndpoint<? extends 
BaseSource, ? extends BaseTarget> endpoint)
     {
         _settled = Boolean.TRUE.equals(transfer.getSettled());
         _deliveryId = transfer.getDeliveryId();
@@ -79,7 +81,7 @@ public class Delivery
         return _deliveryId;
     }
 
-    public LinkEndpoint getLinkEndpoint()
+    public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
getLinkEndpoint()
     {
         return _linkEndpoint;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -30,14 +30,14 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public class ErrantLinkEndpoint implements LinkEndpoint
+public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> 
implements LinkEndpoint<S, T>
 {
-    private final Link_1_0 _link;
+    private final Link_1_0<S, T> _link;
     private final Session_1_0 _session;
     private Error _error;
     private UnsignedInteger _localHandle;
 
-    ErrantLinkEndpoint(Link_1_0 link, Session_1_0 session, Error error)
+    ErrantLinkEndpoint(Link_1_0<S, T> link, Session_1_0 session, Error error)
     {
         _link = link;
         _session = session;
@@ -51,13 +51,13 @@ public class ErrantLinkEndpoint implemen
     }
 
     @Override
-    public BaseSource getSource()
+    public S getSource()
     {
         return null;
     }
 
     @Override
-    public BaseTarget getTarget()
+    public T getTarget()
     {
         return null;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
 Fri Mar 31 13:38:45 2017
@@ -25,7 +25,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public interface LinkDefinition extends LinkModel
+public interface LinkDefinition<S extends BaseSource, T extends BaseTarget> 
extends LinkModel
 {
     String getRemoteContainerId();
 
@@ -33,7 +33,7 @@ public interface LinkDefinition extends
 
     Role getRole();
 
-    BaseSource getSource();
+    S getSource();
 
-    BaseTarget getTarget();
+    T getTarget();
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
 Fri Mar 31 13:38:45 2017
@@ -24,19 +24,19 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public class LinkDefinitionImpl implements LinkDefinition
+public class LinkDefinitionImpl<S extends BaseSource, T extends BaseTarget> 
implements LinkDefinition<S, T>
 {
     private final String _remoteContainerId;
     private final String _name;
     private final Role _role;
-    private final BaseSource _source;
-    private final BaseTarget _target;
+    private final S _source;
+    private final T _target;
 
     public LinkDefinitionImpl(final String remoteContainerId,
                               final String name,
                               final Role role,
-                              final BaseSource source,
-                              final BaseTarget target)
+                              final S source,
+                              final T target)
     {
         _remoteContainerId = remoteContainerId;
         _name = name;
@@ -64,13 +64,13 @@ public class LinkDefinitionImpl implemen
     }
 
     @Override
-    public BaseSource getSource()
+    public S getSource()
     {
         return _source;
     }
 
     @Override
-    public BaseTarget getTarget()
+    public T getTarget()
     {
         return _target;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -30,13 +30,13 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public interface LinkEndpoint
+public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
 {
     Role getRole();
 
-    BaseSource getSource();
+    S getSource();
 
-    BaseTarget getTarget();
+    T getTarget();
 
     Session_1_0 getSession();
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
 Fri Mar 31 13:38:45 2017
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
@@ -38,9 +39,8 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-public class LinkImpl implements Link_1_0
+public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements 
Link_1_0<S, T>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LinkImpl.class);
 
@@ -49,9 +49,9 @@ public class LinkImpl implements Link_1_
     private final Role _role;
     private final LinkRegistry _linkRegistry;
 
-    private volatile LinkEndpoint _linkEndpoint;
-    private volatile BaseSource _source;
-    private volatile BaseTarget _target;
+    private volatile LinkEndpoint<S, T> _linkEndpoint;
+    private volatile S _source;
+    private volatile T _target;
 
     public LinkImpl(final String remoteContainerId, final String linkName, 
final Role role, final LinkRegistry linkRegistry)
     {
@@ -61,14 +61,14 @@ public class LinkImpl implements Link_1_
         _linkRegistry = linkRegistry;
     }
 
-    public LinkImpl(final LinkDefinition linkDefinition, final LinkRegistry 
linkRegistry)
+    public LinkImpl(final LinkDefinition<S, T> linkDefinition, final 
LinkRegistry linkRegistry)
     {
         this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), 
linkDefinition.getRole(), linkRegistry);
         setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
     }
 
     @Override
-    public final ListenableFuture<? extends LinkEndpoint> attach(final 
Session_1_0 session, final Attach attach)
+    public final ListenableFuture<? extends LinkEndpoint<S, T>> attach(final 
Session_1_0 session, final Attach attach)
     {
         try
         {
@@ -91,7 +91,7 @@ public class LinkImpl implements Link_1_
 
                 _linkEndpoint.receiveAttach(attach);
                 _linkRegistry.linkChanged(this);
-                return Futures.immediateFuture((LinkEndpoint) _linkEndpoint);
+                return Futures.immediateFuture(_linkEndpoint);
             }
         }
         catch (Exception e)
@@ -101,62 +101,59 @@ public class LinkImpl implements Link_1_
         }
     }
 
-    private synchronized ListenableFuture<LinkEndpoint> stealLink(final 
Session_1_0 session, final Attach attach)
+    private synchronized ListenableFuture<LinkEndpoint<S, T>> stealLink(final 
Session_1_0 session, final Attach attach)
     {
-        final SettableFuture<LinkEndpoint> returnFuture = 
SettableFuture.create();
-        _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                _linkEndpoint.close(new Error(LinkError.STOLEN,
-                                              String.format("Link is being 
stolen by connection '%s'",
-                                                            
session.getConnection())));
-                try
-                {
-                    returnFuture.set(attach(session, attach).get());
-                }
-                catch (InterruptedException e)
+        final SettableFuture<LinkEndpoint<S, T>> returnFuture = 
SettableFuture.create();
+        _linkEndpoint.getSession().doOnIOThreadAsync(
+                () ->
                 {
-                    returnFuture.setException(e);
-                    Thread.currentThread().interrupt();
-                }
-                catch (ExecutionException e)
-                {
-                    returnFuture.setException(e.getCause());
-                }
-            }
-        });
+                    _linkEndpoint.close(new Error(LinkError.STOLEN,
+                                                  String.format("Link is being 
stolen by connection '%s'",
+                                                                
session.getConnection())));
+                    try
+                    {
+                        returnFuture.set(attach(session, attach).get());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        returnFuture.setException(e);
+                        Thread.currentThread().interrupt();
+                    }
+                    catch (ExecutionException e)
+                    {
+                        returnFuture.setException(e.getCause());
+                    }
+                });
         return returnFuture;
     }
 
-    private LinkEndpoint createLinkEndpoint(final Session_1_0 session, final 
Attach attach)
+    private LinkEndpoint<S, T> createLinkEndpoint(final Session_1_0 session, 
final Attach attach)
     {
-        LinkEndpoint linkEndpoint = null;
+        final LinkEndpoint<S, T> linkEndpoint;
         if (_role == Role.SENDER)
         {
-            linkEndpoint = new SendingLinkEndpoint(session, this);
+            linkEndpoint = (LinkEndpoint<S, T>) new 
SendingLinkEndpoint(session, (LinkImpl<Source, Target>) this);
         }
         else if (attach.getTarget() instanceof Coordinator)
         {
-            linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(session, 
this);
+            linkEndpoint = (LinkEndpoint<S, T>) new 
TxnCoordinatorReceivingLinkEndpoint(session, (LinkImpl<Source, Coordinator>) 
this);
         }
         else
         {
-            linkEndpoint = new StandardReceivingLinkEndpoint(session, this);
+            linkEndpoint = (LinkEndpoint<S, T>) new 
StandardReceivingLinkEndpoint(session, (LinkImpl<Source, Target>) this);
         }
         return linkEndpoint;
     }
 
-    private ListenableFuture<? extends LinkEndpoint> rejectLink(final 
Session_1_0 session, Throwable t)
+    private ListenableFuture<? extends LinkEndpoint<S, T>> rejectLink(final 
Session_1_0 session, Throwable t)
     {
         if (t instanceof AmqpErrorException)
         {
-            _linkEndpoint = new ErrantLinkEndpoint(this, session, 
((AmqpErrorException) t).getError());
+            _linkEndpoint = new ErrantLinkEndpoint<>(this, session, 
((AmqpErrorException) t).getError());
         }
         else
         {
-            _linkEndpoint = new ErrantLinkEndpoint(this, session, new 
Error(AmqpError.INTERNAL_ERROR, t.getMessage()));
+            _linkEndpoint = new ErrantLinkEndpoint<>(this, session, new 
Error(AmqpError.INTERNAL_ERROR, t.getMessage()));
         }
         return Futures.immediateFuture(_linkEndpoint);
     }
@@ -188,31 +185,31 @@ public class LinkImpl implements Link_1_
     }
 
     @Override
-    public BaseSource getSource()
+    public S getSource()
     {
         return _source;
     }
 
     @Override
-    public void setSource(BaseSource source)
+    public void setSource(S source)
     {
         setTermini(source, _target);
     }
 
     @Override
-    public BaseTarget getTarget()
+    public T getTarget()
     {
         return _target;
     }
 
     @Override
-    public void setTarget(BaseTarget target)
+    public void setTarget(T target)
     {
         setTermini(_source, target);
     }
 
     @Override
-    public void setTermini(BaseSource source, BaseTarget target)
+    public void setTermini(S source, T target)
     {
         _source = source;
         _target = target;

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
 Fri Mar 31 13:38:45 2017
@@ -33,7 +33,7 @@ public class LinkKey
         _role = role;
     }
 
-    public LinkKey(final LinkDefinition link)
+    public LinkKey(final LinkDefinition<?, ?> link)
     {
         this(link.getRemoteContainerId(), link.getName(), link.getRole());
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
 Fri Mar 31 13:38:45 2017
@@ -20,14 +20,16 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
 public interface LinkRegistry extends LinkRegistryModel
 {
-    void linkClosed(final Link_1_0 link);
+    void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> 
link);
 
-    void linkChanged(final Link_1_0 link);
+    void linkChanged(final Link_1_0<? extends BaseSource, ? extends 
BaseTarget> link);
 
     TerminusDurability getHighestSupportedTerminusDurability();
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
 Fri Mar 31 13:38:45 2017
@@ -32,6 +32,8 @@ import org.apache.qpid.server.plugin.Qpi
 import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdaterImpl;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -41,8 +43,8 @@ import org.apache.qpid.server.util.Serve
 public class LinkRegistryImpl implements LinkRegistry
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LinkRegistryImpl.class);
-    private final ConcurrentMap<LinkKey, Link_1_0> _sendingLinkRegistry = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<LinkKey, Link_1_0> _receivingLinkRegistry = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? 
extends BaseTarget>> _sendingLinkRegistry = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? 
extends BaseTarget>> _receivingLinkRegistry = new ConcurrentHashMap<>();
 
     private final NamedAddressSpace _addressSpace;
 
@@ -71,35 +73,36 @@ public class LinkRegistryImpl implements
     }
 
     @Override
-    public Link_1_0 getSendingLink(final String remoteContainerId, final 
String linkName)
+    public Link_1_0<? extends BaseSource, ? extends BaseTarget> 
getSendingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, 
_sendingLinkRegistry, Role.SENDER);
     }
 
     @Override
-    public Link_1_0 getReceivingLink(final String remoteContainerId, final 
String linkName)
+    public Link_1_0<? extends BaseSource, ? extends BaseTarget> 
getReceivingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, 
_receivingLinkRegistry, Role.RECEIVER);
     }
 
     @Override
-    public void linkClosed(final Link_1_0 link)
+    public void linkClosed(final Link_1_0<? extends BaseSource, ? extends 
BaseTarget>  link)
     {
-        ConcurrentMap<LinkKey, Link_1_0> linkRegistry = 
getLinkRegistry(link.getRole());
+        ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends 
BaseTarget>> linkRegistry =
+                getLinkRegistry(link.getRole());
         linkRegistry.remove(new LinkKey(link));
-        _linkStore.deleteLink(link);
+        if (isDurableLink(link))
+        {
+            _linkStore.deleteLink((Link_1_0<Source, Target>) link);
+        }
     }
 
     @Override
-    public void linkChanged(final Link_1_0 link)
+    public void linkChanged(final Link_1_0<? extends BaseSource, ? extends 
BaseTarget> link)
     {
         getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link);
-        if ((link.getRole() == Role.SENDER && link.getSource() instanceof 
Source
-             && ((Source) link.getSource()).getDurable() != 
TerminusDurability.NONE)
-            || (link.getRole() == Role.RECEIVER && link.getTarget() instanceof 
Target
-                && ((Target) link.getTarget()).getDurable() != 
TerminusDurability.NONE))
+        if (isDurableLink(link))
         {
-            _linkStore.saveLink(link);
+            _linkStore.saveLink((Link_1_0<Source, Target>) link);
         }
     }
 
@@ -113,11 +116,11 @@ public class LinkRegistryImpl implements
     @Override
     public void open()
     {
-        Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
-        for(LinkDefinition link: links)
+        Collection<LinkDefinition<Source, Target>> links = 
_linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+        for(LinkDefinition<? extends BaseSource, ? extends BaseTarget> link: 
links)
         {
-            ConcurrentMap<LinkKey, Link_1_0> linkRegistry = 
getLinkRegistry(link.getRole());
-            linkRegistry.put(new LinkKey(link), new LinkImpl(link, this));
+            ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends 
BaseTarget>> linkRegistry = getLinkRegistry(link.getRole());
+            linkRegistry.put(new LinkKey(link), new LinkImpl<>(link, this));
         }
     }
 
@@ -133,14 +136,22 @@ public class LinkRegistryImpl implements
         _linkStore.delete();
     }
 
-    private Link_1_0 getLinkFromRegistry(final String remoteContainerId,
-                                         final String linkName,
-                                         final ConcurrentMap<LinkKey, 
Link_1_0> linkRegistry,
-                                         final Role role)
+    private boolean isDurableLink(final Link_1_0<? extends BaseSource, ? 
extends BaseTarget> link)
+    {
+        return (link.getRole() == Role.SENDER && link.getSource() instanceof 
Source
+                && ((Source) link.getSource()).getDurable() != 
TerminusDurability.NONE)
+               || (link.getRole() == Role.RECEIVER && link.getTarget() 
instanceof Target
+                   && ((Target) link.getTarget()).getDurable() != 
TerminusDurability.NONE);
+    }
+
+    private Link_1_0<? extends BaseSource, ? extends BaseTarget> 
getLinkFromRegistry(final String remoteContainerId,
+                                                                               
      final String linkName,
+                                                                               
      final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends 
BaseTarget>> linkRegistry,
+                                                                               
      final Role role)
     {
         LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role);
-        Link_1_0 newLink = new LinkImpl(remoteContainerId, linkName, role, 
this);
-        Link_1_0 link = linkRegistry.putIfAbsent(linkKey, newLink);
+        Link_1_0<? extends BaseSource, ? extends BaseTarget> newLink = new 
LinkImpl(remoteContainerId, linkName, role, this);
+        Link_1_0<? extends BaseSource, ? extends BaseTarget> link = 
linkRegistry.putIfAbsent(linkKey, newLink);
         if (link == null)
         {
             link = newLink;
@@ -148,9 +159,9 @@ public class LinkRegistryImpl implements
         return link;
     }
 
-    private ConcurrentMap<LinkKey, Link_1_0> getLinkRegistry(final Role role)
+    private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends 
BaseTarget>> getLinkRegistry(final Role role)
     {
-        ConcurrentMap<LinkKey, Link_1_0> linkRegistry;
+        ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends 
BaseTarget>> linkRegistry;
         if (Role.SENDER == role)
         {
             linkRegistry = _sendingLinkRegistry;

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
 Fri Mar 31 13:38:45 2017
@@ -27,19 +27,19 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 
-public interface Link_1_0 extends LinkDefinition
+public interface Link_1_0<S extends BaseSource, T extends BaseTarget> extends 
LinkDefinition<S, T>
 {
-    ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final 
Attach attach);
+    ListenableFuture<? extends LinkEndpoint<S, T>> attach(Session_1_0 session, 
final Attach attach);
 
     void linkClosed();
 
     void discardEndpoint();
 
-    void setSource(BaseSource source);
+    void setSource(S source);
 
-    void setTarget(BaseTarget target);
+    void setTarget(T target);
 
-    void setTermini(BaseSource source, BaseTarget target);
+    void setTermini(S source, T target);
 
     TerminusDurability getHighestSupportedTerminusDurability();
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
 Fri Mar 31 13:38:45 2017
@@ -28,9 +28,12 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.store.StoreException;
 
+@SuppressWarnings("unused")
 @PluggableService
 public class NullLinkStoreFactory implements LinkStoreFactory
 {
@@ -46,8 +49,7 @@ public class NullLinkStoreFactory implem
         return new LinkStore()
         {
             @Override
-            public Collection<LinkDefinition> openAndLoad(final 
LinkStoreUpdater updater)
-                    throws StoreException, StoreException
+            public Collection<LinkDefinition<Source, Target>> 
openAndLoad(final LinkStoreUpdater updater) throws StoreException
             {
                 return Collections.emptyList();
             }
@@ -58,12 +60,12 @@ public class NullLinkStoreFactory implem
             }
 
             @Override
-            public void saveLink(final LinkDefinition link)
+            public void saveLink(final LinkDefinition<Source, Target> link)
             {
             }
 
             @Override
-            public void deleteLink(final LinkDefinition link)
+            public void deleteLink(final LinkDefinition<Source, Target> link)
             {
             }
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -75,7 +76,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
-public class SendingLinkEndpoint extends AbstractLinkEndpoint
+public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SendingLinkEndpoint.class);
 
@@ -96,7 +97,7 @@ public class SendingLinkEndpoint extends
     private ConsumerTarget_1_0 _consumerTarget;
     private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
 
-    public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl link)
+    public SendingLinkEndpoint(final Session_1_0 session, final 
LinkImpl<Source, Target> link)
     {
         super(session, link);
         setDeliveryCount(UnsignedInteger.valueOf(0));
@@ -113,7 +114,7 @@ public class SendingLinkEndpoint extends
     {
         // TODO FIXME: this method might modify the source. this is not good 
encapsulation. furthermore if it does so then it should inform the 
link/linkregistry about it!
         _destination = destination;
-        final Source source = (Source) getSource();
+        final Source source = getSource();
 
         EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
 
@@ -199,22 +200,12 @@ public class SendingLinkEndpoint extends
 
     void createConsumerTarget() throws AmqpErrorException
     {
-        final Source source = (Source) getSource();
+        final Source source = getSource();
         _consumerTarget = new ConsumerTarget_1_0(this,
                                          _destination instanceof 
ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
         try
         {
-            final String name;
-            if(getTarget() instanceof Target)
-            {
-                Target target = (Target) getTarget();
-                name = target.getAddress() == null ? getLinkName() : 
target.getAddress();
-            }
-            else
-            {
-                name = getLinkName();
-            }
-
+            final String name = getTarget().getAddress() == null ? 
getLinkName() : getTarget().getAddress();
             _consumer = _destination.getMessageSource()
                                     .addConsumer(_consumerTarget,
                                                  _consumerFilters,
@@ -299,7 +290,7 @@ public class SendingLinkEndpoint extends
         }
 
         Source newSource = (Source) attach.getSource();
-        Source oldSource = (Source) getSource();
+        Source oldSource = getSource();
 
         final SendingDestination destination = 
getSession().getSendingDestination(getLinkName(), oldSource);
         prepareConsumerOptionsAndFilters(destination);
@@ -330,7 +321,7 @@ public class SendingLinkEndpoint extends
         }
 
         Source newSource = (Source) attach.getSource();
-        Source oldSource = (Source) getSource();
+        Source oldSource = getSource();
 
         final SendingDestination destination = 
getSession().getSendingDestination(getLinkName(), oldSource);
         prepareConsumerOptionsAndFilters(destination);
@@ -368,7 +359,7 @@ public class SendingLinkEndpoint extends
             throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
         }
 
-        final SendingDestination destination = 
getSession().getSendingDestination(getLinkName(), (Source) getSource());
+        final SendingDestination destination = 
getSession().getSendingDestination(getLinkName(), getSource());
         prepareConsumerOptionsAndFilters(destination);
 
         attachReceived(attach);
@@ -387,7 +378,7 @@ public class SendingLinkEndpoint extends
 
     public TerminusDurability getTerminusDurability()
     {
-        return ((Source) getSource()).getDurable();
+        return getSource().getDurable();
     }
 
     public boolean transfer(final Transfer xfr, final boolean decrementCredit)
@@ -504,7 +495,7 @@ public class SendingLinkEndpoint extends
     {
         getConsumerTarget().close();
 
-        TerminusExpiryPolicy expiryPolicy = ((Source) 
getSource()).getExpiryPolicy();
+        TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy();
         if (Boolean.TRUE.equals(detach.getClosed())
             || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
             || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && 
getSession().isClosing())
@@ -633,7 +624,7 @@ public class SendingLinkEndpoint extends
         super.attachReceived(attach);
 
         Target target = (Target) attach.getTarget();
-        Source source = (Source) getSource();
+        Source source = getSource();
         if (source == null)
         {
             source = new Source();

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Fri Mar 31 13:38:45 2017
@@ -74,6 +74,8 @@ import org.apache.qpid.server.model.Sess
 import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
@@ -132,9 +134,9 @@ public class Session_1_0 extends Abstrac
 
     private SessionState _sessionState;
 
-    private final Map<LinkEndpoint, UnsignedInteger> _endpointToOutputHandle = 
new HashMap<>();
-    private final Map<UnsignedInteger, LinkEndpoint> _inputHandleToEndpoint = 
new HashMap<>();
-    private final Set<LinkEndpoint> _associatedLinkEndpoints = new HashSet<>();
+    private final Map<LinkEndpoint<? extends BaseSource, ? extends 
BaseTarget>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
+    private final Map<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? 
extends BaseTarget>> _inputHandleToEndpoint = new HashMap<>();
+    private final Set<LinkEndpoint<? extends BaseSource, ? extends 
BaseTarget>> _associatedLinkEndpoints = new HashSet<>();
 
     private final short _receivingChannel;
     private final short _sendingChannel;
@@ -211,7 +213,7 @@ public class Session_1_0 extends Abstrac
             }
             else
             {
-                final Link_1_0 link;
+                final Link_1_0<? extends BaseSource, ? extends BaseTarget> 
link;
                 if (attach.getRole() == Role.RECEIVER)
                 {
                     link = 
getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), 
attach.getName());
@@ -221,7 +223,7 @@ public class Session_1_0 extends Abstrac
                     link = 
getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), 
attach.getName());
                 }
 
-                final ListenableFuture<? extends LinkEndpoint> future = 
link.attach(this, attach);
+                final ListenableFuture<? extends LinkEndpoint<?,?>> future = 
link.attach(this, attach);
 
                 addFutureCallback(future, new 
EndpointCreationCallback(attach), MoreExecutors.directExecutor());
             }
@@ -408,7 +410,7 @@ public class Session_1_0 extends Abstrac
     public void receiveFlow(final Flow flow)
     {
         UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint endpoint = handle == null ? null : 
_inputHandleToEndpoint.get(handle);
+        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
 
         final UnsignedInteger nextOutgoingId =
                 flow.getNextIncomingId() == null ? _initialOutgoingId : 
flow.getNextIncomingId();
@@ -421,8 +423,8 @@ public class Session_1_0 extends Abstrac
         }
         else
         {
-            final Collection<LinkEndpoint> allLinkEndpoints = 
_inputHandleToEndpoint.values();
-            for (LinkEndpoint le : allLinkEndpoints)
+            final Collection<LinkEndpoint<? extends BaseSource, ? extends 
BaseTarget>> allLinkEndpoints = _inputHandleToEndpoint.values();
+            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : 
allLinkEndpoints)
             {
                 le.flowStateChanged();
             }
@@ -566,7 +568,7 @@ public class Session_1_0 extends Abstrac
         _nextIncomingTransferId.incr();
 
         UnsignedInteger inputHandle = transfer.getHandle();
-        LinkEndpoint linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
+        LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint 
= _inputHandleToEndpoint.get(inputHandle);
 
         if (linkEndpoint == null)
         {
@@ -762,7 +764,7 @@ public class Session_1_0 extends Abstrac
                 ExchangeDestination newExchangeDestination = 
(ExchangeDestination) newDestination;
                 if (oldExchangeDestination.getQueue() != 
newExchangeDestination.getQueue())
                 {
-                    Source oldSource = (Source) linkEndpoint.getSource();
+                    Source oldSource = linkEndpoint.getSource();
                     oldSource.setAddress(newAddress);
                     oldSource.setFilter(newSource.getFilter());
                     return true;
@@ -1108,8 +1110,8 @@ public class Session_1_0 extends Abstrac
 
     void remoteEnd(End end)
     {
-        Set<LinkEndpoint> associatedLinkEndpoints = new 
HashSet<>(_associatedLinkEndpoints);
-        for (LinkEndpoint linkEndpoint : associatedLinkEndpoints)
+        Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> 
associatedLinkEndpoints = new HashSet<>(_associatedLinkEndpoints);
+        for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : associatedLinkEndpoints)
         {
             linkEndpoint.remoteDetached(new Detach());
             linkEndpoint.destroy();
@@ -1200,7 +1202,7 @@ public class Session_1_0 extends Abstrac
     @Override
     public void transportStateChanged()
     {
-        for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
+        for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : _endpointToOutputHandle.keySet())
         {
             if (linkEndpoint instanceof SendingLinkEndpoint)
             {
@@ -1236,7 +1238,7 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
 
-            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
                     && isQueueDestinationForLink(queue, 
((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1276,7 +1278,7 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
                         && isQueueDestinationForLink(queue, 
((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1307,7 +1309,7 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues 
**"));
 
-            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof AbstractReceivingLinkEndpoint)
                 {
@@ -1339,7 +1341,7 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof AbstractReceivingLinkEndpoint
                     && 
!_blockingEntities.contains(((AbstractReceivingLinkEndpoint) 
linkEndpoint).getReceivingDestination()))
@@ -1512,9 +1514,9 @@ public class Session_1_0 extends Abstrac
         return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
     }
 
-    public void dissociateEndpoint(LinkEndpoint linkEndpoint)
+    public void dissociateEndpoint(LinkEndpoint<? extends BaseSource, ? 
extends BaseTarget> linkEndpoint)
     {
-        for (Map.Entry<UnsignedInteger, LinkEndpoint> entry : 
_inputHandleToEndpoint.entrySet())
+        for (Map.Entry<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? 
extends BaseTarget>> entry : _inputHandleToEndpoint.entrySet())
         {
             if (entry.getValue() == linkEndpoint)
             {
@@ -1530,7 +1532,7 @@ public class Session_1_0 extends Abstrac
     {
         if(_inputHandleToEndpoint.containsKey(handle))
         {
-            LinkEndpoint endpoint = _inputHandleToEndpoint.remove(handle);
+            LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint 
= _inputHandleToEndpoint.remove(handle);
             endpoint.remoteDetached(detach);
             _endpointToOutputHandle.remove(endpoint);
         }
@@ -1602,7 +1604,7 @@ public class Session_1_0 extends Abstrac
         return primaryDomain;
     }
 
-    private class EndpointCreationCallback<T extends LinkEndpoint> implements 
FutureCallback<T>
+    private class EndpointCreationCallback<T extends LinkEndpoint<? extends 
BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
     {
 
         private final Attach _attach;
@@ -1613,7 +1615,7 @@ public class Session_1_0 extends Abstrac
         }
 
         @Override
-        public void onSuccess(final LinkEndpoint endpoint)
+        public void onSuccess(final T endpoint)
         {
             doOnIOThreadAsync(new Runnable()
             {

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -69,7 +69,7 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
+public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint<Target>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
 
@@ -79,7 +79,7 @@ public class StandardReceivingLinkEndpoi
     private Map<Binary, Outcome> _unsettledMap = 
Collections.synchronizedMap(new HashMap<Binary, Outcome>());
 
     public StandardReceivingLinkEndpoint(final Session_1_0 session,
-                                         final Link_1_0 link)
+                                         final Link_1_0<Source, Target> link)
     {
         super(session, link);
     }
@@ -93,7 +93,7 @@ public class StandardReceivingLinkEndpoi
 
     private TerminusDurability getDurability()
     {
-        return ((Target) getTarget()).getDurable();
+        return getTarget().getDurable();
     }
 
     @Override
@@ -241,7 +241,7 @@ public class StandardReceivingLinkEndpoi
                     
getReceivingDestination().authorizePublish(session.getSecurityToken(), 
routingAddress);
 
                     Outcome outcome = 
getReceivingDestination().send(serverMessage, routingAddress, transaction, 
null);
-                    Source source = (Source) getSource();
+                    Source source = getSource();
 
                     DeliveryState resultantState;
 
@@ -497,16 +497,16 @@ public class StandardReceivingLinkEndpoi
     @Override
     protected void recoverLink(final Attach attach) throws AmqpErrorException
     {
-        if (getTarget() == null || !(getTarget() instanceof Target))
+        if (getTarget() == null)
         {
             throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND,
                                                    String.format("Link '%s' 
not found", getLinkName())));
         }
 
         Source source = (Source) attach.getSource();
-        Target target = (Target) getTarget();
+        Target target = getTarget();
 
-        final ReceivingDestination destination = 
getSession().getReceivingDestination((Target) getTarget());
+        final ReceivingDestination destination = 
getSession().getReceivingDestination(getTarget());
         target.setCapabilities(destination.getCapabilities());
         setCapabilities(Arrays.asList(destination.getCapabilities()));
         setDestination(destination);

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 Fri Mar 31 13:38:45 2017
@@ -47,12 +47,12 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
+public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint<Coordinator>
 {
     private final LinkedHashMap<Integer, ServerTransaction> 
_createdTransactions = new LinkedHashMap<>();
     private ArrayList<Transfer> _incompleteMessage;
 
-    public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, 
final Link_1_0 link)
+    public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, 
final Link_1_0<Source, Coordinator> link)
     {
         super(session, link);
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
 Fri Mar 31 13:38:45 2017
@@ -23,7 +23,8 @@ import java.util.Collection;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
-import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.store.StoreException;
 
 public abstract class AbstractLinkStore implements LinkStore
@@ -31,14 +32,14 @@ public abstract class AbstractLinkStore
     private final ReentrantReadWriteLock _useOrCloseRWLock = new 
ReentrantReadWriteLock(true);
     private volatile StoreState _storeState = StoreState.CLOSED;
 
-    protected abstract Collection<LinkDefinition> doOpenAndLoad(final 
LinkStoreUpdater updater);
+    protected abstract Collection<LinkDefinition<Source, Target>> 
doOpenAndLoad(final LinkStoreUpdater updater);
     protected abstract void doClose();
     protected abstract void doDelete();
-    protected abstract void doSaveLink(final LinkDefinition link);
-    protected abstract void doDeleteLink(final LinkDefinition link);
+    protected abstract void doSaveLink(final LinkDefinition<Source, Target> 
link);
+    protected abstract void doDeleteLink(final LinkDefinition<Source, Target> 
link);
 
     @Override
-    public final Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater 
updater) throws StoreException
+    public final Collection<LinkDefinition<Source, Target>> openAndLoad(final 
LinkStoreUpdater updater) throws StoreException
     {
         _useOrCloseRWLock.readLock().lock();
         try
@@ -48,7 +49,7 @@ public abstract class AbstractLinkStore
                 throw new StoreException("Store is already opened");
             }
 
-            Collection<LinkDefinition> linkDefinitions = 
doOpenAndLoad(updater);
+            Collection<LinkDefinition<Source, Target>> linkDefinitions = 
doOpenAndLoad(updater);
             _storeState = StoreState.OPENED;
             return linkDefinitions;
         }
@@ -74,7 +75,7 @@ public abstract class AbstractLinkStore
     }
 
     @Override
-    public final void saveLink(final LinkDefinition link) throws StoreException
+    public final void saveLink(final LinkDefinition<Source, Target> link) 
throws StoreException
     {
         _useOrCloseRWLock.readLock().lock();
         try
@@ -93,7 +94,7 @@ public abstract class AbstractLinkStore
     }
 
     @Override
-    public final void deleteLink(final LinkDefinition link) throws 
StoreException
+    public final void deleteLink(final LinkDefinition<Source, Target> link) 
throws StoreException
     {
         _useOrCloseRWLock.readLock().lock();
         try

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
 Fri Mar 31 13:38:45 2017
@@ -22,19 +22,21 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.store.StoreException;
 
 
 public interface LinkStore
 {
-    Collection<LinkDefinition> openAndLoad(LinkStoreUpdater updater) throws 
StoreException;
+    Collection<LinkDefinition<Source, Target>> openAndLoad(LinkStoreUpdater 
updater) throws StoreException;
 
     void close() throws StoreException;
 
-    void saveLink(LinkDefinition link) throws StoreException;
+    void saveLink(LinkDefinition<Source, Target> link) throws StoreException;
 
-    void deleteLink(LinkDefinition link) throws StoreException;
+    void deleteLink(LinkDefinition<Source, Target> link) throws StoreException;
 
     void delete();
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
 Fri Mar 31 13:38:45 2017
@@ -23,8 +23,10 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 
 public interface LinkStoreUpdater
 {
-    Collection<LinkDefinition> update(String currentVersion, 
Collection<LinkDefinition> linkDefinitions);
+    Collection<LinkDefinition<Source, Target>> update(String currentVersion, 
Collection<LinkDefinition<Source, Target>> linkDefinitions);
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
 Fri Mar 31 13:38:45 2017
@@ -23,12 +23,14 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 
 public class LinkStoreUpdaterImpl implements LinkStoreUpdater
 {
     @Override
-    public Collection<LinkDefinition> update(final String currentVersion,
-                                             final Collection<LinkDefinition> 
linkDefinitions)
+    public Collection<LinkDefinition<Source, Target>> update(final String 
currentVersion,
+                                                             final 
Collection<LinkDefinition<Source, Target>> linkDefinitions)
     {
         return linkDefinitions;
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java?rev=1789687&r1=1789686&r2=1789687&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
 Fri Mar 31 13:38:45 2017
@@ -28,6 +28,8 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
@@ -48,7 +50,7 @@ public class TestLinkStoreFactory implem
         return new LinkStore()
         {
             @Override
-            public Collection<LinkDefinition> openAndLoad(final 
LinkStoreUpdater updater) throws StoreException
+            public Collection<LinkDefinition<Source, Target>> 
openAndLoad(final LinkStoreUpdater updater) throws StoreException
             {
                 return Collections.emptyList();
             }
@@ -60,13 +62,13 @@ public class TestLinkStoreFactory implem
             }
 
             @Override
-            public void saveLink(final LinkDefinition link) throws 
StoreException
+            public void saveLink(final LinkDefinition<Source, Target> link) 
throws StoreException
             {
 
             }
 
             @Override
-            public void deleteLink(final LinkDefinition link) throws 
StoreException
+            public void deleteLink(final LinkDefinition<Source, Target> link) 
throws StoreException
             {
 
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to