This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b35b6603b1 ARTEMIS-5519 Add bridge support for shared durable address 
subscriptions
b35b6603b1 is described below

commit b35b6603b11430e7cd12ce437286506a60052dce
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Jun 25 08:59:27 2025 -0400

    ARTEMIS-5519 Add bridge support for shared durable address subscriptions
    
    Adds a configuration property to the bridge address receiver configuration 
that
    signals that shared durable subscriptions should be used if the remote 
connection
    indicates in the offered capabilities that it supports them. Falls back to 
the
    previous behavior if the remote doesn't support shared subs. Off by default 
for
    now to preserve previous subscriptions.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   2 +-
 .../amqp/broker/ProtonProtocolManager.java         |  16 +-
 .../amqp/client/AMQPClientConnectionFactory.java   |   2 +-
 .../amqp/connect/AMQPBrokerConnection.java         |  24 ++-
 .../connect/bridge/AMQPBridgeConfiguration.java    |  16 ++
 .../amqp/connect/bridge/AMQPBridgeConstants.java   |  18 ++
 .../bridge/AMQPBridgeFromAddressReceiver.java      |  23 +-
 .../bridge/AMQPBridgeReceiverConfiguration.java    |  15 ++
 .../bridge/AMQPBridgeToAddressPolicyManager.java   |   2 +-
 .../bridge/AMQPBridgeToQueuePolicyManager.java     |   8 +-
 .../amqp/proton/AMQPConnectionContext.java         |  74 +++++--
 .../amqp/proton/handler/ProtonHandler.java         |   5 +-
 .../amqp/proton/AMQPConnectionContextTest.java     |   1 +
 .../amqp/connect/AMQPBridgeFromAddressTest.java    | 235 +++++++++++++++++++++
 .../amqp/connect/AMQPBridgeManagementTest.java     |   2 +
 .../amqp/connect/AMQPBridgeServerToServerTest.java | 113 ++++++++++
 .../connect/AMQPFederationBrokerPliuginTest.java   |  18 +-
 17 files changed, 528 insertions(+), 46 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index eda30d804c..988ebdd5dc 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -204,7 +204,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
 
       String name = UUIDGenerator.getInstance().generateStringUUID();
 
-      if (connection.isBridgeConnection())  {
+      if (connection.isBrokerConnection())  {
          serverSession = manager.getServer().createInternalSession(name, 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, 
protonSPI.getProtonConnectionDelegate(), // RemotingConnection 
remotingConnection,
                                                            false, // boolean 
autoCommitSends
                                                            false, // boolean 
autoCommitAcks,
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 2b8f5ef09f..a4cedccc56 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -237,7 +237,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection remotingConnection) {
-      return internalConnectionEntry(remotingConnection, false, null, null);
+      return internalConnectionEntry(remotingConnection, false, null, null, 
null);
    }
 
    /**
@@ -245,18 +245,22 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
     * AMQP Bridges
     */
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection) {
-      return internalConnectionEntry(remotingConnection, true, null, null);
+      return internalConnectionEntry(remotingConnection, true, null, null, 
null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null, null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, null);
    }
 
-   private ConnectionEntry internalConnectionEntry(Connection 
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, 
Map<Symbol, Object> connectionProperties) {
+   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties, Symbol[] desiredCapabilities) {
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, desiredCapabilities);
+   }
+
+   private ConnectionEntry internalConnectionEntry(Connection 
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, 
Map<Symbol, Object> connectionProperties, Symbol[] desiredCapabilities) {
       AMQPConnectionCallback connectionCallback = new 
AMQPConnectionCallback(this, remotingConnection, 
server.getExecutorFactory().getExecutor(), server);
       long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
 
@@ -274,7 +278,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
       String id = server.getNodeID().toString();
       boolean useCoreSubscriptionNaming = 
server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, 
connectionCallback, id, (int) ttl, getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), true, saslFactory, connectionProperties, outgoing);
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, 
connectionCallback, id, (int) ttl, getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), true, saslFactory, connectionProperties, 
desiredCapabilities, outgoing);
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index af65937eec..cce7c9c82e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -55,7 +55,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new 
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, 
protocolManager.getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
+      AMQPConnectionContext amqpConnection = new 
AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, 
protocolManager.getMaxFrameSize(), 
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, 
server.getScheduledPool(), false, clientSASLFactory, connectionProperties, 
null);
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new 
ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, 
executor);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index c3ff19c05f..ab026ceb29 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -119,6 +119,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnec
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionConstants.CONNECTION_NAME;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionConstants.NODE_ID;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyCapabilities;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities;
 
