This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b35b6603b1 ARTEMIS-5519 Add bridge support for shared durable address
subscriptions
b35b6603b1 is described below
commit b35b6603b11430e7cd12ce437286506a60052dce
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Jun 25 08:59:27 2025 -0400
ARTEMIS-5519 Add bridge support for shared durable address subscriptions
Adds a configuration property to the bridge address receiver configuration
that
signals that shared durable subscriptions should be used if the remote
connection
indicates in the offered capabilities that it supports them. Falls back to
the
previous behavior if the remote doesn't support shared subs. Off by default
for
now to preserve previous subscriptions.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 2 +-
.../amqp/broker/ProtonProtocolManager.java | 16 +-
.../amqp/client/AMQPClientConnectionFactory.java | 2 +-
.../amqp/connect/AMQPBrokerConnection.java | 24 ++-
.../connect/bridge/AMQPBridgeConfiguration.java | 16 ++
.../amqp/connect/bridge/AMQPBridgeConstants.java | 18 ++
.../bridge/AMQPBridgeFromAddressReceiver.java | 23 +-
.../bridge/AMQPBridgeReceiverConfiguration.java | 15 ++
.../bridge/AMQPBridgeToAddressPolicyManager.java | 2 +-
.../bridge/AMQPBridgeToQueuePolicyManager.java | 8 +-
.../amqp/proton/AMQPConnectionContext.java | 74 +++++--
.../amqp/proton/handler/ProtonHandler.java | 5 +-
.../amqp/proton/AMQPConnectionContextTest.java | 1 +
.../amqp/connect/AMQPBridgeFromAddressTest.java | 235 +++++++++++++++++++++
.../amqp/connect/AMQPBridgeManagementTest.java | 2 +
.../amqp/connect/AMQPBridgeServerToServerTest.java | 113 ++++++++++
.../connect/AMQPFederationBrokerPliuginTest.java | 18 +-
17 files changed, 528 insertions(+), 46 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index eda30d804c..988ebdd5dc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -204,7 +204,7 @@ public class AMQPSessionCallback implements SessionCallback
{
String name = UUIDGenerator.getInstance().generateStringUUID();
- if (connection.isBridgeConnection()) {
+ if (connection.isBrokerConnection()) {
serverSession = manager.getServer().createInternalSession(name,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
protonSPI.getProtonConnectionDelegate(), // RemotingConnection
remotingConnection,
false, // boolean
autoCommitSends
false, // boolean
autoCommitAcks,
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 2b8f5ef09f..a4cedccc56 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -237,7 +237,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
Connection remotingConnection) {
- return internalConnectionEntry(remotingConnection, false, null, null);
+ return internalConnectionEntry(remotingConnection, false, null, null,
null);
}
/**
@@ -245,18 +245,22 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
* AMQP Bridges
*/
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection) {
- return internalConnectionEntry(remotingConnection, true, null, null);
+ return internalConnectionEntry(remotingConnection, true, null, null,
null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
null);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
null, null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, null);
}
- private ConnectionEntry internalConnectionEntry(Connection
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory,
Map<Symbol, Object> connectionProperties) {
+ public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties, Symbol[] desiredCapabilities) {
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, desiredCapabilities);
+ }
+
+ private ConnectionEntry internalConnectionEntry(Connection
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory,
Map<Symbol, Object> connectionProperties, Symbol[] desiredCapabilities) {
AMQPConnectionCallback connectionCallback = new
AMQPConnectionCallback(this, remotingConnection,
server.getExecutorFactory().getExecutor(), server);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
@@ -274,7 +278,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
String id = server.getNodeID().toString();
boolean useCoreSubscriptionNaming =
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this,
connectionCallback, id, (int) ttl, getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), true, saslFactory, connectionProperties, outgoing);
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this,
connectionCallback, id, (int) ttl, getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), true, saslFactory, connectionProperties,
desiredCapabilities, outgoing);
Executor executor = server.getExecutorFactory().getExecutor();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index af65937eec..cce7c9c82e 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -55,7 +55,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor();
- AMQPConnectionContext amqpConnection = new
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl,
protocolManager.getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
+ AMQPConnectionContext amqpConnection = new
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl,
protocolManager.getMaxFrameSize(),
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), false, clientSASLFactory, connectionProperties,
null);
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new
ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection,
executor);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index c3ff19c05f..ab026ceb29 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -119,6 +119,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnec
import static
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionConstants.CONNECTION_NAME;
import static
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionConstants.NODE_ID;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyCapabilities;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities;
@@ -501,8 +502,10 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
brokerConnectionProperties.put(BROKER_CONNECTION_INFO,
brokerConnectionInfo);
+ final Symbol[] brokerDesiredCapabilities = bridgeManagers == null ?
null : new Symbol[] {SHARED_SUBS};
+
NettyConnectorCloseHandler connectorCloseHandler = new
NettyConnectorCloseHandler(connector, connectExecutor);
- ConnectionEntry entry =
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory,
brokerConnectionProperties);
+ ConnectionEntry entry =
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory,
brokerConnectionProperties, brokerDesiredCapabilities);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection)
entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(),
this::linkClosed);
@@ -515,6 +518,20 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
sessionContext =
protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
protonRemotingConnection.getAmqpConnection().runLater(() -> {
+
protonRemotingConnection.getAmqpConnection().addRemoteOpenedListener(c -> {
+ try {
+ // Starting the Bridge triggers rebuild of AMQP sender and
receiver links based on current broker state.
+ // This requires in some cases knowing the remote offered
capabilities which can't be tested until after
+ // the remote sends its Open performative carrying those.
+ if (bridgeManagers != null) {
+ bridgeManagers.connectionRestored(sessionContext);
+ }
+ } catch (Throwable e) {
+ error(e);
+ } finally {
+ protonRemotingConnection.getAmqpConnection().flush();
+ }
+ });
protonRemotingConnection.getAmqpConnection().open();
session.open();
protonRemotingConnection.getAmqpConnection().flush();
@@ -562,11 +579,6 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
// Signal the Federation instance to start a rebuild of
federation links
// based on current broker state.
brokerFederation.connectionRestored(protonRemotingConnection.getAmqpConnection(),
sessionContext);
- } else if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.BRIDGE) {
- // Starting the Bridge triggers rebuild of AMQP sender and
receiver links based on current broker state.
- if (bridgeManagers != null) {
- bridgeManagers.connectionRestored(sessionContext);
- }
}
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
index a5cfebddc9..b646b37346 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
@@ -22,6 +22,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_INITIAL_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_PRIORITY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
@@ -46,6 +47,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_LINK_RECOVERY_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_LINK_RECOVERY_INITIAL_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_MAX_LINK_RECOVERY_ATTEMPTS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_DISABLE_RECEIVER_PRIORITY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_PULL_CREDIT_BATCH_SIZE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_QUEUE_RECEIVER_IDLE_TIMEOUT;
@@ -362,4 +364,18 @@ public class AMQPBridgeConfiguration {
return DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY;
}
}
+
+ /**
+ * {@return <code>true</code> if bridge from address policies are
configured to prefer using shared durable address subscriptions}
+ */
+ public boolean isPreferSharedDurableSubscriptions() {
+ final Object property =
properties.get(PREFER_SHARED_DURABLE_SUBSCRIPTIONS);
+ if (property instanceof Boolean) {
+ return (Boolean) property;
+ } else if (property instanceof String) {
+ return Boolean.parseBoolean((String) property);
+ } else {
+ return DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
+ }
+ }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
index 6fd19e292c..66fcbf84a0 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
@@ -135,6 +135,24 @@ public final class AMQPBridgeConstants {
*/
public static final boolean DEFAULT_DISABLE_RECEIVER_DEMAND_TRACKING =
false;
+ /**
+ * Configuration property that controls if a bridge from address receiver
that has been configured
+ * to use durable subscriptions will prefer to use a shared durable
subscription if the remote peer
+ * indicates in its connection capabilities that it supports them. Using a
shared subscription can
+ * avoid a rare edge case race on rapid attach and re-attach cycles due to
demand coming and going
+ * rapidly which leads to a stuck consumer. Enabling this on configurations
that previously did not
+ * have this option set to true could lead to orphaning a previous
subscription so care should be
+ * taken when changing the defaults for this option.
+ */
+ public static final String PREFER_SHARED_DURABLE_SUBSCRIPTIONS =
"preferSharedDurableSubscriptions";
+
+ /**
+ * Default value for whether a bridge from address receiver that has been
told to use durable subscriptions
+ * should prefer to use a shared durable address subscriptions or a
standard JMS style non-shared durable
+ * subscription.
+ */
+ public static final boolean DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS =
true;
+
/**
* Link Property added to the receiver properties when opening an AMQP
bridge address or queue consumer
* that indicates the consumer priority that should be used when creating
the remote consumer. The
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
index b4f96d06ec..a066c37046 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
@@ -19,6 +19,9 @@ package
org.apache.activemq.artemis.protocol.amqp.connect.bridge;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RECEIVER_PRIORITY;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyCapabilities;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyDesiredCapability;
import java.lang.invoke.MethodHandles;
@@ -80,6 +83,12 @@ public class AMQPBridgeFromAddressReceiver extends
AMQPBridgeReceiver {
return configuration.getAddressReceiverIdleTimeout();
}
+ private boolean isUseSharedDurableSubscriptions() {
+ return getPolicy().isUseDurableSubscriptions() &&
+ configuration.isPreferSharedDurableSubscriptions() &&
+
verifyCapabilities(session.getSession().getConnection().getRemoteOfferedCapabilities(),
SHARED_SUBS);
+ }
+
@Override
protected void doCreateReceiver() {
try {
@@ -97,6 +106,10 @@ public class AMQPBridgeFromAddressReceiver extends
AMQPBridgeReceiver {
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(AmqpSupport.COPY);
+
+ if (isUseSharedDurableSubscriptions()) {
+ source.setCapabilities(SHARED);
+ }
} else {
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
@@ -192,7 +205,15 @@ public class AMQPBridgeFromAddressReceiver extends
AMQPBridgeReceiver {
}
private String generateLinkName(AMQPBridgeAddressPolicy policy) {
- if (policy.isUseDurableSubscriptions()) {
+ if (isUseSharedDurableSubscriptions()) {
+ // Append the sequence ID with a '|' to create a shared durable
subscription for reconnects
+ // but a unique name to prevent stealing on rapid demand cycles.
+ return "amqp-bridge-" + bridgeManager.getName() +
+ "-policy-" + policy.getPolicyName() +
+ "-address-receiver-" + receiverInfo.getRemoteAddress() +
+ "-" + bridgeManager.getServer().getNodeID() +
+ "|" + LINK_SEQUENCE_ID.incrementAndGet();
+ } else if (policy.isUseDurableSubscriptions()) {
// Omit the sequence ID to create a stable durable subscription name
for reconnects.
return "amqp-bridge-" + bridgeManager.getName() +
"-policy-" + policy.getPolicyName() +
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
index 4928753d85..b7215f7c45 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
@@ -25,6 +25,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.QUEUE_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LARGE_MESSAGE_THRESHOLD;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_PRIORITY;
@@ -195,4 +196,18 @@ public final class AMQPBridgeReceiverConfiguration extends
AMQPBridgeLinkConfigu
return configuration.isReceiverDemandTrackingDisabled();
}
}
+
+ /**
+ * {@return <code>true</code> if bridge from address policies are
configured to prefer using shared durable address subscriptions}
+ */
+ public boolean isPreferSharedDurableSubscriptions() {
+ final Object property =
properties.get(PREFER_SHARED_DURABLE_SUBSCRIPTIONS);
+ if (property instanceof Boolean) {
+ return (Boolean) property;
+ } else if (property instanceof String) {
+ return Boolean.parseBoolean((String) property);
+ } else {
+ return configuration.isPreferSharedDurableSubscriptions();
+ }
+ }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
index 579847b7db..4c6696ae93 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
@@ -82,7 +82,7 @@ public class AMQPBridgeToAddressPolicyManager extends
AMQPBridgeToPolicyManager
@Override
public synchronized void afterAddAddress(AddressInfo addressInfo, boolean
reload) {
- if (isActive() && policy.test(addressInfo)) {
+ if (isActive() && policy.test(addressInfo) &&
!addressTracking.containsKey(addressInfo.getName().toString())) {
try {
final AMQPBridgeAddressSenderManager manager = new
AMQPBridgeAddressSenderManager(this, configuration, addressInfo);
addressTracking.put(manager.getAddress(), manager);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
index a54ed2a959..8bdc2352e3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
@@ -114,9 +114,11 @@ public class AMQPBridgeToQueuePolicyManager extends
AMQPBridgeToPolicyManager im
final AMQPBridgeQueueSenderManager manager;
final AMQPBridgeSenderInfo info = createSenderInfo(addressInfo,
queue);
- manager = new AMQPBridgeQueueSenderManager(this, configuration, info);
- queueSenders.put(info.getLocalFqqn(), manager);
- manager.start();
+ if (!queueSenders.containsKey(info.getLocalFqqn())) {
+ manager = new AMQPBridgeQueueSenderManager(this, configuration,
info);
+ queueSenders.put(info.getLocalFqqn(), manager);
+ manager.start();
+ }
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index bcd0a044df..d5b496c0d6 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -31,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import io.netty.buffer.ByteBuf;
@@ -60,6 +62,7 @@ import
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -118,8 +121,10 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
private final boolean isIncomingConnection;
private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
+ private final Symbol[] desiredCapabilities;
private final ScheduledExecutorService scheduledPool;
private final Map<String, LinkCloseListener> linkCloseListeners = new
ConcurrentHashMap<>();
+ private final Set<Consumer<AMQPConnectionContext>> remoteOpenedListeners =
new ConcurrentHashSet<>();
private final Map<Session, AMQPSessionContext> sessions = new
ConcurrentHashMap<>();
@@ -127,10 +132,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
private final boolean useCoreSubscriptionNaming;
- /**
- * Outgoing means created by the AMQP Bridge
- */
- private final boolean bridgeConnection;
+ private final boolean brokerConnection;
private final ScheduleOperator scheduleOp = new ScheduleOperator(new
ScheduleRunnable());
private final AtomicReference<Future<?>> scheduledFutureRef = new
AtomicReference<>(VOID_FUTURE);
@@ -149,8 +151,9 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
ScheduledExecutorService scheduledPool,
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
- Map<Symbol, Object> connectionProperties) {
- this(protocolManager, connectionSP, containerId, idleTimeout,
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool,
isIncomingConnection, saslClientFactory, connectionProperties, false);
+ Map<Symbol, Object> connectionProperties,
+ Symbol[] desiredCapabilities) {
+ this(protocolManager, connectionSP, containerId, idleTimeout,
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool,
isIncomingConnection, saslClientFactory, connectionProperties,
desiredCapabilities, false);
}
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@@ -164,9 +167,10 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties,
- boolean bridgeConnection) {
+ Symbol[] desiredCapabilities,
+ boolean brokerConnection) {
this.protocolManager = protocolManager;
- this.bridgeConnection = bridgeConnection;
+ this.brokerConnection = brokerConnection;
this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId :
UUID.randomUUID().toString();
@@ -180,6 +184,12 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
this.connectionProperties.putAll(connectionProperties);
}
+ if (desiredCapabilities != null && desiredCapabilities.length > 0) {
+ this.desiredCapabilities = Arrays.copyOf(desiredCapabilities,
desiredCapabilities.length);
+ } else {
+ this.desiredCapabilities = null;
+ }
+
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
EventLoop nettyExecutor =
connectionCallback.getTransportConnection().getEventLoop();
@@ -238,8 +248,8 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
linkCloseListeners.clear();
}
- public boolean isBridgeConnection() {
- return bridgeConnection;
+ public boolean isBrokerConnection() {
+ return brokerConnection;
}
public void requireInHandler() {
@@ -520,7 +530,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
}
public void open() {
- handler.open(containerId, connectionProperties);
+ handler.open(containerId, connectionProperties, desiredCapabilities);
}
public String getContainer() {
@@ -618,6 +628,20 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
return connectionCallback.getTransportConnection().getRemoteAddress();
}
+ public AMQPConnectionContext
addRemoteOpenedListener(Consumer<AMQPConnectionContext> listener) {
+ if (handler.getConnection() != null &&
EndpointState.ACTIVE.equals(handler.getConnection().getRemoteState())) {
+ try {
+ listener.accept(this);
+ } catch (Exception e) {
+ logger.debug("Caught exception from remote opened handler", e);
+ }
+ } else {
+ remoteOpenedListeners.add(listener);
+ }
+
+ return this;
+ }
+
@Override
public void onRemoteOpen(Connection connection) throws Exception {
handler.requireHandler();
@@ -632,10 +656,16 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);
- connection.setContainer(containerId);
- connection.setProperties(connectionProperties);
- connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
- connection.open();
+ // An outgoing connection would have already configured the
connection and opened or is in the
+ // processing of doing so when a simultaneous open occurred so we
protect against altering the
+ // sent values or preventing the in process open from configuring the
values it wants.
+ if (isIncomingConnection) {
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.setDesiredCapabilities(desiredCapabilities);
+ connection.open();
+ }
}
initialize();
@@ -651,6 +681,16 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
scheduledFutureRef.getAndUpdate(scheduleOp);
}
+
+ remoteOpenedListeners.forEach(consumer -> {
+ try {
+ consumer.accept(this);
+ } catch (Exception e) {
+ logger.debug("Caught exception from remote opened handler", e);
+ }
+ });
+
+ remoteOpenedListeners.clear();
}
}
@@ -667,7 +707,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
}
}
- if (isIncomingConnection() && saslClientFactory == null &&
!isBridgeConnection()) {
+ if (isIncomingConnection() && saslClientFactory == null &&
!isBrokerConnection()) {
try {
validatedUser = protocolManager.getServer().validateUser(user,
password, connectionCallback.getProtonConnectionDelegate(),
protocolManager.getSecurityDomain());
} catch (ActiveMQSecurityException e) {
@@ -790,7 +830,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
public void onLocalOpen(Session session) throws Exception {
AMQPSessionContext sessionContext = getSessionExtension(session);
- if (bridgeConnection) {
+ if (brokerConnection) {
sessionContext.initialize();
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index fb15a630ce..c77dd4534e 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -54,6 +54,7 @@ import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.lang.invoke.MethodHandles;
public class ProtonHandler extends ProtonInitializable implements SaslListener
{
@@ -629,11 +630,11 @@ public class ProtonHandler extends ProtonInitializable
implements SaslListener {
flush();
}
-
- public void open(String containerId, Map<Symbol, Object>
connectionProperties) {
+ public void open(String containerId, Map<Symbol, Object>
connectionProperties, Symbol[] desiredCapabilities) {
this.transport.open();
this.connection.setContainer(containerId);
this.connection.setProperties(connectionProperties);
+ this.connection.setDesiredCapabilities(desiredCapabilities);
this.connection.open();
flush();
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
index 983008f4f5..cf1e7f1fe2 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
@@ -76,6 +76,7 @@ public class AMQPConnectionContextTest {
scheduledPool,
false,
null,
+ null,
null);
connectionContext.onRemoteOpen(connection);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
index 2f0c75e80c..5d0bbdbbcc 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
@@ -20,6 +20,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.BRIDGE_RECEIVER_PRIORITY;
@@ -29,12 +30,14 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_INITIAL_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -3644,6 +3647,238 @@ class AMQPBridgeFromAddressTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testNewSharedDurableBridgeReceiverCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+
peer.expectOpen().respond().withOfferedCapabilities(SHARED_SUBS.toString());
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeAddressPolicyElement receiveFromAddress = new
AMQPBridgeAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.setUseDurableSubscriptions(true);
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+ element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AtomicReference<Attach> capturedAttach1 = new
AtomicReference<>();
+ final AtomicReference<Attach> capturedAttach2 = new
AtomicReference<>();
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach1.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .withCapabilities(SHARED.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond().afterDelay(50); // Defer the detach
response for a bit
+
+ server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()),
RoutingType.MULTICAST));
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ // Create demand on the address which creates a bridge receiver then
let it close which
+ // should shut down that bridge receiver. We removed the idle timeout
wait so that we
+ // send a detach almost immediately and then add demand again before
the remote has likely
+ // sent its detach response.
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach2.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .withCapabilities(SHARED.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+
+ // Create demand on the address which creates a bridge receiver again
quickly which
+ // can trigger a new receiver before the previous one was fully
closed with a Detach
+ // response and get stuck because it will steal the link in proton
and not be treated
+ // as a new attach for this consumer.
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Shared subs should be used and the sequence number is in the link
name means they are not equal
+ assertNotEquals(capturedAttach1.get().getName(),
capturedAttach2.get().getName());
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testDurableFallbackToLegacyTypeIfRemoteConnectionDoesNotOfferSharedSubs()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeAddressPolicyElement receiveFromAddress = new
AMQPBridgeAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.setUseDurableSubscriptions(true);
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+ element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AtomicReference<Attach> capturedAttach1 = new
AtomicReference<>();
+ final AtomicReference<Attach> capturedAttach2 = new
AtomicReference<>();
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach1.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+
+ server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()),
RoutingType.MULTICAST));
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ // Create demand which should trigger a durable subscription
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+ }
+
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of(getTestName())).isExists(), 5_000, 75);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach2.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+
+ // Add back demand which should trigger another durable subscription
with the same link name
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Non-shared subs should be used so no sequence number in the link
name means they will be the same
+ assertEquals(capturedAttach1.get().getName(),
capturedAttach2.get().getName());
+ }
+ }
+
public static class ApplicationPropertiesTransformer implements Transformer
{
private final Map<String, String> properties = new HashMap<>();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
index a86a7c9adf..e8da987e48 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
@@ -1006,6 +1006,8 @@ public class AMQPBridgeManagementTest extends
AmqpClientTestSupport {
final String policyResourceName =
AMQPBridgeManagementSupport.getBridgePolicyManagerResourceName(getTestName(),
getTestName(), "to-address-policy");
final String producerResourceName =
AMQPBridgeManagementSupport.getBridgeAddressSenderResourceName(getTestName(),
getTestName(), "to-address-policy", getTestName());
+ Wait.assertTrue(() ->
server.getManagementService().getResource(producerResourceName) != null, 5_000,
50);
+
final BrokerConnectionControl brokerConnection =
(BrokerConnectionControl)
server.getManagementService().getResource(brokerConnectionName);
final AMQPBridgeManagerControl bridgeControl =
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index b274999936..ca32d671ac 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -990,6 +991,9 @@ class AMQPBridgeServerToServerTest extends
AmqpClientTestSupport {
Wait.assertTrue(() ->
remoteServer.addressQuery(SimpleString.of(getTestName())).isExists());
Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() == 1);
+ // Connection has been restored to the remote and the bridge created
a new binding to send to the remote
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() == 1, 5000, 50);
+
assertNull(consumerR.receiveNoWait());
message.setStringProperty("testProperty", "testValue-3");
@@ -1270,4 +1274,113 @@ class AMQPBridgeServerToServerTest extends
AmqpClientTestSupport {
assertEquals("red", receivedAfter.getStringProperty("color"));
}
}
+
+ @Test
+ @Timeout(20)
+ public void testSharedDurableAddressSubscriptionRecoveredOnRestart() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final String filterString = "color='red'";
+
+ final AMQPBridgeAddressPolicyElement bridgeAddressPolicy = new
AMQPBridgeAddressPolicyElement();
+ bridgeAddressPolicy.setName("test-policy");
+ bridgeAddressPolicy.setUseDurableSubscriptions(true);
+ bridgeAddressPolicy.setFilter(filterString);
+ bridgeAddressPolicy.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(bridgeAddressPolicy);
+ element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10); // Limit reconnects
+ amqpConnection.setRetryInterval(50);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ // Create an address with a binding to simulate demand from a consumer
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+ // Wait for the bridge to form to the remote and capture the durable
subscription name
+ Wait.assertEquals(1L, () ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size(), 500_000, 50);
+
+ // The actual subscription queue for the "shared" bridge receivers
should be a stable queue
+ final String subscriptionQueueName =
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(subscriptionQueueName);
+ assertTrue(subscriptionQueueName.contains("amqp-bridge-"));
+ assertTrue(subscriptionQueueName.contains(getTestName()));
+
+ final org.apache.activemq.artemis.core.server.Queue subscriptionQueue =
remoteServer.locateQueue(subscriptionQueueName);
+
+ assertNotNull(subscriptionQueue);
+ Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ assertTrue(subscriptionQueue.isDurable());
+ assertEquals(filterString,
subscriptionQueue.getFilter().getFilterString().toString());
+
+ server.stop();
+
+ Wait.assertEquals(1L, () ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size(), 5_000, 50);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connection = factoryRemote.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageProducer producerL = session.createProducer(topic);
+ final TextMessage message = session.createTextMessage("Hello World");
+
+ message.setStringProperty("color", "green");
+ producerL.send(message);
+ message.setStringProperty("color", "red");
+ producerL.send(message);
+
+ Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(),
5_000, 100);
+ }
+
+ server.start();
+
+ // Server should re-attach and recover the subscription and take the
message
+ Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000,
100);
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+ try (Connection connection = factoryLocal.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName() + "::" +
getTestName()); // Access our pre-created queue via FQQN
+ final MessageConsumer consumer = session.createConsumer(topic);
+
+ connection.start();
+
+ final Message receivedAfter = consumer.receive(5_000);
+
+ assertNotNull(receivedAfter);
+ assertTrue(receivedAfter instanceof TextMessage);
+ assertEquals("Hello World", ((TextMessage) receivedAfter).getText());
+ assertTrue(receivedAfter.propertyExists("color"));
+ assertEquals("red", receivedAfter.getStringProperty("color"));
+
+ final TextMessage message = session.createTextMessage("Hello Again");
+ final MessageProducer producerL = session.createProducer(topic);
+
+ message.setStringProperty("color", "red");
+ producerL.send(message);
+
+ final Message receiveAnother = consumer.receive(5_000);
+
+ assertNotNull(receiveAnother);
+ assertTrue(receiveAnother instanceof TextMessage);
+ assertEquals("Hello Again", ((TextMessage) receiveAnother).getText());
+ assertTrue(receiveAnother.propertyExists("color"));
+ assertEquals("red", receiveAnother.getStringProperty("color"));
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
index e0ceebe12e..bd522204db 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
@@ -150,10 +150,12 @@ public class AMQPFederationBrokerPliuginTest extends
AmqpClientTestSupport {
connectionR.start();
// Demand on local address should trigger receiver on remote.
- Wait.assertTrue(() ->
server.addressQuery(SimpleString.of("test")).isExists());
- Wait.assertTrue(() ->
remoteServer.addressQuery(SimpleString.of("test")).isExists());
- Wait.assertTrue(() ->
federationPlugin.beforeCreateConsumerCapture.get() != null);
- Wait.assertTrue(() ->
federationPlugin.afterCreateConsumerCapture.get() != null);
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of("test")).isExists(), 5000, 50);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() > 0, 5000,
50);
+ Wait.assertTrue(() ->
remoteServer.addressQuery(SimpleString.of("test")).isExists(), 5000, 50);
+ Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of("test")).getQueueNames().size() > 0,
5000, 50);
+ Wait.assertTrue(() ->
federationPlugin.beforeCreateConsumerCapture.get() != null, 5000, 50);
+ Wait.assertTrue(() ->
federationPlugin.afterCreateConsumerCapture.get() != null, 5000, 50);
final MessageProducer producerR = sessionR.createProducer(topic);
final TextMessage message = sessionR.createTextMessage("Hello World");
@@ -170,8 +172,8 @@ public class AMQPFederationBrokerPliuginTest extends
AmqpClientTestSupport {
producerR.send(message);
- Wait.assertTrue(() -> messagePreHandled.get() != null);
- Wait.assertTrue(() -> messagePostHandled.get() != null);
+ Wait.assertTrue(() -> messagePreHandled.get() != null, 5000, 50);
+ Wait.assertTrue(() -> messagePostHandled.get() != null, 5000, 50);
assertSame(messagePreHandled.get(), messagePostHandled.get());
@@ -179,8 +181,8 @@ public class AMQPFederationBrokerPliuginTest extends
AmqpClientTestSupport {
consumerL.close();
- Wait.assertTrue(() ->
federationPlugin.beforeCloseConsumerCapture.get() != null);
- Wait.assertTrue(() ->
federationPlugin.afterCloseConsumerCapture.get() != null);
+ Wait.assertTrue(() ->
federationPlugin.beforeCloseConsumerCapture.get() != null, 5000, 50);
+ Wait.assertTrue(() ->
federationPlugin.afterCloseConsumerCapture.get() != null, 5000, 50);
assertNotNull(received);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact