This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 19d8059a4e ARTEMIS-4794 configure pending ack behavior for bridge
19d8059a4e is described below

commit 19d8059a4eca5b7296ec9711a439ea3a129242df
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Tue Jun 25 22:52:19 2024 -0500

    ARTEMIS-4794 configure pending ack behavior for bridge
    
    When a bridge is stopped it doesn't wait for pending send
    acknowledgements to arrive. However, when a bridge is paused it does
    wait. The behavior should be consistent and more importantly
    configurable. This commit implements these improvements and generally
    refactors BridgeImpl to clarify and simplify the code. In total, this
    commit includes the follow changes:
    
     - Removes the hard-coded 60-second timeout for pending acks when
       pausing the bridge and adds a new config parameter (i.e.
       "pending-ack-timeout").
     - Applies the new pending-ack-timeout when the bridge is stopped.
     - Updates existing and adds new logging messages for clarity.
     - De-duplicates code for sending bridge-related notifications.
     - Avoids converting bridge name to/from SimpleString.
     - Removes unnecessary comments.
     - Renames variables & functions for clarity.
     - Replaces the `started`, `stopping`, & `active` booleans with a
       single `state` variable which is an enum.
     - Adds `final` to a few variables that were functionally final.
     - Synchronizes `stop` & `pause` methods to add safety when invoked
       concurrently with `handle` (since both deal with `state` and execute
       runnables on the ordered executor).
     - Reorganizes and removes a few methods for clarity.
     - Relocates `connect` method directly into `ConnectRunnable` (mirroring
       the structure of the `StopRunnable` and `PauseRunnable`).
     - Eliminates unnecessary variables in `ConnectRunnable` and
       `ScheduledConnectRunnable`.
     - Adds test to verify pending ack timeout works as expected with both
       `stop` & `pause` with both regular and large messages.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 +
 .../artemis/core/config/BridgeConfiguration.java   |  30 ++
 .../deployers/impl/FileConfigurationParser.java    |   5 +-
 .../artemis/core/server/ActiveMQMessageBundle.java |   4 +
 .../artemis/core/server/ActiveMQServerLogger.java  |  24 +-
 .../core/server/cluster/ClusterManager.java        |  10 +-
 .../core/server/cluster/impl/BridgeImpl.java       | 550 ++++++++++-----------
 .../management/impl/ManagementServiceImpl.java     |   2 +-
 .../resources/schema/artemis-configuration.xsd     |   8 +
 .../core/config/BridgeConfigurationTest.java       |   3 +
 .../core/config/impl/FileConfigurationTest.java    |   2 +-
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 .../ConfigurationTest-xinclude-config.xml          |   1 +
 ...gurationTest-xinclude-schema-config-bridges.xml |   1 +
 .../integration/cluster/bridge/BridgeTest.java     | 213 ++++++++
 .../BridgeConfigurationStorageTest.java            |   2 +
 16 files changed, 555 insertions(+), 308 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 3b580f40b6..2813154014 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -664,6 +664,9 @@ public final class ActiveMQDefaultConfiguration {
    // Number of concurrent workers for a core bridge
    public static int DEFAULT_BRIDGE_CONCURRENCY = 1;
 
+   // How long to wait for acknowledgements to arrive from the bridge's target 
while stopping or pausing the bridge
+   public static long DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT = 60000;
+
    // Whether or not to report Netty pool metrics
    private static final boolean DEFAULT_NETTY_POOL_METRICS = false;
 
@@ -1860,6 +1863,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_BRIDGE_CONCURRENCY;
    }
 