@@ -501,8 +502,10 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
          brokerConnectionProperties.put(BROKER_CONNECTION_INFO, 
brokerConnectionInfo);
 
+         final Symbol[] brokerDesiredCapabilities = bridgeManagers == null ? 
null : new Symbol[] {SHARED_SUBS};
+
          NettyConnectorCloseHandler connectorCloseHandler = new 
NettyConnectorCloseHandler(connector, connectExecutor);
-         ConnectionEntry entry = 
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory, 
brokerConnectionProperties);
+         ConnectionEntry entry = 
protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory, 
brokerConnectionProperties, brokerDesiredCapabilities);
          server.getRemotingService().addConnectionEntry(connection, entry);
          protonRemotingConnection = (ActiveMQProtonRemotingConnection) 
entry.connection;
          
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(),
 this::linkClosed);
@@ -515,6 +518,20 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          sessionContext = 
protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
 
          protonRemotingConnection.getAmqpConnection().runLater(() -> {
+            
protonRemotingConnection.getAmqpConnection().addRemoteOpenedListener(c -> {
+               try {
+                  // Starting the Bridge triggers rebuild of AMQP sender and 
receiver links based on current broker state.
+                  // This requires in some cases knowing the remote offered 
capabilities which can't be tested until after
+                  // the remote sends its Open performative carrying those.
+                  if (bridgeManagers != null) {
+                     bridgeManagers.connectionRestored(sessionContext);
+                  }
+               } catch (Throwable e) {
+                  error(e);
+               } finally {
+                  protonRemotingConnection.getAmqpConnection().flush();
+               }
+            });
             protonRemotingConnection.getAmqpConnection().open();
             session.open();
             protonRemotingConnection.getAmqpConnection().flush();
@@ -562,11 +579,6 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
                   // Signal the Federation instance to start a rebuild of 
federation links
                   // based on current broker state.
                   
brokerFederation.connectionRestored(protonRemotingConnection.getAmqpConnection(),
 sessionContext);
-               } else if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.BRIDGE) {
-                  // Starting the Bridge triggers rebuild of AMQP sender and 
receiver links based on current broker state.
-                  if (bridgeManagers != null) {
-                     bridgeManagers.connectionRestored(sessionContext);
-                  }
                }
             }
          }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
index a5cfebddc9..b646b37346 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
@@ -22,6 +22,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_INITIAL_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_PRIORITY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
@@ -46,6 +47,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_LINK_RECOVERY_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_LINK_RECOVERY_INITIAL_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_MAX_LINK_RECOVERY_ATTEMPTS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_DISABLE_RECEIVER_PRIORITY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_PULL_CREDIT_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_QUEUE_RECEIVER_IDLE_TIMEOUT;
@@ -362,4 +364,18 @@ public class AMQPBridgeConfiguration {
          return DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY;
       }
    }
