Author: lquack Date: Fri Mar 31 13:38:45 2017 New Revision: 1789687 URL: http://svn.apache.org/viewvc?rev=1789687&view=rev Log: QPID-7658: [Java Broker] LinkRegistry: Address review comment: Make Link and LinkEndpoint generic
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java Fri Mar 31 13:38:45 2017 @@ -45,6 +45,8 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.LinkKey; import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer; @@ -64,7 +66,7 @@ public class BDBLinkStore extends Abstra } @Override - protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater updater) throws StoreException + protected Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final LinkStoreUpdater updater) throws StoreException { try { @@ -77,7 +79,7 @@ public class BDBLinkStore extends Abstra } @Override - protected void doSaveLink(final LinkDefinition link) + protected void doSaveLink(final LinkDefinition<Source, Target> link) { try { @@ -91,7 +93,7 @@ public class BDBLinkStore extends Abstra } @Override - protected void doDeleteLink(final LinkDefinition linkDefinition) + protected void doDeleteLink(final LinkDefinition<Source, Target> linkDefinition) { LinkKey linkKey = new LinkKey(linkDefinition); try @@ -145,10 +147,10 @@ public class BDBLinkStore extends Abstra } - private Collection<LinkDefinition> getLinkDefinitions(final LinkStoreUpdater updater) + private Collection<LinkDefinition<Source, Target>> getLinkDefinitions(final LinkStoreUpdater updater) { Database linksDatabase = getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG); - Collection<LinkDefinition> links = new HashSet<>(); + Collection<LinkDefinition<Source, Target>> links = new HashSet<>(); ModelVersion currentVersion = new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, BrokerModel.MODEL_MINOR_VERSION); @@ -168,7 +170,7 @@ public class BDBLinkStore extends Abstra { LinkKey linkKey = keyEntryBinding.entryToObject(key); LinkValue linkValue = linkValueEntryBinding.entryToObject(value); - LinkDefinition link = new LinkDefinitionImpl(linkKey.getRemoteContainerId(), linkKey.getLinkName(), linkKey.getRole(), linkValue.getSource(), linkValue.getTarget()); + LinkDefinition<Source, Target> link = new LinkDefinitionImpl<>(linkKey.getRemoteContainerId(), linkKey.getLinkName(), linkKey.getRole(), linkValue.getSource(), linkValue.getTarget()); links.add(link); } } @@ -180,7 +182,7 @@ public class BDBLinkStore extends Abstra try { linksDatabase = getEnvironmentFacade().clearDatabase(txn, LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG); - for (LinkDefinition link : links) + for (LinkDefinition<Source, Target> link : links) { save(linksDatabase, txn, link); } @@ -208,7 +210,7 @@ public class BDBLinkStore extends Abstra linksVersionDb.put(txn, key, value); } - private void save(Database database, Transaction txn, final LinkDefinition link) + private void save(Database database, Transaction txn, final LinkDefinition<Source, Target> link) { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java Fri Mar 31 13:38:45 2017 @@ -20,34 +20,34 @@ package org.apache.qpid.server.protocol.v1_0.store.bdb; import org.apache.qpid.server.protocol.v1_0.LinkDefinition; -import org.apache.qpid.server.protocol.v1_0.type.BaseSource; -import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; public class LinkValue { static final byte CURRENT_VERSION = 0; - private final BaseSource _source; - private final BaseTarget _target; + private final Source _source; + private final Target _target; private final byte _version; - public LinkValue(final BaseSource source, final BaseTarget target, final byte version) + public LinkValue(final Source source, final Target target, final byte version) { _source = source; _target = target; _version = version; } - public LinkValue(final LinkDefinition link) + public LinkValue(final LinkDefinition<Source, Target> link) { this(link.getSource(), link.getTarget(), CURRENT_VERSION); } - public BaseSource getSource() + public Source getSource() { return _source; } - public BaseTarget getTarget() + public Target getTarget() { return _target; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java Fri Mar 31 13:38:45 2017 @@ -52,6 +52,8 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; @@ -81,9 +83,9 @@ public class JDBCLinkStore extends Abstr } @Override - protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater updater) throws StoreException + protected Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final LinkStoreUpdater updater) throws StoreException { - Collection<LinkDefinition> linkDefinitions; + Collection<LinkDefinition<Source, Target>> linkDefinitions; try { checkTransactionIsolationLevel(); @@ -117,7 +119,7 @@ public class JDBCLinkStore extends Abstr } @Override - protected void doSaveLink(final LinkDefinition link) throws StoreException + protected void doSaveLink(final LinkDefinition<Source, Target> link) throws StoreException { String linkKey = generateLinkKey(link); Connection connection = getConnection(); @@ -164,7 +166,7 @@ public class JDBCLinkStore extends Abstr } @Override - protected void doDeleteLink(final LinkDefinition link) throws StoreException + protected void doDeleteLink(final LinkDefinition<Source, Target> link) throws StoreException { try (Connection connection = getConnection(); @@ -276,10 +278,11 @@ public class JDBCLinkStore extends Abstr return _tableNamePrefix + VERSION_TABLE_NAME_SUFFIX; } - private Collection<LinkDefinition> performUpdate(final LinkStoreUpdater updater, - Collection<LinkDefinition> linkDefinitions, - final ModelVersion storedVersion, - final ModelVersion currentVersion) throws SQLException + private Collection<LinkDefinition<Source, Target>> performUpdate(final LinkStoreUpdater updater, + Collection<LinkDefinition<Source, Target>> linkDefinitions, + final ModelVersion storedVersion, + final ModelVersion currentVersion) + throws SQLException { linkDefinitions = updater.update(storedVersion.toString(), linkDefinitions); Connection connection = getConnection(); @@ -292,7 +295,7 @@ public class JDBCLinkStore extends Abstr statement.execute("DELETE FROM " + getLinksTableName()); } - for (LinkDefinition linkDefinition : linkDefinitions) + for (LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition : linkDefinitions) { insert(connection, generateLinkKey(linkDefinition), linkDefinition); } @@ -318,9 +321,9 @@ public class JDBCLinkStore extends Abstr return linkDefinitions; } - private Collection<LinkDefinition> getLinks() throws SQLException + private Collection<LinkDefinition<Source, Target>> getLinks() throws SQLException { - Collection<LinkDefinition> links = new ArrayList<>(); + Collection<LinkDefinition<Source, Target>> links = new ArrayList<>(); try (Connection connection = getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(String.format( @@ -335,7 +338,7 @@ public class JDBCLinkStore extends Abstr Source source = (Source) getBlobAsAmqpObject(resultSet, 4); Target target = (Target) getBlobAsAmqpObject(resultSet, 5); - links.add(new LinkDefinitionImpl(remoteContainerId, linkName, role, source, target)); + links.add(new LinkDefinitionImpl<>(remoteContainerId, linkName, role, source, target)); } } return links; @@ -431,7 +434,9 @@ public class JDBCLinkStore extends Abstr } - private void insert(final Connection connection, final String linkKey, final LinkDefinition linkDefinition) + private void insert(final Connection connection, + final String linkKey, + final LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition) throws SQLException { try (PreparedStatement statement = connection.prepareStatement(String.format( @@ -451,7 +456,9 @@ public class JDBCLinkStore extends Abstr } } - private void update(final Connection connection, final String linkKey, final LinkDefinition linkDefinition) + private void update(final Connection connection, + final String linkKey, + final LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition) throws SQLException { try (PreparedStatement statement = connection.prepareStatement(String.format( @@ -500,7 +507,7 @@ public class JDBCLinkStore extends Abstr saveBytesAsBlob(statement, index, value.getBytes(UTF_8)); } - private String generateLinkKey(final LinkDefinition linkDefinition) + private String generateLinkKey(final LinkDefinition<?, ?> linkDefinition) { MessageDigest md; try Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -47,10 +47,10 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; -public abstract class AbstractLinkEndpoint implements LinkEndpoint +public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T> { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class); - private final Link_1_0 _link; + private final Link_1_0<S, T> _link; private final Session_1_0 _session; private Object _flowTransactionId; private SenderSettleMode _sendingSettlementMode; @@ -82,7 +82,7 @@ public abstract class AbstractLinkEndpoi } - AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0 link) + AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0<S, T> link) { _session = session; _link = link; @@ -158,13 +158,13 @@ public abstract class AbstractLinkEndpoi } @Override - public BaseSource getSource() + public S getSource() { return _link.getSource(); } @Override - public BaseTarget getTarget() + public T getTarget() { return _link.getTarget(); } @@ -468,7 +468,7 @@ public abstract class AbstractLinkEndpoi } } - public Link_1_0 getLink() + protected Link_1_0<S, T> getLink() { return _link; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -27,11 +27,13 @@ import java.util.Map; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; @@ -39,7 +41,7 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; -public abstract class AbstractReceivingLinkEndpoint extends AbstractLinkEndpoint +public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extends AbstractLinkEndpoint<Source, T> { private final SectionDecoder _sectionDecoder; private UnsignedInteger _lastDeliveryId; @@ -88,7 +90,7 @@ public abstract class AbstractReceivingL } - public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link) + public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, T> link) { super(session, link); _sectionDecoder = new SectionDecoderImpl(session.getConnection() Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Mar 31 13:38:45 2017 @@ -56,6 +56,7 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified; import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; import org.apache.qpid.server.protocol.v1_0.type.messaging.Released; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; @@ -605,9 +606,7 @@ class ConsumerTarget_1_0 extends Abstrac @Override public String getTargetAddress() { - BaseTarget target = _linkEndpoint.getTarget(); - - return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getLinkName(); + return _linkEndpoint.getTarget().getAddress(); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Fri Mar 31 13:38:45 2017 @@ -19,6 +19,8 @@ package org.apache.qpid.server.protocol.v1_0; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; @@ -27,12 +29,12 @@ public class Delivery { private final UnsignedInteger _deliveryId; private final Binary _deliveryTag; - private final LinkEndpoint _linkEndpoint; + private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> _linkEndpoint; private boolean _complete; private boolean _settled; private int _numberOfTransfers = 0; - public Delivery(Transfer transfer, final LinkEndpoint endpoint) + public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint) { _settled = Boolean.TRUE.equals(transfer.getSettled()); _deliveryId = transfer.getDeliveryId(); @@ -79,7 +81,7 @@ public class Delivery return _deliveryId; } - public LinkEndpoint getLinkEndpoint() + public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> getLinkEndpoint() { return _linkEndpoint; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -30,14 +30,14 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -public class ErrantLinkEndpoint implements LinkEndpoint +public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T> { - private final Link_1_0 _link; + private final Link_1_0<S, T> _link; private final Session_1_0 _session; private Error _error; private UnsignedInteger _localHandle; - ErrantLinkEndpoint(Link_1_0 link, Session_1_0 session, Error error) + ErrantLinkEndpoint(Link_1_0<S, T> link, Session_1_0 session, Error error) { _link = link; _session = session; @@ -51,13 +51,13 @@ public class ErrantLinkEndpoint implemen } @Override - public BaseSource getSource() + public S getSource() { return null; } @Override - public BaseTarget getTarget() + public T getTarget() { return null; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java Fri Mar 31 13:38:45 2017 @@ -25,7 +25,7 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -public interface LinkDefinition extends LinkModel +public interface LinkDefinition<S extends BaseSource, T extends BaseTarget> extends LinkModel { String getRemoteContainerId(); @@ -33,7 +33,7 @@ public interface LinkDefinition extends Role getRole(); - BaseSource getSource(); + S getSource(); - BaseTarget getTarget(); + T getTarget(); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java Fri Mar 31 13:38:45 2017 @@ -24,19 +24,19 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -public class LinkDefinitionImpl implements LinkDefinition +public class LinkDefinitionImpl<S extends BaseSource, T extends BaseTarget> implements LinkDefinition<S, T> { private final String _remoteContainerId; private final String _name; private final Role _role; - private final BaseSource _source; - private final BaseTarget _target; + private final S _source; + private final T _target; public LinkDefinitionImpl(final String remoteContainerId, final String name, final Role role, - final BaseSource source, - final BaseTarget target) + final S source, + final T target) { _remoteContainerId = remoteContainerId; _name = name; @@ -64,13 +64,13 @@ public class LinkDefinitionImpl implemen } @Override - public BaseSource getSource() + public S getSource() { return _source; } @Override - public BaseTarget getTarget() + public T getTarget() { return _target; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -30,13 +30,13 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -public interface LinkEndpoint +public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget> { Role getRole(); - BaseSource getSource(); + S getSource(); - BaseTarget getTarget(); + T getTarget(); Session_1_0 getSession(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Fri Mar 31 13:38:45 2017 @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.BaseSource; import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; @@ -38,9 +39,8 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -public class LinkImpl implements Link_1_0 +public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Link_1_0<S, T> { private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class); @@ -49,9 +49,9 @@ public class LinkImpl implements Link_1_ private final Role _role; private final LinkRegistry _linkRegistry; - private volatile LinkEndpoint _linkEndpoint; - private volatile BaseSource _source; - private volatile BaseTarget _target; + private volatile LinkEndpoint<S, T> _linkEndpoint; + private volatile S _source; + private volatile T _target; public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry) { @@ -61,14 +61,14 @@ public class LinkImpl implements Link_1_ _linkRegistry = linkRegistry; } - public LinkImpl(final LinkDefinition linkDefinition, final LinkRegistry linkRegistry) + public LinkImpl(final LinkDefinition<S, T> linkDefinition, final LinkRegistry linkRegistry) { this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry); setTermini(linkDefinition.getSource(), linkDefinition.getTarget()); } @Override - public final ListenableFuture<? extends LinkEndpoint> attach(final Session_1_0 session, final Attach attach) + public final ListenableFuture<? extends LinkEndpoint<S, T>> attach(final Session_1_0 session, final Attach attach) { try { @@ -91,7 +91,7 @@ public class LinkImpl implements Link_1_ _linkEndpoint.receiveAttach(attach); _linkRegistry.linkChanged(this); - return Futures.immediateFuture((LinkEndpoint) _linkEndpoint); + return Futures.immediateFuture(_linkEndpoint); } } catch (Exception e) @@ -101,62 +101,59 @@ public class LinkImpl implements Link_1_ } } - private synchronized ListenableFuture<LinkEndpoint> stealLink(final Session_1_0 session, final Attach attach) + private synchronized ListenableFuture<LinkEndpoint<S, T>> stealLink(final Session_1_0 session, final Attach attach) { - final SettableFuture<LinkEndpoint> returnFuture = SettableFuture.create(); - _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable() - { - @Override - public void run() - { - _linkEndpoint.close(new Error(LinkError.STOLEN, - String.format("Link is being stolen by connection '%s'", - session.getConnection()))); - try - { - returnFuture.set(attach(session, attach).get()); - } - catch (InterruptedException e) + final SettableFuture<LinkEndpoint<S, T>> returnFuture = SettableFuture.create(); + _linkEndpoint.getSession().doOnIOThreadAsync( + () -> { - returnFuture.setException(e); - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) - { - returnFuture.setException(e.getCause()); - } - } - }); + _linkEndpoint.close(new Error(LinkError.STOLEN, + String.format("Link is being stolen by connection '%s'", + session.getConnection()))); + try + { + returnFuture.set(attach(session, attach).get()); + } + catch (InterruptedException e) + { + returnFuture.setException(e); + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + returnFuture.setException(e.getCause()); + } + }); return returnFuture; } - private LinkEndpoint createLinkEndpoint(final Session_1_0 session, final Attach attach) + private LinkEndpoint<S, T> createLinkEndpoint(final Session_1_0 session, final Attach attach) { - LinkEndpoint linkEndpoint = null; + final LinkEndpoint<S, T> linkEndpoint; if (_role == Role.SENDER) { - linkEndpoint = new SendingLinkEndpoint(session, this); + linkEndpoint = (LinkEndpoint<S, T>) new SendingLinkEndpoint(session, (LinkImpl<Source, Target>) this); } else if (attach.getTarget() instanceof Coordinator) { - linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(session, this); + linkEndpoint = (LinkEndpoint<S, T>) new TxnCoordinatorReceivingLinkEndpoint(session, (LinkImpl<Source, Coordinator>) this); } else { - linkEndpoint = new StandardReceivingLinkEndpoint(session, this); + linkEndpoint = (LinkEndpoint<S, T>) new StandardReceivingLinkEndpoint(session, (LinkImpl<Source, Target>) this); } return linkEndpoint; } - private ListenableFuture<? extends LinkEndpoint> rejectLink(final Session_1_0 session, Throwable t) + private ListenableFuture<? extends LinkEndpoint<S, T>> rejectLink(final Session_1_0 session, Throwable t) { if (t instanceof AmqpErrorException) { - _linkEndpoint = new ErrantLinkEndpoint(this, session, ((AmqpErrorException) t).getError()); + _linkEndpoint = new ErrantLinkEndpoint<>(this, session, ((AmqpErrorException) t).getError()); } else { - _linkEndpoint = new ErrantLinkEndpoint(this, session, new Error(AmqpError.INTERNAL_ERROR, t.getMessage())); + _linkEndpoint = new ErrantLinkEndpoint<>(this, session, new Error(AmqpError.INTERNAL_ERROR, t.getMessage())); } return Futures.immediateFuture(_linkEndpoint); } @@ -188,31 +185,31 @@ public class LinkImpl implements Link_1_ } @Override - public BaseSource getSource() + public S getSource() { return _source; } @Override - public void setSource(BaseSource source) + public void setSource(S source) { setTermini(source, _target); } @Override - public BaseTarget getTarget() + public T getTarget() { return _target; } @Override - public void setTarget(BaseTarget target) + public void setTarget(T target) { setTermini(_source, target); } @Override - public void setTermini(BaseSource source, BaseTarget target) + public void setTermini(S source, T target) { _source = source; _target = target; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java Fri Mar 31 13:38:45 2017 @@ -33,7 +33,7 @@ public class LinkKey _role = role; } - public LinkKey(final LinkDefinition link) + public LinkKey(final LinkDefinition<?, ?> link) { this(link.getRemoteContainerId(), link.getName(), link.getRole()); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java Fri Mar 31 13:38:45 2017 @@ -20,14 +20,16 @@ package org.apache.qpid.server.protocol.v1_0; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.virtualhost.LinkRegistryModel; public interface LinkRegistry extends LinkRegistryModel { - void linkClosed(final Link_1_0 link); + void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link); - void linkChanged(final Link_1_0 link); + void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link); TerminusDurability getHighestSupportedTerminusDurability(); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Fri Mar 31 13:38:45 2017 @@ -32,6 +32,8 @@ import org.apache.qpid.server.plugin.Qpi import org.apache.qpid.server.protocol.v1_0.store.LinkStore; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdaterImpl; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; @@ -41,8 +43,8 @@ import org.apache.qpid.server.util.Serve public class LinkRegistryImpl implements LinkRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(LinkRegistryImpl.class); - private final ConcurrentMap<LinkKey, Link_1_0> _sendingLinkRegistry = new ConcurrentHashMap<>(); - private final ConcurrentMap<LinkKey, Link_1_0> _receivingLinkRegistry = new ConcurrentHashMap<>(); + private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _sendingLinkRegistry = new ConcurrentHashMap<>(); + private final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> _receivingLinkRegistry = new ConcurrentHashMap<>(); private final NamedAddressSpace _addressSpace; @@ -71,35 +73,36 @@ public class LinkRegistryImpl implements } @Override - public Link_1_0 getSendingLink(final String remoteContainerId, final String linkName) + public Link_1_0<? extends BaseSource, ? extends BaseTarget> getSendingLink(final String remoteContainerId, final String linkName) { return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER); } @Override - public Link_1_0 getReceivingLink(final String remoteContainerId, final String linkName) + public Link_1_0<? extends BaseSource, ? extends BaseTarget> getReceivingLink(final String remoteContainerId, final String linkName) { return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER); } @Override - public void linkClosed(final Link_1_0 link) + public void linkClosed(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link) { - ConcurrentMap<LinkKey, Link_1_0> linkRegistry = getLinkRegistry(link.getRole()); + ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry = + getLinkRegistry(link.getRole()); linkRegistry.remove(new LinkKey(link)); - _linkStore.deleteLink(link); + if (isDurableLink(link)) + { + _linkStore.deleteLink((Link_1_0<Source, Target>) link); + } } @Override - public void linkChanged(final Link_1_0 link) + public void linkChanged(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link) { getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link); - if ((link.getRole() == Role.SENDER && link.getSource() instanceof Source - && ((Source) link.getSource()).getDurable() != TerminusDurability.NONE) - || (link.getRole() == Role.RECEIVER && link.getTarget() instanceof Target - && ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE)) + if (isDurableLink(link)) { - _linkStore.saveLink(link); + _linkStore.saveLink((Link_1_0<Source, Target>) link); } } @@ -113,11 +116,11 @@ public class LinkRegistryImpl implements @Override public void open() { - Collection<LinkDefinition> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl()); - for(LinkDefinition link: links) + Collection<LinkDefinition<Source, Target>> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl()); + for(LinkDefinition<? extends BaseSource, ? extends BaseTarget> link: links) { - ConcurrentMap<LinkKey, Link_1_0> linkRegistry = getLinkRegistry(link.getRole()); - linkRegistry.put(new LinkKey(link), new LinkImpl(link, this)); + ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry = getLinkRegistry(link.getRole()); + linkRegistry.put(new LinkKey(link), new LinkImpl<>(link, this)); } } @@ -133,14 +136,22 @@ public class LinkRegistryImpl implements _linkStore.delete(); } - private Link_1_0 getLinkFromRegistry(final String remoteContainerId, - final String linkName, - final ConcurrentMap<LinkKey, Link_1_0> linkRegistry, - final Role role) + private boolean isDurableLink(final Link_1_0<? extends BaseSource, ? extends BaseTarget> link) + { + return (link.getRole() == Role.SENDER && link.getSource() instanceof Source + && ((Source) link.getSource()).getDurable() != TerminusDurability.NONE) + || (link.getRole() == Role.RECEIVER && link.getTarget() instanceof Target + && ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE); + } + + private Link_1_0<? extends BaseSource, ? extends BaseTarget> getLinkFromRegistry(final String remoteContainerId, + final String linkName, + final ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry, + final Role role) { LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role); - Link_1_0 newLink = new LinkImpl(remoteContainerId, linkName, role, this); - Link_1_0 link = linkRegistry.putIfAbsent(linkKey, newLink); + Link_1_0<? extends BaseSource, ? extends BaseTarget> newLink = new LinkImpl(remoteContainerId, linkName, role, this); + Link_1_0<? extends BaseSource, ? extends BaseTarget> link = linkRegistry.putIfAbsent(linkKey, newLink); if (link == null) { link = newLink; @@ -148,9 +159,9 @@ public class LinkRegistryImpl implements return link; } - private ConcurrentMap<LinkKey, Link_1_0> getLinkRegistry(final Role role) + private ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> getLinkRegistry(final Role role) { - ConcurrentMap<LinkKey, Link_1_0> linkRegistry; + ConcurrentMap<LinkKey, Link_1_0<? extends BaseSource, ? extends BaseTarget>> linkRegistry; if (Role.SENDER == role) { linkRegistry = _sendingLinkRegistry; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Fri Mar 31 13:38:45 2017 @@ -27,19 +27,19 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; -public interface Link_1_0 extends LinkDefinition +public interface Link_1_0<S extends BaseSource, T extends BaseTarget> extends LinkDefinition<S, T> { - ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach); + ListenableFuture<? extends LinkEndpoint<S, T>> attach(Session_1_0 session, final Attach attach); void linkClosed(); void discardEndpoint(); - void setSource(BaseSource source); + void setSource(S source); - void setTarget(BaseTarget target); + void setTarget(T target); - void setTermini(BaseSource source, BaseTarget target); + void setTermini(S source, T target); TerminusDurability getHighestSupportedTerminusDurability(); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java Fri Mar 31 13:38:45 2017 @@ -28,9 +28,12 @@ import org.apache.qpid.server.plugin.Plu import org.apache.qpid.server.protocol.v1_0.store.LinkStore; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.store.StoreException; +@SuppressWarnings("unused") @PluggableService public class NullLinkStoreFactory implements LinkStoreFactory { @@ -46,8 +49,7 @@ public class NullLinkStoreFactory implem return new LinkStore() { @Override - public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) - throws StoreException, StoreException + public Collection<LinkDefinition<Source, Target>> openAndLoad(final LinkStoreUpdater updater) throws StoreException { return Collections.emptyList(); } @@ -58,12 +60,12 @@ public class NullLinkStoreFactory implem } @Override - public void saveLink(final LinkDefinition link) + public void saveLink(final LinkDefinition<Source, Target> link) { } @Override - public void deleteLink(final LinkDefinition link) + public void deleteLink(final LinkDefinition<Source, Target> link) { } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Name import org.apache.qpid.server.model.NotFoundException; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.Outcome; @@ -75,7 +76,7 @@ import org.apache.qpid.server.txn.Server import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; -public class SendingLinkEndpoint extends AbstractLinkEndpoint +public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> { private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class); @@ -96,7 +97,7 @@ public class SendingLinkEndpoint extends private ConsumerTarget_1_0 _consumerTarget; private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer; - public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl link) + public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link) { super(session, link); setDeliveryCount(UnsignedInteger.valueOf(0)); @@ -113,7 +114,7 @@ public class SendingLinkEndpoint extends { // TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it! _destination = destination; - final Source source = (Source) getSource(); + final Source source = getSource(); EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class); @@ -199,22 +200,12 @@ public class SendingLinkEndpoint extends void createConsumerTarget() throws AmqpErrorException { - final Source source = (Source) getSource(); + final Source source = getSource(); _consumerTarget = new ConsumerTarget_1_0(this, _destination instanceof ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY); try { - final String name; - if(getTarget() instanceof Target) - { - Target target = (Target) getTarget(); - name = target.getAddress() == null ? getLinkName() : target.getAddress(); - } - else - { - name = getLinkName(); - } - + final String name = getTarget().getAddress() == null ? getLinkName() : getTarget().getAddress(); _consumer = _destination.getMessageSource() .addConsumer(_consumerTarget, _consumerFilters, @@ -299,7 +290,7 @@ public class SendingLinkEndpoint extends } Source newSource = (Source) attach.getSource(); - Source oldSource = (Source) getSource(); + Source oldSource = getSource(); final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource); prepareConsumerOptionsAndFilters(destination); @@ -330,7 +321,7 @@ public class SendingLinkEndpoint extends } Source newSource = (Source) attach.getSource(); - Source oldSource = (Source) getSource(); + Source oldSource = getSource(); final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource); prepareConsumerOptionsAndFilters(destination); @@ -368,7 +359,7 @@ public class SendingLinkEndpoint extends throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, "")); } - final SendingDestination destination = getSession().getSendingDestination(getLinkName(), (Source) getSource()); + final SendingDestination destination = getSession().getSendingDestination(getLinkName(), getSource()); prepareConsumerOptionsAndFilters(destination); attachReceived(attach); @@ -387,7 +378,7 @@ public class SendingLinkEndpoint extends public TerminusDurability getTerminusDurability() { - return ((Source) getSource()).getDurable(); + return getSource().getDurable(); } public boolean transfer(final Transfer xfr, final boolean decrementCredit) @@ -504,7 +495,7 @@ public class SendingLinkEndpoint extends { getConsumerTarget().close(); - TerminusExpiryPolicy expiryPolicy = ((Source) getSource()).getExpiryPolicy(); + TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy(); if (Boolean.TRUE.equals(detach.getClosed()) || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy) || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing()) @@ -633,7 +624,7 @@ public class SendingLinkEndpoint extends super.attachReceived(attach); Target target = (Target) attach.getTarget(); - Source source = (Source) getSource(); + Source source = getSource(); if (source == null) { source = new Source(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Mar 31 13:38:45 2017 @@ -74,6 +74,8 @@ import org.apache.qpid.server.model.Sess import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition; @@ -132,9 +134,9 @@ public class Session_1_0 extends Abstrac private SessionState _sessionState; - private final Map<LinkEndpoint, UnsignedInteger> _endpointToOutputHandle = new HashMap<>(); - private final Map<UnsignedInteger, LinkEndpoint> _inputHandleToEndpoint = new HashMap<>(); - private final Set<LinkEndpoint> _associatedLinkEndpoints = new HashSet<>(); + private final Map<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>(); + private final Map<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _inputHandleToEndpoint = new HashMap<>(); + private final Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _associatedLinkEndpoints = new HashSet<>(); private final short _receivingChannel; private final short _sendingChannel; @@ -211,7 +213,7 @@ public class Session_1_0 extends Abstrac } else { - final Link_1_0 link; + final Link_1_0<? extends BaseSource, ? extends BaseTarget> link; if (attach.getRole() == Role.RECEIVER) { link = getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), attach.getName()); @@ -221,7 +223,7 @@ public class Session_1_0 extends Abstrac link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName()); } - final ListenableFuture<? extends LinkEndpoint> future = link.attach(this, attach); + final ListenableFuture<? extends LinkEndpoint<?,?>> future = link.attach(this, attach); addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor()); } @@ -408,7 +410,7 @@ public class Session_1_0 extends Abstrac public void receiveFlow(final Flow flow) { UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle); + final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle); final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); @@ -421,8 +423,8 @@ public class Session_1_0 extends Abstrac } else { - final Collection<LinkEndpoint> allLinkEndpoints = _inputHandleToEndpoint.values(); - for (LinkEndpoint le : allLinkEndpoints) + final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints = _inputHandleToEndpoint.values(); + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints) { le.flowStateChanged(); } @@ -566,7 +568,7 @@ public class Session_1_0 extends Abstrac _nextIncomingTransferId.incr(); UnsignedInteger inputHandle = transfer.getHandle(); - LinkEndpoint linkEndpoint = _inputHandleToEndpoint.get(inputHandle); + LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle); if (linkEndpoint == null) { @@ -762,7 +764,7 @@ public class Session_1_0 extends Abstrac ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination; if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue()) { - Source oldSource = (Source) linkEndpoint.getSource(); + Source oldSource = linkEndpoint.getSource(); oldSource.setAddress(newAddress); oldSource.setFilter(newSource.getFilter()); return true; @@ -1108,8 +1110,8 @@ public class Session_1_0 extends Abstrac void remoteEnd(End end) { - Set<LinkEndpoint> associatedLinkEndpoints = new HashSet<>(_associatedLinkEndpoints); - for (LinkEndpoint linkEndpoint : associatedLinkEndpoints) + Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> associatedLinkEndpoints = new HashSet<>(_associatedLinkEndpoints); + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : associatedLinkEndpoints) { linkEndpoint.remoteDetached(new Detach()); linkEndpoint.destroy(); @@ -1200,7 +1202,7 @@ public class Session_1_0 extends Abstrac @Override public void transportStateChanged() { - for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet()) + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { if (linkEndpoint instanceof SendingLinkEndpoint) { @@ -1236,7 +1238,7 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName())); - for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet()) + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { if (linkEndpoint instanceof AbstractReceivingLinkEndpoint && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) @@ -1276,7 +1278,7 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_REMOVED()); } - for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet()) + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { if (linkEndpoint instanceof AbstractReceivingLinkEndpoint && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) @@ -1307,7 +1309,7 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **")); - for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet()) + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { if (linkEndpoint instanceof AbstractReceivingLinkEndpoint) { @@ -1339,7 +1341,7 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_REMOVED()); } - for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet()) + for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { if (linkEndpoint instanceof AbstractReceivingLinkEndpoint && !_blockingEntities.contains(((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) @@ -1512,9 +1514,9 @@ public class Session_1_0 extends Abstrac return "Session_1_0[" + _connection + ": " + _sendingChannel + ']'; } - public void dissociateEndpoint(LinkEndpoint linkEndpoint) + public void dissociateEndpoint(LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint) { - for (Map.Entry<UnsignedInteger, LinkEndpoint> entry : _inputHandleToEndpoint.entrySet()) + for (Map.Entry<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> entry : _inputHandleToEndpoint.entrySet()) { if (entry.getValue() == linkEndpoint) { @@ -1530,7 +1532,7 @@ public class Session_1_0 extends Abstrac { if(_inputHandleToEndpoint.containsKey(handle)) { - LinkEndpoint endpoint = _inputHandleToEndpoint.remove(handle); + LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = _inputHandleToEndpoint.remove(handle); endpoint.remoteDetached(detach); _endpointToOutputHandle.remove(endpoint); } @@ -1602,7 +1604,7 @@ public class Session_1_0 extends Abstrac return primaryDomain; } - private class EndpointCreationCallback<T extends LinkEndpoint> implements FutureCallback<T> + private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T> { private final Attach _attach; @@ -1613,7 +1615,7 @@ public class Session_1_0 extends Abstrac } @Override - public void onSuccess(final LinkEndpoint endpoint) + public void onSuccess(final T endpoint) { doOnIOThreadAsync(new Runnable() { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -69,7 +69,7 @@ import org.apache.qpid.server.txn.LocalT import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint +public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target> { private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class); @@ -79,7 +79,7 @@ public class StandardReceivingLinkEndpoi private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>()); public StandardReceivingLinkEndpoint(final Session_1_0 session, - final Link_1_0 link) + final Link_1_0<Source, Target> link) { super(session, link); } @@ -93,7 +93,7 @@ public class StandardReceivingLinkEndpoi private TerminusDurability getDurability() { - return ((Target) getTarget()).getDurable(); + return getTarget().getDurable(); } @Override @@ -241,7 +241,7 @@ public class StandardReceivingLinkEndpoi getReceivingDestination().authorizePublish(session.getSecurityToken(), routingAddress); Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction, null); - Source source = (Source) getSource(); + Source source = getSource(); DeliveryState resultantState; @@ -497,16 +497,16 @@ public class StandardReceivingLinkEndpoi @Override protected void recoverLink(final Attach attach) throws AmqpErrorException { - if (getTarget() == null || !(getTarget() instanceof Target)) + if (getTarget() == null) { throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, String.format("Link '%s' not found", getLinkName()))); } Source source = (Source) attach.getSource(); - Target target = (Target) getTarget(); + Target target = getTarget(); - final ReceivingDestination destination = getSession().getReceivingDestination((Target) getTarget()); + final ReceivingDestination destination = getSession().getReceivingDestination(getTarget()); target.setCapabilities(destination.getCapabilities()); setCapabilities(Arrays.asList(destination.getCapabilities())); setDestination(destination); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Fri Mar 31 13:38:45 2017 @@ -47,12 +47,12 @@ import org.apache.qpid.server.txn.LocalT import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint +public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator> { private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>(); private ArrayList<Transfer> _incompleteMessage; - public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link) + public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link) { super(session, link); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java Fri Mar 31 13:38:45 2017 @@ -23,7 +23,8 @@ import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.qpid.server.protocol.v1_0.LinkDefinition; -import org.apache.qpid.server.protocol.v1_0.LinkKey; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.store.StoreException; public abstract class AbstractLinkStore implements LinkStore @@ -31,14 +32,14 @@ public abstract class AbstractLinkStore private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true); private volatile StoreState _storeState = StoreState.CLOSED; - protected abstract Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater updater); + protected abstract Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final LinkStoreUpdater updater); protected abstract void doClose(); protected abstract void doDelete(); - protected abstract void doSaveLink(final LinkDefinition link); - protected abstract void doDeleteLink(final LinkDefinition link); + protected abstract void doSaveLink(final LinkDefinition<Source, Target> link); + protected abstract void doDeleteLink(final LinkDefinition<Source, Target> link); @Override - public final Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException + public final Collection<LinkDefinition<Source, Target>> openAndLoad(final LinkStoreUpdater updater) throws StoreException { _useOrCloseRWLock.readLock().lock(); try @@ -48,7 +49,7 @@ public abstract class AbstractLinkStore throw new StoreException("Store is already opened"); } - Collection<LinkDefinition> linkDefinitions = doOpenAndLoad(updater); + Collection<LinkDefinition<Source, Target>> linkDefinitions = doOpenAndLoad(updater); _storeState = StoreState.OPENED; return linkDefinitions; } @@ -74,7 +75,7 @@ public abstract class AbstractLinkStore } @Override - public final void saveLink(final LinkDefinition link) throws StoreException + public final void saveLink(final LinkDefinition<Source, Target> link) throws StoreException { _useOrCloseRWLock.readLock().lock(); try @@ -93,7 +94,7 @@ public abstract class AbstractLinkStore } @Override - public final void deleteLink(final LinkDefinition link) throws StoreException + public final void deleteLink(final LinkDefinition<Source, Target> link) throws StoreException { _useOrCloseRWLock.readLock().lock(); try Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java Fri Mar 31 13:38:45 2017 @@ -22,19 +22,21 @@ package org.apache.qpid.server.protocol. import java.util.Collection; import org.apache.qpid.server.protocol.v1_0.LinkDefinition; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.store.StoreException; public interface LinkStore { - Collection<LinkDefinition> openAndLoad(LinkStoreUpdater updater) throws StoreException; + Collection<LinkDefinition<Source, Target>> openAndLoad(LinkStoreUpdater updater) throws StoreException; void close() throws StoreException; - void saveLink(LinkDefinition link) throws StoreException; + void saveLink(LinkDefinition<Source, Target> link) throws StoreException; - void deleteLink(LinkDefinition link) throws StoreException; + void deleteLink(LinkDefinition<Source, Target> link) throws StoreException; void delete(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java Fri Mar 31 13:38:45 2017 @@ -23,8 +23,10 @@ package org.apache.qpid.server.protocol. import java.util.Collection; import org.apache.qpid.server.protocol.v1_0.LinkDefinition; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; public interface LinkStoreUpdater { - Collection<LinkDefinition> update(String currentVersion, Collection<LinkDefinition> linkDefinitions); + Collection<LinkDefinition<Source, Target>> update(String currentVersion, Collection<LinkDefinition<Source, Target>> linkDefinitions); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java Fri Mar 31 13:38:45 2017 @@ -23,12 +23,14 @@ package org.apache.qpid.server.protocol. import java.util.Collection; import org.apache.qpid.server.protocol.v1_0.LinkDefinition; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; public class LinkStoreUpdaterImpl implements LinkStoreUpdater { @Override - public Collection<LinkDefinition> update(final String currentVersion, - final Collection<LinkDefinition> linkDefinitions) + public Collection<LinkDefinition<Source, Target>> update(final String currentVersion, + final Collection<LinkDefinition<Source, Target>> linkDefinitions) { return linkDefinitions; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java?rev=1789687&r1=1789686&r2=1789687&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java Fri Mar 31 13:38:45 2017 @@ -28,6 +28,8 @@ import org.apache.qpid.server.plugin.Plu import org.apache.qpid.server.protocol.v1_0.store.LinkStore; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory; import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; @@ -48,7 +50,7 @@ public class TestLinkStoreFactory implem return new LinkStore() { @Override - public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException + public Collection<LinkDefinition<Source, Target>> openAndLoad(final LinkStoreUpdater updater) throws StoreException { return Collections.emptyList(); } @@ -60,13 +62,13 @@ public class TestLinkStoreFactory implem } @Override - public void saveLink(final LinkDefinition link) throws StoreException + public void saveLink(final LinkDefinition<Source, Target> link) throws StoreException { } @Override - public void deleteLink(final LinkDefinition link) throws StoreException + public void deleteLink(final LinkDefinition<Source, Target> link) throws StoreException { } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org