+   public static long getDefaultBridgePendingAckTimeout() {
+      return DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT;
+   }
+
    /**
     * Whether or not to report Netty pool metrics
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index 7cfb861d70..f4e3a4e0af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -66,6 +66,7 @@ public final class BridgeConfiguration implements 
Serializable {
    public static String ROUTING_TYPE = "routing-type";
    public static String CONCURRENCY = "concurrency";
    public static String CONFIGURATION_MANAGED = "configuration-managed";
+   public static String PENDING_ACK_TIMEOUT = "pending-ack-timeout";
 
    private String name = null;
 
@@ -120,6 +121,8 @@ public final class BridgeConfiguration implements 
Serializable {
 
    private int concurrency = 
ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency();
 
+   private long pendingAckTimeout = 
ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout();
+
    private String parentName = null;
 
    private boolean configurationManaged = true;
@@ -155,6 +158,7 @@ public final class BridgeConfiguration implements 
Serializable {
       routingType = other.routingType;
       concurrency = other.concurrency;
       configurationManaged = other.configurationManaged;
+      pendingAckTimeout = other.pendingAckTimeout;
    }
 
    public BridgeConfiguration(String name) {
@@ -261,6 +265,8 @@ public final class BridgeConfiguration implements 
Serializable {
             setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
          } else if (key.equals(CONCURRENCY)) {
             setConcurrency(Integer.parseInt(value));
+         } else if (key.equals(PENDING_ACK_TIMEOUT)) {
+            setPendingAckTimeout(Long.parseLong(value));
          }
       }
       return this;
@@ -570,6 +576,21 @@ public final class BridgeConfiguration implements 
Serializable {
       return this;
    }
 
+   /**
+    * @return the bridge pending ack timeout
+    */
+   public long getPendingAckTimeout() {
+      return pendingAckTimeout;
+   }
+
+   /**
+    * @param pendingAckTimeout the bridge pending ack timeout to set
+    */
+   public BridgeConfiguration setPendingAckTimeout(long pendingAckTimeout) {
+      this.pendingAckTimeout = pendingAckTimeout;
+      return this;
+   }
+
    /**
     * At this point this is only changed on testcases
     * The bridge shouldn't be sending blocking anyways
@@ -631,6 +652,7 @@ public final class BridgeConfiguration implements 
Serializable {
       builder.add(CALL_TIMEOUT, getCallTimeout());
       builder.add(CONCURRENCY, getConcurrency());
       builder.add(CONFIGURATION_MANAGED, isConfigurationManaged());
+      builder.add(PENDING_ACK_TIMEOUT, getPendingAckTimeout());
 
       // complex fields (only serialize if value is not null)
 
@@ -725,6 +747,7 @@ public final class BridgeConfiguration implements 
Serializable {
       result = prime * result + (useDuplicateDetection ? 1231 : 1237);
       result = prime * result + ((user == null) ? 0 : user.hashCode());
       result = prime * result + concurrency;
+      result = prime * result + (int) (pendingAckTimeout ^ (pendingAckTimeout 
>>> 32));
       result = prime * result + (configurationManaged ? 1231 : 1237);
       return result;
    }
@@ -811,6 +834,8 @@ public final class BridgeConfiguration implements 
Serializable {
          return false;
       if (concurrency != other.concurrency)
          return false;
+      if (pendingAckTimeout != other.pendingAckTimeout)
+         return false;
       if (configurationManaged != other.configurationManaged)
          return false;
       return true;
@@ -857,6 +882,7 @@ public final class BridgeConfiguration implements 
Serializable {
          BufferHelper.sizeOfNullableInteger(minLargeMessageSize) +
          BufferHelper.sizeOfNullableLong(callTimeout) +
          BufferHelper.sizeOfNullableInteger(concurrency) +
+         BufferHelper.sizeOfNullableLong(pendingAckTimeout) +
          BufferHelper.sizeOfNullableBoolean(configurationManaged) +
          DataConstants.SIZE_BYTE +
          transformerSize +
@@ -909,6 +935,7 @@ public final class BridgeConfiguration implements 
Serializable {
       } else {
          buffer.writeInt(0);
       }
+      buffer.writeNullableLong(pendingAckTimeout);
    }
 
    public void decode(ActiveMQBuffer buffer) {
@@ -952,6 +979,9 @@ public final class BridgeConfiguration implements 
Serializable {
             staticConnectors.add(buffer.readNullableString());
          }
       }
+      if (buffer.readable()) {
+         pendingAckTimeout = buffer.readNullableLong();
+      }
    }
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c423be3e8a..e12dc13fce 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2475,6 +2475,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       int concurrency = getInteger(brNode, "concurrency", 
ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), GT_ZERO);
 
+      long pendingAckTimeout = getLong(brNode, "pending-ack-timeout", 
ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout(), GT_ZERO);
+
       NodeList clusterPassNodes = brNode.getElementsByTagName("password");
       String password = null;
 
@@ -2541,7 +2543,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          .setUser(user)
          .setPassword(password)
          .setRoutingType(routingType)
-         .setConcurrency(concurrency);
+         .setConcurrency(concurrency)
+         .setPendingAckTimeout(pendingAckTimeout);
 
       if (!staticConnectorNames.isEmpty()) {
          config.setStaticConnectors(staticConnectorNames);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 2cbeb11a76..0064bac1d1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.logs.annotation.LogBundle;
 import org.apache.activemq.artemis.logs.annotation.Message;
 import org.apache.activemq.artemis.logs.BundleFactory;
@@ -555,4 +556,7 @@ public interface ActiveMQMessageBundle {
    @Message(id = 229254, value = "Already replicating, started={}")
    ActiveMQIllegalStateException alreadyReplicating(boolean status);
 
+   @Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
+   ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String 
bridgeName, String failedOp, BridgeImpl.State currentState);
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index b9fba03ffe..a29ad5f8df 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -30,6 +30,7 @@ import io.netty.channel.Channel;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -152,13 +153,13 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 221027, value = "Bridge {} is connected", level = 
LogMessage.Level.INFO)
    void bridgeConnected(BridgeImpl name);
 
-   @LogMessage(id = 221028, value = "Bridge is stopping, will not retry", 
level = LogMessage.Level.INFO)
-   void bridgeStopping();
+   @LogMessage(id = 221028, value = "Bridge is {}, will not retry", level = 
LogMessage.Level.INFO)
+   void bridgeWillNotRetry(String operation);
 
-   @LogMessage(id = 221029, value = "stopped bridge {}", level = 
LogMessage.Level.INFO)
+   @LogMessage(id = 221029, value = "Stopped bridge {}", level = 
LogMessage.Level.INFO)
    void bridgeStopped(String name);
 
-   @LogMessage(id = 221030, value = "paused bridge {}", level = 
LogMessage.Level.INFO)
+   @LogMessage(id = 221030, value = "Paused bridge {}", level = 
LogMessage.Level.INFO)
    void bridgePaused(String name);
 
    @LogMessage(id = 221031, value = "backup announced", level = 
LogMessage.Level.INFO)
@@ -197,8 +198,8 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 221041, value = "Cannot find queue {} while reloading 
PAGE_CURSOR_COMPLETE, deleting record now", level = LogMessage.Level.INFO)
    void cantFindQueueOnPageComplete(long queueID);
 
-   @LogMessage(id = 221042, value = "Bridge {} timed out waiting for the 
completion of {} messages, we will just shutdown the bridge after 10 seconds 
wait", level = LogMessage.Level.INFO)
-   void timedOutWaitingCompletions(String bridgeName, long numberOfMessages);
+   @LogMessage(id = 221042, value = "{} bridge {} timed out waiting for the 
send acknowledgement of {} messages. Messages may be duplicated between the 
bridge's source and the target.", level = LogMessage.Level.INFO)
+   void timedOutWaitingForSendAcks(String operation, String bridgeName, long 
numberOfMessages);
 
    @LogMessage(id = 221043, value = "Protocol module found: [{}]. Adding 
protocol support for: {}", level = LogMessage.Level.INFO)
    void addingProtocolSupport(String moduleName, String protocolKey);
@@ -789,8 +790,8 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222159, value = "unable to send notification when 
broadcast group is stopped", level = LogMessage.Level.WARN)
    void broadcastBridgeStoppedError(Exception e);
 
-   @LogMessage(id = 222160, value = "unable to send notification when 
broadcast group is stopped", level = LogMessage.Level.WARN)
-   void notificationBridgeStoppedError(Exception e);
+   @LogMessage(id = 222160, value = "unable to send notification for bridge 
{}: {}", level = LogMessage.Level.WARN)
+   void notificationBridgeError(String bridge, CoreNotificationType type, 
Exception e);
 
    @LogMessage(id = 222161, value = "Group Handler timed-out waiting for 
sendCondition", level = LogMessage.Level.WARN)
    void groupHandlerSendTimeout();
@@ -1302,8 +1303,8 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 224030, value = "Could not cancel reference {}", level = 
LogMessage.Level.ERROR)
    void errorCancellingRefOnBridge(MessageReference ref2, Exception e);
 
-   @LogMessage(id = 224032, value = "Failed to pause bridge", level = 
LogMessage.Level.ERROR)
-   void errorPausingBridge(Exception e);
+   @LogMessage(id = 224032, value = "Failed to pause bridge: {}", level = 
LogMessage.Level.ERROR)
+   void errorPausingBridge(String bridgeName, Exception e);
 
    @LogMessage(id = 224033, value = "Failed to broadcast connector configs", 
level = LogMessage.Level.ERROR)
    void errorBroadcastingConnectorConfigs(Exception e);
@@ -1617,4 +1618,7 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on 
namespace {}", level = LogMessage.Level.WARN)
    void errorRegisteringDuplicateCacheSize(String address, Exception reason);
+
+   @LogMessage(id = 224139, value = "Failed to stop bridge: {}", level = 
LogMessage.Level.ERROR)
+   void errorStoppingBridge(String bridgeName, Exception e);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 68bf794c65..4cabb0abb4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -324,7 +324,7 @@ public class ClusterManager implements ActiveMQComponent {
 
          for (Bridge bridge : bridges.values()) {
             bridge.stop();
-            managementService.unregisterBridge(bridge.getName().toString());
+            
managementService.unregisterBridge(bridge.getConfiguration().getName());
          }
 
          bridges.clear();
@@ -532,17 +532,17 @@ public class ClusterManager implements ActiveMQComponent {
 
       synchronized (this) {
          for (Bridge bridge : bridges.values()) {
-            if (bridge.getName().toString().matches(name + "|" + name + 
"-\\d+")) {
-               bridge = bridges.get(bridge.getName().toString());
+            if (bridge.getConfiguration().getName().matches(name + "|" + name 
+ "-\\d+")) {
+               bridge = bridges.get(bridge.getConfiguration().getName());
                if (bridge != null) {
                   bridgesToRemove.add(bridge);
                }
             }
          }
          for (Bridge bridgeToRemove : bridgesToRemove) {
-            bridges.remove(bridgeToRemove.getName().toString());
+            bridges.remove(bridgeToRemove.getConfiguration().getName());
             bridgeToRemove.stop();
-            
managementService.unregisterBridge(bridgeToRemove.getName().toString());
+            
managementService.unregisterBridge(bridgeToRemove.getConfiguration().getName());
          }
       }
       for (Bridge bridge : bridgesToRemove) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index f63944a141..71b09e4310 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.cluster.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -28,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -47,13 +49,13 @@ import 
org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
@@ -74,15 +76,13 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
-
-/**
- * A Core BridgeImpl
- */
 
 public class BridgeImpl implements Bridge, SessionFailureListener, 
SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
 
+   public enum State {
+      STARTING, STARTED, PAUSING, PAUSED, STOPPING, STOPPED
+   }
+
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
    protected final ServerLocatorInternal serverLocator;
@@ -109,17 +109,13 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    private boolean blockedOnFlowControl;
 
-   /**
-    * Used when there's a scheduled reconnection
-    */
-   protected ScheduledFuture<?> futureScheduledReconnection;
+   protected ScheduledFuture<?> scheduledReconnection;
 
    protected volatile ClientSessionInternal session;
 
    // on cases where sub-classes need a consumer
    protected volatile ClientSessionInternal sessionConsumer;
 
-
    // this will happen if a disconnect happened
    // upon reconnection we need to send the nodeUP back into the topology
    protected volatile boolean disconnectedAndDown = false;
@@ -132,11 +128,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    private volatile ClientProducer producer;
 
-   private volatile boolean started;
-
-   private volatile boolean stopping = false;
-
-   private volatile boolean active;
+   private volatile State state = State.STOPPED;
 
    private boolean deliveringLargeMessage;
 
@@ -148,11 +140,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    private boolean keepConnecting = true;
 
-   private ActiveMQServer server;
+   private final ActiveMQServer server;
 
    private final BridgeMetrics metrics = new BridgeMetrics();
 
-   private BridgeConfiguration configuration;
+   private final BridgeConfiguration configuration;
 
    public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final BridgeConfiguration configuration,
@@ -185,11 +177,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       this.server = server;
    }
 
-   /** For tests mainly */
-   public boolean isBlockedOnFlowControl() {
-      return blockedOnFlowControl;
-   }
-
    public static final byte[] getDuplicateBytes(final UUID nodeUUID, final 
long messageID) {
       byte[] bytes = new byte[24];
 
@@ -202,6 +189,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       return bytes;
    }
 
+   // for tests
+   public boolean isBlockedOnFlowControl() {
+      return blockedOnFlowControl;
+   }
+
    // for tests
    public ClientSessionFactory getSessionFactory() {
       return csf;
@@ -230,7 +222,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    @Override
    public void onCreditsFlow(boolean blocked, ClientProducerCredits 
producerCredits) {
       if (logger.isTraceEnabled()) {
-         logger.trace("Bridge {} received credits, with blocked = {}", 
this.getName(), blocked);
+         logger.trace("Bridge {} received credits, with blocked = {}", 
configuration.getName(), blocked);
       }
       this.blockedOnFlowControl = blocked;
       if (!blocked) {
@@ -240,7 +232,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    @Override
    public void onCreditsFail(ClientProducerCredits producerCredits) {
-      
ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(producerCredits.getAddress()),
 String.valueOf(this.getName()));
+      
ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(producerCredits.getAddress()),
 configuration.getName());
       disconnect();
    }
 
@@ -251,22 +243,23 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    @Override
    public synchronized void start() throws Exception {
-      if (started) {
-         return;
+      State localState = this.state;
+      if (localState == State.STARTING || localState == State.STARTED || 
localState == State.STOPPING || localState == State.PAUSING) {
+         logger.debug("Bridge {} state is {}. Ignoring call to start.", 
configuration.getName(), localState);
+         if (localState == State.STOPPING || localState == State.PAUSING) {
+            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "started", localState);
+         } else {
+            return;
+         }
       }
 
-      started = true;
+      state = State.STARTING;
 
-      stopping = false;
+      logger.debug("Bridge {} is starting", configuration.getName());
 
-      activate();
+      executor.execute(new ConnectRunnable());
 
-      if (notificationService != null) {
-         TypedProperties props = new TypedProperties();
-         props.putSimpleStringProperty(SimpleString.of("name"), 
SimpleString.of(configuration.getName()));
-         Notification notification = new Notification(nodeUUID.toString(), 
CoreNotificationType.BRIDGE_STARTED, props);
-         notificationService.sendNotification(notification);
-      }
+      sendNotification(CoreNotificationType.BRIDGE_STARTED);
    }
 
    @Override
@@ -275,10 +268,10 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    }
 
    private void cancelRefs() {
-      LinkedList<MessageReference> list = new LinkedList<>();
+      LinkedList<MessageReference> list;
 
       synchronized (refs) {
-         list.addAll(refs.values());
+         list = new LinkedList<>(refs.values());
          refs.clear();
       }
 
@@ -363,49 +356,45 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    }
 
    @Override
-   public void stop() throws Exception {
-      if (stopping) {
-         return;
+   public synchronized void stop() throws Exception {
+      State localState = state;
+      if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING) {
+         logger.debug("Bridge {} state is {}. Ignoring call to stop.", 
configuration.getName(), localState);
+         if (localState == State.PAUSING) {
+            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "stopped", localState);
+         } else {
+            return;
+         }
       }
 
-      stopping = true;
+      state = State.STOPPING;
 
-      logger.debug("Bridge {} being stopped", configuration.getName());
+      logger.debug("Bridge {} is stopping", configuration.getName());
 
-      if (futureScheduledReconnection != null) {
-         futureScheduledReconnection.cancel(true);
+      if (scheduledReconnection != null) {
+         scheduledReconnection.cancel(true);
       }
 
       executor.execute(new StopRunnable());
+   }
 
-      if (notificationService != null) {
-         TypedProperties props = new TypedProperties();
-         props.putSimpleStringProperty(SimpleString.of("name"), 
SimpleString.of(configuration.getName()));
-         Notification notification = new Notification(nodeUUID.toString(), 
CoreNotificationType.BRIDGE_STOPPED, props);
-         try {
-            notificationService.sendNotification(notification);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.broadcastBridgeStoppedError(e);
+   @Override
+   public synchronized void pause() throws Exception {
+      State localState = state;
+      if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING || localState == State.PAUSED) {
+         logger.debug("Bridge {} state is {}. Ignoring call to pause.", 
configuration.getName(), localState);
+         if (localState == State.STOPPING || localState == State.STOPPED) {
+            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "paused", localState);
+         } else {
+            return;
          }
       }
-   }
 
-   @Override
-   public void pause() throws Exception {
-      logger.debug("Bridge {} being paused", configuration.getName());
+      state = State.PAUSING;
 
-      executor.execute(new PauseRunnable());
+      logger.info("Bridge {} is pausing", configuration.getName());
 
-      if (notificationService != null) {
-         TypedProperties props = new TypedProperties();
-         props.putSimpleStringProperty(SimpleString.of("name"), 
SimpleString.of(configuration.getName()));
-         Notification notification = new Notification(nodeUUID.toString(), 
CoreNotificationType.BRIDGE_STOPPED, props);
-         try {
-            notificationService.sendNotification(notification);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.notificationBridgeStoppedError(e);
-         }
-      }
+      executor.execute(new PauseRunnable());
    }
 
    @Override
@@ -416,11 +405,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    @Override
    public boolean isStarted() {
-      return started;
-   }
-
-   public synchronized void activate() {
-      executor.execute(new ConnectRunnable(this));
+      return state == State.STARTING || state == State.STARTED;
    }
 
    @Override
@@ -438,14 +423,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       return filter;
    }
 