+
+   /**
+    * {@return <code>true</code> if bridge from address policies are 
configured to prefer using shared durable address subscriptions}
+    */
+   public boolean isPreferSharedDurableSubscriptions() {
+      final Object property = 
properties.get(PREFER_SHARED_DURABLE_SUBSCRIPTIONS);
+      if (property instanceof Boolean) {
+         return (Boolean) property;
+      } else if (property instanceof String) {
+         return Boolean.parseBoolean((String) property);
+      } else {
+         return DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
index 6fd19e292c..66fcbf84a0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
@@ -135,6 +135,24 @@ public final class AMQPBridgeConstants {
     */
    public static final boolean DEFAULT_DISABLE_RECEIVER_DEMAND_TRACKING = 
false;
 
+   /**
+    * Configuration property that controls if a bridge from address receiver 
that has been configured
+    * to use durable subscriptions will prefer to use a shared durable 
subscription if the remote peer
+    * indicates in its connection capabilities that it supports them. Using a 
shared subscription can
+    * avoid a rare edge case race on rapid attach and re-attach cycles due to 
demand coming and going
+    * rapidly which leads to a stuck consumer. Enabling this on configurations 
that previously did not
+    * have this option set to true could lead to orphaning a previous 
subscription so care should be
+    * taken when changing the defaults for this option.
+    */
+   public static final String PREFER_SHARED_DURABLE_SUBSCRIPTIONS = 
"preferSharedDurableSubscriptions";
+
+   /**
+    * Default value for whether a bridge from address receiver that has been 
told to use durable subscriptions
+    * should prefer to use a shared durable address subscriptions or a 
standard JMS style non-shared durable
+    * subscription.
+    */
+   public static final boolean DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS = 
true;
+
    /**
     * Link Property added to the receiver properties when opening an AMQP 
bridge address or queue consumer
     * that indicates the consumer priority that should be used when creating 
the remote consumer. The
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
index b4f96d06ec..a066c37046 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
@@ -19,6 +19,9 @@ package 
org.apache.activemq.artemis.protocol.amqp.connect.bridge;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RECEIVER_PRIORITY;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyCapabilities;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyDesiredCapability;
 
 import java.lang.invoke.MethodHandles;
@@ -80,6 +83,12 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
       return configuration.getAddressReceiverIdleTimeout();
    }
 
+   private boolean isUseSharedDurableSubscriptions() {
+      return getPolicy().isUseDurableSubscriptions() &&
+             configuration.isPreferSharedDurableSubscriptions() &&
+             
verifyCapabilities(session.getSession().getConnection().getRemoteOfferedCapabilities(),
 SHARED_SUBS);
+   }
+
    @Override
    protected void doCreateReceiver() {
       try {
@@ -97,6 +106,10 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDistributionMode(AmqpSupport.COPY);
+
+            if (isUseSharedDurableSubscriptions()) {
+               source.setCapabilities(SHARED);
+            }
          } else {
             source.setDurable(TerminusDurability.NONE);
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
@@ -192,7 +205,15 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
    }
 
    private String generateLinkName(AMQPBridgeAddressPolicy policy) {
-      if (policy.isUseDurableSubscriptions()) {
+      if (isUseSharedDurableSubscriptions()) {
+         // Append the sequence ID with a '|' to create a shared durable 
subscription for reconnects
+         // but a unique name to prevent stealing on rapid demand cycles.
+         return "amqp-bridge-" + bridgeManager.getName() +
+                "-policy-" + policy.getPolicyName() +
+                "-address-receiver-" + receiverInfo.getRemoteAddress() +
+                "-" + bridgeManager.getServer().getNodeID() +
+                "|" + LINK_SEQUENCE_ID.incrementAndGet();
+      } else if (policy.isUseDurableSubscriptions()) {
          // Omit the sequence ID to create a stable durable subscription name 
for reconnects.
          return "amqp-bridge-" + bridgeManager.getName() +
                 "-policy-" + policy.getPolicyName() +
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
index 4928753d85..b7215f7c45 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
@@ -25,6 +25,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.QUEUE_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LARGE_MESSAGE_THRESHOLD;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_PRIORITY;
@@ -195,4 +196,18 @@ public final class AMQPBridgeReceiverConfiguration extends 
AMQPBridgeLinkConfigu
          return configuration.isReceiverDemandTrackingDisabled();
       }
    }
+
+   /**
+    * {@return <code>true</code> if bridge from address policies are 
configured to prefer using shared durable address subscriptions}
+    */
+   public boolean isPreferSharedDurableSubscriptions() {
+      final Object property = 
properties.get(PREFER_SHARED_DURABLE_SUBSCRIPTIONS);
+      if (property instanceof Boolean) {
+         return (Boolean) property;
+      } else if (property instanceof String) {
+         return Boolean.parseBoolean((String) property);
+      } else {
+         return configuration.isPreferSharedDurableSubscriptions();
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
index 579847b7db..4c6696ae93 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToAddressPolicyManager.java
@@ -82,7 +82,7 @@ public class AMQPBridgeToAddressPolicyManager extends 
AMQPBridgeToPolicyManager
 
    @Override
    public synchronized void afterAddAddress(AddressInfo addressInfo, boolean 
reload) {
-      if (isActive() && policy.test(addressInfo)) {
+      if (isActive() && policy.test(addressInfo) && 
!addressTracking.containsKey(addressInfo.getName().toString())) {
          try {
             final AMQPBridgeAddressSenderManager manager = new 
AMQPBridgeAddressSenderManager(this, configuration, addressInfo);
             addressTracking.put(manager.getAddress(), manager);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
index a54ed2a959..8bdc2352e3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeToQueuePolicyManager.java
@@ -114,9 +114,11 @@ public class AMQPBridgeToQueuePolicyManager extends 
AMQPBridgeToPolicyManager im
          final AMQPBridgeQueueSenderManager manager;
          final AMQPBridgeSenderInfo info = createSenderInfo(addressInfo, 
queue);
 
-         manager = new AMQPBridgeQueueSenderManager(this, configuration, info);
-         queueSenders.put(info.getLocalFqqn(), manager);
-         manager.start();
+         if (!queueSenders.containsKey(info.getLocalFqqn())) {
+            manager = new AMQPBridgeQueueSenderManager(this, configuration, 
info);
+            queueSenders.put(info.getLocalFqqn(), manager);
+            manager.start();
+         }
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index bcd0a044df..d5b496c0d6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -31,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 
 import io.netty.buffer.ByteBuf;
@@ -60,6 +62,7 @@ import 
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -118,8 +121,10 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
    private final boolean isIncomingConnection;
    private final ClientSASLFactory saslClientFactory;
    private final Map<Symbol, Object> connectionProperties = new HashMap<>();
+   private final Symbol[] desiredCapabilities;
    private final ScheduledExecutorService scheduledPool;
    private final Map<String, LinkCloseListener> linkCloseListeners = new 
ConcurrentHashMap<>();
+   private final Set<Consumer<AMQPConnectionContext>> remoteOpenedListeners = 
new ConcurrentHashSet<>();
 
    private final Map<Session, AMQPSessionContext> sessions = new 
ConcurrentHashMap<>();
 
@@ -127,10 +132,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
 
    private final boolean useCoreSubscriptionNaming;
 
-   /**
-    * Outgoing means created by the AMQP Bridge
-    */
-   private final boolean bridgeConnection;
+   private final boolean brokerConnection;
 
    private final ScheduleOperator scheduleOp = new ScheduleOperator(new 
ScheduleRunnable());
    private final AtomicReference<Future<?>> scheduledFutureRef = new 
AtomicReference<>(VOID_FUTURE);
@@ -149,8 +151,9 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
                                 ScheduledExecutorService scheduledPool,
                                 boolean isIncomingConnection,
                                 ClientSASLFactory saslClientFactory,
-                                Map<Symbol, Object> connectionProperties) {
-      this(protocolManager, connectionSP, containerId, idleTimeout, 
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, 
isIncomingConnection, saslClientFactory, connectionProperties, false);
+                                Map<Symbol, Object> connectionProperties,
+                                Symbol[] desiredCapabilities) {
+      this(protocolManager, connectionSP, containerId, idleTimeout, 
maxFrameSize, channelMax, useCoreSubscriptionNaming, scheduledPool, 
isIncomingConnection, saslClientFactory, connectionProperties, 
desiredCapabilities, false);
    }
 
    public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@@ -164,9 +167,10 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
                                 boolean isIncomingConnection,
                                 ClientSASLFactory saslClientFactory,
                                 Map<Symbol, Object> connectionProperties,
-                                boolean bridgeConnection) {
+                                Symbol[] desiredCapabilities,
+                                boolean brokerConnection) {
       this.protocolManager = protocolManager;
-      this.bridgeConnection = bridgeConnection;
+      this.brokerConnection = brokerConnection;
       this.connectionCallback = connectionSP;
       this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
       this.containerId = (containerId != null) ? containerId : 
UUID.randomUUID().toString();
@@ -180,6 +184,12 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          this.connectionProperties.putAll(connectionProperties);
       }
 
+      if (desiredCapabilities != null && desiredCapabilities.length > 0) {
+         this.desiredCapabilities = Arrays.copyOf(desiredCapabilities, 
desiredCapabilities.length);
+      } else {
+         this.desiredCapabilities = null;
+      }
+
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
       EventLoop nettyExecutor = 
connectionCallback.getTransportConnection().getEventLoop();
@@ -238,8 +248,8 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
       linkCloseListeners.clear();
    }
 
-   public boolean isBridgeConnection() {
-      return bridgeConnection;
+   public boolean isBrokerConnection() {
+      return brokerConnection;
    }
 
    public void requireInHandler() {
@@ -520,7 +530,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
    }
 
    public void open() {
-      handler.open(containerId, connectionProperties);
+      handler.open(containerId, connectionProperties, desiredCapabilities);
    }
 
    public String getContainer() {
@@ -618,6 +628,20 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
       return connectionCallback.getTransportConnection().getRemoteAddress();
    }
 
+   public AMQPConnectionContext 
addRemoteOpenedListener(Consumer<AMQPConnectionContext> listener) {
+      if (handler.getConnection() != null && 
EndpointState.ACTIVE.equals(handler.getConnection().getRemoteState())) {
+         try {
+            listener.accept(this);
+         } catch (Exception e) {
+            logger.debug("Caught exception from remote opened handler", e);
+         }
+      } else {
+         remoteOpenedListeners.add(listener);
+      }
+
+      return this;
+   }
+
    @Override
    public void onRemoteOpen(Connection connection) throws Exception {
       handler.requireHandler();
@@ -632,10 +656,16 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          connection.close();
       } else {
          connection.setContext(AMQPConnectionContext.this);
-         connection.setContainer(containerId);
-         connection.setProperties(connectionProperties);
-         connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-         connection.open();
+         // An outgoing connection would have already configured the 
connection and opened or is in the
+         // processing of doing so when a simultaneous open occurred so we 
protect against altering the
+         // sent values or preventing the in process open from configuring the 
values it wants.
+         if (isIncomingConnection) {
+            connection.setContainer(containerId);
+            connection.setProperties(connectionProperties);
+            
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+            connection.setDesiredCapabilities(desiredCapabilities);
+            connection.open();
+         }
       }
       initialize();
 
@@ -651,6 +681,16 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
 
             scheduledFutureRef.getAndUpdate(scheduleOp);
          }
+
+         remoteOpenedListeners.forEach(consumer -> {
+            try {
+               consumer.accept(this);
+            } catch (Exception e) {
+               logger.debug("Caught exception from remote opened handler", e);
+            }
+         });
+
+         remoteOpenedListeners.clear();
       }
    }
 
@@ -667,7 +707,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          }
       }
 
-      if (isIncomingConnection() && saslClientFactory == null && 
!isBridgeConnection()) {
+      if (isIncomingConnection() && saslClientFactory == null && 
!isBrokerConnection()) {
          try {
             validatedUser = protocolManager.getServer().validateUser(user, 
password, connectionCallback.getProtonConnectionDelegate(), 
protocolManager.getSecurityDomain());
          } catch (ActiveMQSecurityException e) {
@@ -790,7 +830,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
    public void onLocalOpen(Session session) throws Exception {
       AMQPSessionContext sessionContext = getSessionExtension(session);
 
-      if (bridgeConnection) {
+      if (brokerConnection) {
          sessionContext.initialize();
       }
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index fb15a630ce..c77dd4534e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -54,6 +54,7 @@ import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.TransportInternal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.lang.invoke.MethodHandles;
 
 public class ProtonHandler extends ProtonInitializable implements SaslListener 
{
@@ -629,11 +630,11 @@ public class ProtonHandler extends ProtonInitializable 
implements SaslListener {
       flush();
    }
 
-
-   public void open(String containerId, Map<Symbol, Object> 
connectionProperties) {
+   public void open(String containerId, Map<Symbol, Object> 
connectionProperties, Symbol[] desiredCapabilities) {
       this.transport.open();
       this.connection.setContainer(containerId);
       this.connection.setProperties(connectionProperties);
+      this.connection.setDesiredCapabilities(desiredCapabilities);
       this.connection.open();
       flush();
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
index 983008f4f5..cf1e7f1fe2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java
@@ -76,6 +76,7 @@ public class AMQPConnectionContextTest {
          scheduledPool,
          false,
          null,
+         null,
          null);
 
       connectionContext.onRemoteOpen(connection);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
index 2f0c75e80c..5d0bbdbbcc 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
@@ -20,6 +20,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED_SUBS;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.BRIDGE_RECEIVER_PRIORITY;
@@ -29,12 +30,14 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_INITIAL_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -3644,6 +3647,238 @@ class AMQPBridgeFromAddressTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testNewSharedDurableBridgeReceiverCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         
peer.expectOpen().respond().withOfferedCapabilities(SHARED_SUBS.toString());
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeAddressPolicyElement receiveFromAddress = new 
AMQPBridgeAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.setUseDurableSubscriptions(true);
+         receiveFromAddress.addToIncludes(getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromAddressPolicy(receiveFromAddress);
+         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+         element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AtomicReference<Attach> capturedAttach1 = new 
AtomicReference<>();
+         final AtomicReference<Attach> capturedAttach2 = new 
AtomicReference<>();
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach1.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .withCapabilities(SHARED.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+         peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                          .respond()
+                          
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+         peer.expectDetach().respond().afterDelay(50); // Defer the detach 
response for a bit
+
+         server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), 
RoutingType.MULTICAST));
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         // Create demand on the address which creates a bridge receiver then 
let it close which
+         // should shut down that bridge receiver. We removed the idle timeout 
wait so that we
+         // send a detach almost immediately and then add demand again before 
the remote has likely
+         // sent its detach response.
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach2.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .withCapabilities(SHARED.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+         peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                          .respond()
+                          
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+         peer.expectDetach().respond();
+
+         // Create demand on the address which creates a bridge receiver again 
quickly which
+         // can trigger a new receiver before the previous one was fully 
closed with a Detach
+         // response and get stuck because it will steal the link in proton 
and not be treated
+         // as a new attach for this consumer.
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Shared subs should be used and the sequence number is in the link 
name means they are not equal
+         assertNotEquals(capturedAttach1.get().getName(), 
capturedAttach2.get().getName());
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testDurableFallbackToLegacyTypeIfRemoteConnectionDoesNotOfferSharedSubs() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeAddressPolicyElement receiveFromAddress = new 
AMQPBridgeAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.setUseDurableSubscriptions(true);
+         receiveFromAddress.addToIncludes(getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromAddressPolicy(receiveFromAddress);
+         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+         element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AtomicReference<Attach> capturedAttach1 = new 
AtomicReference<>();
+         final AtomicReference<Attach> capturedAttach2 = new 
AtomicReference<>();
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach1.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+
+         server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), 
RoutingType.MULTICAST));
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         // Create demand which should trigger a durable subscription
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDetach().respond();
+         }
+
+         Wait.assertTrue(() -> 
server.addressQuery(SimpleString.of(getTestName())).isExists(), 5_000, 75);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach2.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+
+         // Add back demand which should trigger another durable subscription 
with the same link name
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDetach().respond();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Non-shared subs should be used so no sequence number in the link 
name means they will be the same
+         assertEquals(capturedAttach1.get().getName(), 
capturedAttach2.get().getName());
+      }
+   }
+
    public static class ApplicationPropertiesTransformer implements Transformer 
{
 
       private final Map<String, String> properties = new HashMap<>();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
index a86a7c9adf..e8da987e48 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeManagementTest.java
@@ -1006,6 +1006,8 @@ public class AMQPBridgeManagementTest extends 
AmqpClientTestSupport {
          final String policyResourceName = 
AMQPBridgeManagementSupport.getBridgePolicyManagerResourceName(getTestName(), 
getTestName(), "to-address-policy");
          final String producerResourceName = 
AMQPBridgeManagementSupport.getBridgeAddressSenderResourceName(getTestName(), 
getTestName(), "to-address-policy", getTestName());
 
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(producerResourceName) != null, 5_000, 
50);
+
          final BrokerConnectionControl brokerConnection = 
(BrokerConnectionControl)
             server.getManagementService().getResource(brokerConnectionName);
          final AMQPBridgeManagerControl bridgeControl =
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index b274999936..ca32d671ac 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp.connect;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -990,6 +991,9 @@ class AMQPBridgeServerToServerTest extends 
AmqpClientTestSupport {
          Wait.assertTrue(() -> 
remoteServer.addressQuery(SimpleString.of(getTestName())).isExists());
          Wait.assertTrue(() -> 
remoteServer.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size() == 1);
 
+         // Connection has been restored to the remote and the bridge created 
a new binding to send to the remote
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size() == 1, 5000, 50);
+
          assertNull(consumerR.receiveNoWait());
 
          message.setStringProperty("testProperty", "testValue-3");
@@ -1270,4 +1274,113 @@ class AMQPBridgeServerToServerTest extends 
AmqpClientTestSupport {
          assertEquals("red", receivedAfter.getStringProperty("color"));
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void testSharedDurableAddressSubscriptionRecoveredOnRestart() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final String filterString = "color='red'";
+
+      final AMQPBridgeAddressPolicyElement bridgeAddressPolicy = new 
AMQPBridgeAddressPolicyElement();
+      bridgeAddressPolicy.setName("test-policy");
+      bridgeAddressPolicy.setUseDurableSubscriptions(true);
+      bridgeAddressPolicy.setFilter(filterString);
+      bridgeAddressPolicy.addToIncludes(getTestName());
+
+      final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeFromAddressPolicy(bridgeAddressPolicy);
+      element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      // Create an address with a binding to simulate demand from a consumer
+      
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(getTestName())
+                                                             
.setAutoCreated(false));
+      // Wait for the bridge to form to the remote and capture the durable 
subscription name
+      Wait.assertEquals(1L, () -> 
remoteServer.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size(), 500_000, 50);
+
+      // The actual subscription queue for the "shared" bridge receivers 
should be a stable queue
+      final String subscriptionQueueName = 
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+      assertTrue(subscriptionQueueName.contains("amqp-bridge-"));
+      assertTrue(subscriptionQueueName.contains(getTestName()));
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = 
remoteServer.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+      assertTrue(subscriptionQueue.isDurable());
+      assertEquals(filterString, 
subscriptionQueue.getFilter().getFilterString().toString());
+
+      server.stop();
+
+      Wait.assertEquals(1L, () -> 
remoteServer.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size(), 5_000, 50);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageProducer producerL = session.createProducer(topic);
+         final TextMessage message = session.createTextMessage("Hello World");
+
+         message.setStringProperty("color", "green");
+         producerL.send(message);
+         message.setStringProperty("color", "red");
+         producerL.send(message);
+
+         Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(), 
5_000, 100);
+      }
+
+      server.start();
+
+      // Server should re-attach and recover the subscription and take the 
message
+      Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 
100);
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName() + "::" + 
getTestName()); // Access our pre-created queue via FQQN
+         final MessageConsumer consumer = session.createConsumer(topic);
+
+         connection.start();
+
+         final Message receivedAfter = consumer.receive(5_000);
+
+         assertNotNull(receivedAfter);
+         assertTrue(receivedAfter instanceof TextMessage);
+         assertEquals("Hello World", ((TextMessage) receivedAfter).getText());
+         assertTrue(receivedAfter.propertyExists("color"));
+         assertEquals("red", receivedAfter.getStringProperty("color"));
+
+         final TextMessage message = session.createTextMessage("Hello Again");
+         final MessageProducer producerL = session.createProducer(topic);
+
+         message.setStringProperty("color", "red");
+         producerL.send(message);
+
+         final Message receiveAnother = consumer.receive(5_000);
+
+         assertNotNull(receiveAnother);
+         assertTrue(receiveAnother instanceof TextMessage);
+         assertEquals("Hello Again", ((TextMessage) receiveAnother).getText());
+         assertTrue(receiveAnother.propertyExists("color"));
+         assertEquals("red", receiveAnother.getStringProperty("color"));
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
index e0ceebe12e..bd522204db 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.java
@@ -150,10 +150,12 @@ public class AMQPFederationBrokerPliuginTest extends 
AmqpClientTestSupport {
          connectionR.start();
 
          // Demand on local address should trigger receiver on remote.
-         Wait.assertTrue(() -> 
server.addressQuery(SimpleString.of("test")).isExists());
-         Wait.assertTrue(() -> 
remoteServer.addressQuery(SimpleString.of("test")).isExists());
-         Wait.assertTrue(() -> 
federationPlugin.beforeCreateConsumerCapture.get() != null);
-         Wait.assertTrue(() -> 
federationPlugin.afterCreateConsumerCapture.get() != null);
+         Wait.assertTrue(() -> 
server.addressQuery(SimpleString.of("test")).isExists(), 5000, 50);
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() > 0, 5000, 
50);
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(SimpleString.of("test")).isExists(), 5000, 50);
+         Wait.assertTrue(() -> 
remoteServer.bindingQuery(SimpleString.of("test")).getQueueNames().size() > 0, 
5000, 50);
+         Wait.assertTrue(() -> 
federationPlugin.beforeCreateConsumerCapture.get() != null, 5000, 50);
+         Wait.assertTrue(() -> 
federationPlugin.afterCreateConsumerCapture.get() != null, 5000, 50);
 
          final MessageProducer producerR = sessionR.createProducer(topic);
          final TextMessage message = sessionR.createTextMessage("Hello World");
@@ -170,8 +172,8 @@ public class AMQPFederationBrokerPliuginTest extends 
AmqpClientTestSupport {
 
          producerR.send(message);
 
-         Wait.assertTrue(() -> messagePreHandled.get() != null);
-         Wait.assertTrue(() -> messagePostHandled.get() != null);
+         Wait.assertTrue(() -> messagePreHandled.get() != null, 5000, 50);
+         Wait.assertTrue(() -> messagePostHandled.get() != null, 5000, 50);
 
          assertSame(messagePreHandled.get(), messagePostHandled.get());
 
@@ -179,8 +181,8 @@ public class AMQPFederationBrokerPliuginTest extends 
AmqpClientTestSupport {
 
          consumerL.close();
 
-         Wait.assertTrue(() -> 
federationPlugin.beforeCloseConsumerCapture.get() != null);
-         Wait.assertTrue(() -> 
federationPlugin.afterCloseConsumerCapture.get() != null);
+         Wait.assertTrue(() -> 
federationPlugin.beforeCloseConsumerCapture.get() != null, 5000, 50);
+         Wait.assertTrue(() -> 
federationPlugin.afterCloseConsumerCapture.get() != null, 5000, 50);
 
          assertNotNull(received);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to