-   // SendAcknowledgementHandler implementation ---------------------
-
    @Override
    public SimpleString getForwardingAddress() {
       return SimpleString.of(configuration.getForwardingAddress());
    }
 
-   // For testing only
    @Override
    public RemotingConnection getForwardingConnection() {
       if (session == null) {
@@ -465,9 +447,10 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    @Override
    public void sendAcknowledged(final Message message) {
-      logger.trace("BridgeImpl::sendAcknowledged received confirmation for 
message {}", message);
+      logger.debug("Bridge {} received confirmation for message {}", 
configuration.getName(), message);
 
-      if (active) {
+      State localState = state;
+      if (localState == State.STARTED || localState == State.STOPPING || 
localState == State.PAUSING) {
          try {
 
             final MessageReference ref;
@@ -493,6 +476,8 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e);
          }
+      } else {
+         logger.debug("Bridge {} state is {}. Ignoring call to 
sendAcknowledged.", configuration.getName(), localState);
       }
    }
 
@@ -562,7 +547,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       queue.deliverAsync();
    }
 
-
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
       if (RefCountMessage.isRefTraceEnabled() && ref.getMessage() instanceof 
RefCountMessage) {
@@ -574,9 +558,9 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       }
 
       synchronized (this) {
-         if (!active || !session.isWritable(this)) {
+         if (state != State.STARTED || !session.isWritable(this)) {
             if (logger.isDebugEnabled()) {
-               logger.debug("{}::Ignoring reference on bridge as it is set to 
inactive ref {}, active = {}", this, ref, active);
+               logger.debug("{}::Ignoring reference on bridge as it is set to 
inactive ref {}, active = false", this, ref);
             }
             return HandleStatus.BUSY;
          }
@@ -591,7 +575,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
             return HandleStatus.BUSY;
          }
 
-         logger.trace("Bridge {} is handling reference {} ", ref);
+         logger.trace("Bridge {} is handling reference {} ", 
configuration.getName(), ref);
 
          ref.handled();
 
@@ -620,7 +604,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
             final HandleStatus status;
             if (message.isLargeMessage()) {
                deliveringLargeMessage = true;
-               deliverLargeMessage(dest, ref, (LargeServerMessage) message, 
ref.getMessage());
+               deliverLargeMessage(dest, ref, (LargeServerMessage) message);
                status = HandleStatus.HANDLED;
             } else {
                status = deliverStandardMessage(dest, ref, message, 
ref.getMessage());
@@ -676,12 +660,12 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
          }
 
          cleanUpSessionFactory(csf);
-      } catch (Throwable dontCare) {
+      } catch (Throwable ignored) {
       }
 
       try {
          session.cleanUp(false);
-      } catch (Throwable dontCare) {
+      } catch (Throwable ignored) {
       }
 
       if (scaleDownTargetNodeID != null && 
!scaleDownTargetNodeID.equals(nodeUUID.toString())) {
@@ -691,8 +675,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
          logger.debug("Received scaleDownTargetNodeID: {}; cancelling 
reconnect.", scaleDownTargetNodeID);
          fail(true, true);
       } else {
-         logger.debug("Received invalid scaleDownTargetNodeID: {}", 
scaleDownTargetNodeID);
-
+         logger.debug("Received null scaleDownTargetNodeID");
          fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
       }
 
@@ -726,9 +709,10 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    private void deliverLargeMessage(final SimpleString dest,
                                     final MessageReference ref,
-                                    final LargeServerMessage message,
-                                    final Message originalMessage) {
+                                    final LargeServerMessage message) {
       executor.execute(() -> {
+         logger.trace("going to send large message: {} from {}", message, 
queue);
+
          try {
             producer.send(dest, message.toMessage());
 
@@ -749,11 +733,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       });
    }
 
-   /**
-    * @param ref
-    * @param message
-    * @return
-    */
    private HandleStatus deliverStandardMessage(SimpleString dest, final 
MessageReference ref, Message message, Message originalMessage) {
       // if we failover during send then there is a chance that the
       // that this will throw a disconnect, we need to remove the message
@@ -788,8 +767,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    /**
     * for use in tests mainly
-    *
-    * @return
     */
    public TopologyMember getTargetNodeFromTopology() {
       return this.targetNode;
@@ -822,10 +799,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
          queue.getName() + "/" + queue.getID() + "]";
    }
 
-   public ClientSessionFactoryImpl getCSF() {
-      return (ClientSessionFactoryImpl) csf;
-   }
-
    public Transformer getTransformer() {
       return transformer;
    }
@@ -835,6 +808,10 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       return configuration;
    }
 
+   public State getState() {
+      return state;
+   }
+
    protected void fail(final boolean permanently, boolean scaleDown) {
       logger.debug("{}\n\t::fail being called, permanently={}", this, 
permanently);
       //we need to make sure we remove the node from the topology so any 
incoming quorum requests are voted correctly
@@ -867,9 +844,9 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       }
       retryCount = 0;
       reconnectAttemptsInUse = configuration.getReconnectAttempts();
-      if (futureScheduledReconnection != null) {
-         futureScheduledReconnection.cancel(true);
-         futureScheduledReconnection = null;
+      if (scheduledReconnection != null) {
+         scheduledReconnection.cancel(true);
+         scheduledReconnection = null;
       }
    }
 
@@ -919,121 +896,14 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       csf = sfi;
    }
 
-   /* This is called only when the bridge is activated */
-   protected void connect() {
-      if (stopping)
-         return;
-
-      synchronized (connectionGuard) {
-         if (!keepConnecting) {
-            return;
-         }
-
-         if (logger.isDebugEnabled()) {
-            logger.debug("Connecting  {} to its destination [{}], csf={}", 
this, nodeUUID, csf);
-         }
-         retryCount++;
-
-         try {
-            if (csf == null || csf.isClosed()) {
-               if (stopping)
-                  return;
-               csf = createSessionFactory();
-               if (csf == null) {
-                  // Retrying. This probably means the node is not available 
(for the cluster connection case)
-                  scheduleRetryConnect();
-                  return;
-               }
-               // Session is pre-acknowledge
-               session = (ClientSessionInternal) 
csf.createSession(configuration.getUser(), configuration.getPassword(), false, 
true, true, true, 1);
-               session.getProducerCreditManager().setCallback(this);
-               sessionConsumer = (ClientSessionInternal) 
csf.createSession(configuration.getUser(), configuration.getPassword(), false, 
true, true, true, 1);
-            }
-
-            if (configuration.getForwardingAddress() != null) {
-               ClientSession.AddressQuery query = null;
-
-               try {
-                  query = 
session.addressQuery(SimpleString.of(configuration.getForwardingAddress()));
-               } catch (Throwable e) {
-                  
ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getName(), e);
-                  // This was an issue during startup, we will not count this 
retry
-                  retryCount--;
-
-                  scheduleRetryConnectFixedTimeout(100);
-                  return;
-               }
-
-               if (!query.isExists()) {
-                  
ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getForwardingAddress(),
 retryCount);
-                  scheduleRetryConnect();
-                  return;
-               }
-            }
-
-            // need to reset blockedOnFlowControl after creating a new producer
-            // otherwise in case the bridge was blocked before a previous 
failure
-            // this would never resume
-            blockedOnFlowControl = false;
-            producer = session.createProducer();
-            session.addFailureListener(BridgeImpl.this);
-
-            session.setSendAcknowledgementHandler(BridgeImpl.this);
-
-            afterConnect();
-
-            active = true;
-
-            queue.addConsumer(BridgeImpl.this);
-            queue.deliverAsync();
-
-            ActiveMQServerLogger.LOGGER.bridgeConnected(this);
-
-            serverLocator.addClusterTopologyListener(new TopologyListener());
-
-            keepConnecting = false;
-            return;
-         } catch (ActiveMQException e) {
-            // the session was created while its server was starting, retry it:
-            if (e.getType() == 
ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
-               
ActiveMQServerLogger.LOGGER.errorStartingBridge(configuration.getName());
-
-               // We are not going to count this one as a retry
-               retryCount--;
-
-               
scheduleRetryConnectFixedTimeout(this.configuration.getRetryInterval());
-               return;
-            } else {
-               ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(this);
-               logger.debug("Underlying bridge connection failure", e);
-
-               scheduleRetryConnect();
-            }
-         } catch (ActiveMQInterruptedException | InterruptedException e) {
-            ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e);
-            if (csf != null) {
-               try {
-                  csf.close();
-                  csf = null;
-               } catch (Throwable ignored) {
-               }
-            }
-            fail(false, false);
-            scheduleRetryConnect();
-         }
-      }
-   }
-
    protected void scheduleRetryConnect() {
       if (serverLocator.isClosed()) {
          ActiveMQServerLogger.LOGGER.bridgeLocatorShutdown();
          return;
       }
 
-      if (stopping) {
-         ActiveMQServerLogger.LOGGER.bridgeStopping();
+      if (state == State.STOPPING || state == State.PAUSING) {
+         ActiveMQServerLogger.LOGGER.bridgeWillNotRetry(state == 
State.STOPPING ? "stopping" : "pausing");
          return;
       }
 
@@ -1076,25 +946,23 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
             }
          }
       }
-
    }
 
-
-
    protected void scheduleRetryConnectFixedTimeout(final long milliseconds) {
       try {
          cleanUpSessionFactory(csf);
       } catch (Throwable ignored) {
       }
 
-      if (stopping)
+      if (state == State.STOPPING || state == State.STOPPED || state == 
State.PAUSING || state == State.PAUSED) {
          return;
+      }
 
       if (logger.isDebugEnabled()) {
          logger.debug("Scheduling retry for bridge {} in {} milliseconds", 
configuration.getName(), milliseconds);
       }
 
-      futureScheduledReconnection = scheduledExecutor.schedule(new 
FutureConnectRunnable(executor, this), milliseconds, TimeUnit.MILLISECONDS);
+      scheduledReconnection = scheduledExecutor.schedule(new 
ScheduledConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
    }
 
    private void internalCancelReferences() {
@@ -1105,43 +973,137 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       }
    }
 
-   /**
-    * just set deliveringLargeMessage to false
-    */
    private synchronized void unsetLargeMessageDelivery() {
       deliveringLargeMessage = false;
    }
 
-   // The scheduling will still use the main executor here
-   private static class FutureConnectRunnable implements Runnable {
-
-      private final BridgeImpl bridge;
-
-      private final Executor executor;
-
-      private FutureConnectRunnable(Executor exe, BridgeImpl bridge) {
-         executor = exe;
-         this.bridge = bridge;
-      }
+   private class ScheduledConnectRunnable implements Runnable {
 
       @Override
       public void run() {
-         if (bridge.isStarted())
-            executor.execute(new ConnectRunnable(bridge));
+         if (isStarted()) {
+            // The scheduling will still use the main executor here
+            executor.execute(new ConnectRunnable());
+         }
       }
    }
 
-   private static final class ConnectRunnable implements Runnable {
-
-      private final BridgeImpl bridge;
-
-      private ConnectRunnable(BridgeImpl bridge2) {
-         bridge = bridge2;
-      }
+   private class ConnectRunnable implements Runnable {
 
       @Override
       public void run() {
-         bridge.connect();
+         if (state == State.STOPPING || state == State.PAUSING) {
+            logger.debug("Bridge {} state is {}. Ignoring call to connect.", 
configuration.getName(), state);
+            return;
+         }
+
+         synchronized (connectionGuard) {
+            if (!keepConnecting) {
+               return;
+            }
+
+            if (logger.isDebugEnabled()) {
+               logger.debug("Connecting  {} to its destination [{}], csf={}", 
this, nodeUUID, csf);
+            }
+            retryCount++;
+
+            try {
+               if (csf == null || csf.isClosed()) {
+                  if (state == State.STOPPING || state == State.PAUSING)
+                     return;
+                  csf = createSessionFactory();
+                  if (csf == null) {
+                     // Retrying. This probably means the node is not 
available (for the cluster connection case)
+                     scheduleRetryConnect();
+                     return;
+                  }
+                  // Session is pre-acknowledge
+                  session = (ClientSessionInternal) 
csf.createSession(configuration.getUser(), configuration.getPassword(), false, 
true, true, true, 1);
+                  
session.getProducerCreditManager().setCallback(BridgeImpl.this);
+                  sessionConsumer = (ClientSessionInternal) 
csf.createSession(configuration.getUser(), configuration.getPassword(), false, 
true, true, true, 1);
+               }
+
+               if (configuration.getForwardingAddress() != null) {
+                  ClientSession.AddressQuery query = null;
+
+                  try {
+                     query = 
session.addressQuery(SimpleString.of(configuration.getForwardingAddress()));
+                  } catch (Throwable e) {
+                     
ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getName(), e);
+                     // This was an issue during startup, we will not count 
this retry
+                     retryCount--;
+
+                     scheduleRetryConnectFixedTimeout(100);
+                     return;
+                  }
+
+                  if (!query.isExists()) {
+                     
ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getForwardingAddress(),
 retryCount);
+                     scheduleRetryConnect();
+                     return;
+                  }
+               }
+
+               // need to reset blockedOnFlowControl after creating a new 
producer
+               // otherwise in case the bridge was blocked before a previous 
failure
+               // this would never resume
+               blockedOnFlowControl = false;
+               producer = session.createProducer();
+               session.addFailureListener(BridgeImpl.this);
+
+               session.setSendAcknowledgementHandler(BridgeImpl.this);
+
+               afterConnect();
+
+               state = State.STARTED;
+
+               queue.addConsumer(BridgeImpl.this);
+               queue.deliverAsync();
+
+               ActiveMQServerLogger.LOGGER.bridgeConnected(BridgeImpl.this);
+
+               serverLocator.addClusterTopologyListener(new 
ClusterTopologyListener() {
+                  @Override
+                  public void nodeUP(TopologyMember member, boolean last) {
+                     BridgeImpl.this.nodeUP(member, last);
+                  }
+
+                  @Override
+                  public void nodeDown(long eventUID, String nodeID) {
+                  }
+               });
+
+               keepConnecting = false;
+            } catch (ActiveMQException e) {
+               // the session was created while its server was starting, retry 
it:
+               if (e.getType() == 
ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
+                  
ActiveMQServerLogger.LOGGER.errorStartingBridge(configuration.getName());
+
+                  // We are not going to count this one as a retry
+                  retryCount--;
+
+                  
scheduleRetryConnectFixedTimeout(configuration.getRetryInterval());
+               } else {
+                  
ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(BridgeImpl.this);
+                  logger.debug("Underlying bridge connection failure", e);
+
+                  scheduleRetryConnect();
+               }
+            } catch (ActiveMQInterruptedException | InterruptedException e) {
+               
ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e);
+            } catch (Exception e) {
+               
ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e);
+               if (csf != null) {
+                  try {
+                     csf.close();
+                     csf = null;
+                  } catch (Throwable ignored) {
+                  }
+               }
+               fail(false, false);
+               scheduleRetryConnect();
+            }
+         }
       }
    }
 
@@ -1149,47 +1111,54 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
       @Override
       public void run() {
-         logger.debug("stopping bridge {}", BridgeImpl.this);
-         queue.removeConsumer(BridgeImpl.this);
+         try {
+            logger.debug("stopping bridge {}", BridgeImpl.this);
+            logger.trace("Removing consumer on stopRunnable {} from queue {}", 
this, queue);
+            queue.removeConsumer(BridgeImpl.this);
 
-         synchronized (BridgeImpl.this) {
-            logger.debug("Closing Session for bridge {}", 
configuration.getName());
-            started = false;
-            active = false;
-         }
+            if (!pendingAcks.await(configuration.getPendingAckTimeout(), 
TimeUnit.MILLISECONDS)) {
+               
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Stopping", 
configuration.getName(), pendingAcks.getCount());
+            }
 
-         if (session != null) {
-            logger.debug("Cleaning up session {}", session);
-            session.removeFailureListener(BridgeImpl.this);
-            try {
-               session.close();
-               session = null;
-            } catch (ActiveMQException dontcare) {
+            synchronized (BridgeImpl.this) {
+               state = State.STOPPED;
             }
-         }
 
-         if (sessionConsumer != null) {
-            logger.debug("Cleaning up session {}", session);
-            try {
-               sessionConsumer.close();
-               sessionConsumer = null;
-            } catch (ActiveMQException dontcare) {
+            if (session != null) {
+               logger.debug("Cleaning up session {} for bridge {}", session, 
configuration.getName());
+               session.removeFailureListener(BridgeImpl.this);
+               try {
+                  session.close();
+                  session = null;
+               } catch (ActiveMQException ignored) {
+               }
+            }
+
+            if (sessionConsumer != null) {
+               logger.debug("Cleaning up session {}", session);
+               try {
+                  sessionConsumer.close();
+                  sessionConsumer = null;
+               } catch (ActiveMQException ignored) {
+               }
             }
-         }
 
-         internalCancelReferences();
+            internalCancelReferences();
 
-         if (csf != null) {
-            csf.cleanup();
-         }
+            if (csf != null) {
+               csf.cleanup();
+            }
 
-         synchronized (connectionGuard) {
-            keepConnecting = true;
-         }
+            synchronized (connectionGuard) {
+               keepConnecting = true;
+            }
 
-         logger.trace("Removing consumer on stopRunnable {} from queue {}", 
this, queue);
+            sendNotification(CoreNotificationType.BRIDGE_STOPPED);
 
-         ActiveMQServerLogger.LOGGER.bridgeStopped(configuration.getName());
+            ActiveMQServerLogger.LOGGER.bridgeStopped(configuration.getName());
+         } catch (Exception e) {
+            
ActiveMQServerLogger.LOGGER.errorStoppingBridge(configuration.getName(), e);
+         }
       }
    }
 
@@ -1198,38 +1167,39 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       @Override
       public void run() {
          try {
+            logger.debug("pausing bridge {}", BridgeImpl.this);
+            logger.trace("Removing consumer on pauseRunnable {} from queue 
{}", this, queue);
             queue.removeConsumer(BridgeImpl.this);
 
-            if (!pendingAcks.await(60, TimeUnit.SECONDS)) {
-               
ActiveMQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(),
 pendingAcks.getCount());
+            if (!pendingAcks.await(configuration.getPendingAckTimeout(), 
TimeUnit.MILLISECONDS)) {
+               
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Pausing", 
configuration.getName(), pendingAcks.getCount());
             }
 
             synchronized (BridgeImpl.this) {
-               started = false;
-               active = false;
+               state = State.PAUSED;
             }
 
             internalCancelReferences();
 
+            sendNotification(CoreNotificationType.BRIDGE_STOPPED);
+
             ActiveMQServerLogger.LOGGER.bridgePaused(configuration.getName());
          } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorPausingBridge(e);
+            
ActiveMQServerLogger.LOGGER.errorPausingBridge(configuration.getName(), e);
          }
       }
-
    }
 
-   private class TopologyListener implements ClusterTopologyListener {
-
-      // ClusterListener
-      @Override
-      public void nodeUP(TopologyMember member, boolean last) {
-         BridgeImpl.this.nodeUP(member, last);
-      }
-
-      @Override
-      public void nodeDown(long eventUID, String nodeID) {
-
+   private void sendNotification(CoreNotificationType type) {
+      if (notificationService != null) {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(SimpleString.of("name"), getName());
+         Notification notification = new Notification(nodeUUID.toString(), 
type, props);
+         try {
+            notificationService.sendNotification(notification);
+         } catch (Exception e) {
+            
ActiveMQServerLogger.LOGGER.notificationBridgeError(configuration.getName(), 
type, e);
+         }
       }
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index d5529a6baa..ff5fceb39f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -460,7 +460,7 @@ public class ManagementServiceImpl implements 
ManagementService {
    @Override
    public synchronized void registerBridge(final Bridge bridge) throws 
Exception {
       bridge.setNotificationService(this);
-      ObjectName objectName = 
objectNameBuilder.getBridgeObjectName(bridge.getName().toString());
+      ObjectName objectName = 
objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName());
       BridgeControl control = new BridgeControlImpl(bridge, storageManager);
       registerInJMX(objectName, control);
       registerInRegistry(ResourceNames.BRIDGE + bridge.getName(), control);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 88ecad9be9..01eee16306 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1562,6 +1562,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="pending-ack-timeout" type="xsd:long" 
default="60000" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  how long to wait for acknowledgements to arrive from the 
bridge's target while stopping or pausing the bridge
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element ref="discovery-type" maxOccurs="1" minOccurs="1"/>
 
       </xsd:all>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
index ce8351d0ed..6bde568d5b 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
@@ -73,6 +73,7 @@ public class BridgeConfigurationTest {
       assertEquals(12, bridgeConfiguration.getCallTimeout());
       assertEquals(ComponentConfigurationRoutingType.MULTICAST, 
bridgeConfiguration.getRoutingType());
       assertEquals(1, bridgeConfiguration.getConcurrency());
+      assertEquals(321, bridgeConfiguration.getPendingAckTimeout());
    }
 
    @Test
@@ -112,6 +113,7 @@ public class BridgeConfigurationTest {
       assertEquals("102400", 
jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString());
       assertEquals("30000", 
jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString());
       assertEquals("1", 
jsonObject.get(BridgeConfiguration.CONCURRENCY).toString());
+      assertEquals("60000", 
jsonObject.get(BridgeConfiguration.PENDING_ACK_TIMEOUT).toString());
 
       // also should contain default non-null values of string fields
       assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", 
jsonObject.get(BridgeConfiguration.USER).toString());
@@ -199,6 +201,7 @@ public class BridgeConfigurationTest {
       objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
       objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
       objectBuilder.add(BridgeConfiguration.CONFIGURATION_MANAGED, true);
+      objectBuilder.add(BridgeConfiguration.PENDING_ACK_TIMEOUT, 321);
 
       return objectBuilder.build();
    }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 31060d31ac..bc8fad3ce4 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -428,7 +428,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
             assertEquals("org.foo.BridgeTransformer3", 
bc.getTransformerConfiguration().getClassName());
             assertEquals("bridgeTransformerValue1", 
bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey1"));
             assertEquals("bridgeTransformerValue2", 
bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey2"));
-
+            assertEquals(123456, bc.getPendingAckTimeout());
          }
       }
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 456a7bf272..c57fcf9af8 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -255,6 +255,7 @@
                <property key="bridgeTransformerKey2" 
value="bridgeTransformerValue2"/>
             </transformer>
             <producer-window-size>555k</producer-window-size>
+            <pending-ack-timeout>123456</pending-ack-timeout>
             <discovery-group-ref discovery-group-name="dg1"/>
             <forwarding-address>bridge-forwarding-address2</forwarding-address>
          </bridge>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 436d2fb819..9022a6af9f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -250,6 +250,7 @@
                <property key="bridgeTransformerKey2" 
value="bridgeTransformerValue2"/>
             </transformer>
             <producer-window-size>555k</producer-window-size>
+            <pending-ack-timeout>123456</pending-ack-timeout>
             <discovery-group-ref discovery-group-name="dg1"/>
          </bridge>
          <bridge name="bridge4">
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml
index 289716712a..c4138a868c 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml
@@ -51,6 +51,7 @@
          <property key="bridgeTransformerKey2" 
value="bridgeTransformerValue2"/>
       </transformer>
       <producer-window-size>555k</producer-window-size>
+      <pending-ack-timeout>123456</pending-ack-timeout>
       <discovery-group-ref discovery-group-name="dg1"/>
    </bridge>
    <bridge name="bridge4">
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index f73047d732..23dd1c63a8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,6 +71,7 @@ import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -2084,6 +2086,217 @@ public class BridgeTest extends ActiveMQTestBase {
       assertEquals(0, 
server.getManagementService().getResources(BridgeControl.class).length);
    }
 
+   @TestTemplate
+   public void testPendingAcksNeverArriveOnStop() throws Exception {
+      testPendingAcksNeverArrive(true, false);
+   }
+
+   @TestTemplate
+   public void testPendingAcksNeverArriveOnPause() throws Exception {
+      testPendingAcksNeverArrive(false, false);
+   }
+
+   @TestTemplate
+   public void testPendingAcksNeverArriveOnStopWithLargeMessages() throws 
Exception {
+      testPendingAcksNeverArrive(true, true);
+   }
+
+   @TestTemplate
+   public void testPendingAcksNeverArriveOnPauseWithLargeMessages() throws 
Exception {
+      testPendingAcksNeverArrive(false, true);
+   }
+
+   private void testPendingAcksNeverArrive(boolean stop, boolean large) throws 
Exception {
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, null);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, 
server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+      final long pendingAckTimeout = 2000;
+      final int messageSize = 1024;
+      final int numMessages = 10;
+
+      TransportConfiguration server0tc = new 
TransportConfiguration(getConnector(), null);
+      TransportConfiguration server1tc = new 
TransportConfiguration(getConnector(), server1Params);
+
+      server0.getConfiguration()
+             .setConnectorConfigurations(Map.of(server1tc.getName(), 
server1tc))
+             .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration()
+                                                       .setName("bridge1")
+                                                       
.setQueueName(queueName0)
+                                                       
.setForwardingAddress(forwardAddress)
+                                                       .setRetryInterval(1000)
+                                                       
.setReconnectAttemptsOnSameNode(-1)
+                                                       
.setUseDuplicateDetection(false)
+                                                       
.setConfirmationWindowSize(numMessages * messageSize / 2)
+                                                       
.setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2))
+                                                       
.setPendingAckTimeout(pendingAckTimeout)
+                                                       
.setStaticConnectors(Arrays.asList(server1tc.getName()))));
+      
server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress)));
+      server0.start();
+
+      // this interceptor will prevent the target from returning any send 
acknowledgements
+      Interceptor sendBlockingInterceptor = (packet, connection) -> {
+         if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == 
PacketImpl.SESS_SEND_LARGE) {
+            return false;
+         }
+         return true;
+      };
+
+      
server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
+      server1.start();
+      
server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor);
+      Bridge bridge = server0.getClusterManager().getBridges().get("bridge1");
+      Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100);
+
+      locator = 
addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, 
server1tc));
+      ClientSessionFactory sf0 = 
addSessionFactory(locator.createSessionFactory(server0tc));
+      ClientSession session0 = sf0.createSession(false, true, true);
+      ClientProducer producer0 = 
session0.createProducer(SimpleString.of(testAddress));
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = SimpleString.of("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+         message.putIntProperty(propKey, i);
+         message.getBodyBuffer().writeBytes(bytes);
+         producer0.send(message);
+      }
+
+      session0.close();
+      sf0.close();
+
+      Wait.assertEquals((long) numMessages, () -> 
bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100);
+      long start = System.currentTimeMillis();
+      BridgeImpl.State desiredState;
+      if (stop) {
+         bridge.stop();
+         desiredState = BridgeImpl.State.STOPPED;
+      } else {
+         bridge.pause();
+         desiredState = BridgeImpl.State.PAUSED;
+      }
+      Wait.assertEquals(desiredState, () -> ((BridgeImpl)bridge).getState(), 
pendingAckTimeout, 25);
+      assertTrue(System.currentTimeMillis() - start >= pendingAckTimeout);
+      Wait.assertEquals((long) numMessages, () -> 
server0.locateQueue(queueName0).getMessageCount(), 2000, 100);
+      Wait.assertEquals(0L, () -> 
server0.locateQueue(queueName0).getDeliveringCount(), 2000, 100);
+   }
+
+   @TestTemplate
+   public void testPendingAcksEventuallyArriveOnStop() throws Exception {
+      testPendingAcksEventuallyArrive(true, false);
+   }
+
+   @TestTemplate
+   public void testPendingAcksEventuallyArriveOnPause() throws Exception {
+      testPendingAcksEventuallyArrive(false, false);
+   }
+
+   @TestTemplate
+   public void testPendingAcksEventuallyArriveOnStopWithLargeMessages() throws 
Exception {
+      testPendingAcksEventuallyArrive(true, true);
+   }
+
+   @TestTemplate
+   public void testPendingAcksEventuallyArriveOnPauseWithLargeMessages() 
throws Exception {
+      testPendingAcksEventuallyArrive(false, true);
+   }
+
+   private void testPendingAcksEventuallyArrive(boolean stop, boolean large) 
throws Exception {
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, null);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, 
server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+      final long pendingAckTimeout = 2000;
+      final int messageSize = 1024;
+      final int numMessages = 10;
+
+      TransportConfiguration server0tc = new 
TransportConfiguration(getConnector(), null);
+      TransportConfiguration server1tc = new 
TransportConfiguration(getConnector(), server1Params);
+
+      server0.getConfiguration()
+             .setConnectorConfigurations(Map.of(server1tc.getName(), 
server1tc))
+             .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration()
+                                                       .setName("bridge1")
+                                                       
.setQueueName(queueName0)
+                                                       
.setForwardingAddress(forwardAddress)
+                                                       .setRetryInterval(1000)
+                                                       
.setReconnectAttemptsOnSameNode(-1)
+                                                       
.setUseDuplicateDetection(false)
+                                                       
.setConfirmationWindowSize(numMessages * messageSize / 2)
+                                                       
.setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2))
+                                                       
.setPendingAckTimeout(pendingAckTimeout)
+                                                       
.setStaticConnectors(Arrays.asList(server1tc.getName()))));
+      
server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress)));
+      server0.start();
+
+      // this interceptor will prevent the target from returning any send acks 
until a certain amount of time has elapsed
+      final CountDownLatch opLatch = new CountDownLatch(1);
+      Interceptor sendBlockingInterceptor = (packet, connection) -> {
+         if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == 
PacketImpl.SESS_SEND_LARGE) {
+            try {
+               opLatch.await(pendingAckTimeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+               throw new RuntimeException(e);
+            }
+         }
+         return true;
+      };
+
+      
server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
+      server1.start();
+      
server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor);
+      Bridge bridge = server0.getClusterManager().getBridges().get("bridge1");
+      Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100);
+
+      locator = 
addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, 
server1tc));
+      ClientSessionFactory sf0 = 
addSessionFactory(locator.createSessionFactory(server0tc));
+      ClientSession session0 = sf0.createSession(false, true, true);
+      ClientProducer producer0 = 
session0.createProducer(SimpleString.of(testAddress));
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = SimpleString.of("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+         message.putIntProperty(propKey, i);
+         message.getBodyBuffer().writeBytes(bytes);
+         producer0.send(message);
+      }
+
+      session0.close();
+      sf0.close();
+
+      Wait.assertEquals((long) numMessages, () -> 
bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100);
+      assertEquals((long) numMessages, 
server0.locateQueue(queueName0).getDeliveringCount());
+      BridgeImpl.State desiredState;
+      if (stop) {
+         bridge.stop();
+         desiredState = BridgeImpl.State.STOPPED;
+      } else {
+         bridge.pause();
+         desiredState = BridgeImpl.State.PAUSED;
+      }
+      Thread.sleep(pendingAckTimeout / 2);
+      opLatch.countDown();
+      Wait.assertEquals(desiredState, () -> ((BridgeImpl)bridge).getState(), 
pendingAckTimeout, 25);
+      Wait.assertEquals(0L, () -> 
server0.locateQueue(queueName0).getMessageCount(), 2000, 100);
+      Wait.assertEquals(0L, () -> 
server0.locateQueue(queueName0).getDeliveringCount(), 2000, 100);
+      Wait.assertEquals((long) numMessages, () -> 
server1.locateQueue(queueName1).getMessageCount(), 2000, 100);
+   }
+
    /**
     * It will inspect the journal directly and determine if there are queues 
on this journal,
     *
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
index c4de23d8d0..48d5844bfd 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
@@ -56,6 +56,7 @@ public class BridgeConfigurationStorageTest extends 
StorageManagerTestBase {
       configuration.setParentName("name");
       configuration.setQueueName("QueueName");
       configuration.setConcurrency(2);
+      configuration.setPendingAckTimeout(9876);
       configuration.setForwardingAddress("forward");
       configuration.setProducerWindowSize(123123);
       configuration.setConfirmationWindowSize(123123);
@@ -79,6 +80,7 @@ public class BridgeConfigurationStorageTest extends 
StorageManagerTestBase {
       assertEquals(configuration.getName(), 
persistedBridgeConfiguration.getBridgeConfiguration().getName());
       assertEquals(configuration.getQueueName(), 
persistedBridgeConfiguration.getBridgeConfiguration().getQueueName());
       assertEquals(configuration.getConcurrency(), 
persistedBridgeConfiguration.getBridgeConfiguration().getConcurrency());
+      assertEquals(configuration.getPendingAckTimeout(), 
persistedBridgeConfiguration.getBridgeConfiguration().getPendingAckTimeout());
       assertEquals(configuration.getForwardingAddress(), 
persistedBridgeConfiguration.getBridgeConfiguration().getForwardingAddress());
       assertEquals(configuration.getStaticConnectors(), 
persistedBridgeConfiguration.getBridgeConfiguration().getStaticConnectors());
       
assertNotNull(persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to