This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4cfaadc113 ARTEMIS-5852 Lock coordination for acceptors
4cfaadc113 is described below

commit 4cfaadc11375cfc02eccd0e8f5b975ac00c81a5c
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jan 29 12:15:21 2026 -0500

    ARTEMIS-5852 Lock coordination for acceptors
    
    I'm adding a LockCoordinator to the broker, that will use DistributedLock
    to help start and stop acceptors.
    
    You can associate the LockCoordinator with acceptors and an acceptor
    serving only clients would then be activated in only one of the brokers.
---
 .../artemis/api/core/TransportConfiguration.java   |  11 +
 .../lockmanager/file/FileBasedLockManager.java     |  11 +-
 .../zookeeper/CuratorDistributedLockManager.java   |  14 +
 .../artemis/core/config/Configuration.java         |   5 +
 .../core/config/LockCoordinatorConfiguration.java  |  92 ++++++
 .../core/config/impl/ConfigurationImpl.java        |  13 +
 .../deployers/impl/FileConfigurationParser.java    |  55 +++-
 .../core/remoting/impl/netty/NettyAcceptor.java    |  31 ++
 .../remoting/server/impl/RemotingServiceImpl.java  |   9 +
 .../artemis/core/server/ActiveMQServer.java        |   3 +
 .../artemis/core/server/ActiveMQServerLogger.java  |  12 +
 .../core/server/impl/ActiveMQServerImpl.java       |  42 +++
 .../artemis/core/server/lock/LockCoordinator.java  | 295 +++++++++++++++++++
 .../artemis/spi/core/remoting/Acceptor.java        |   9 +
 .../resources/schema/artemis-configuration.xsd     |  76 +++++
 .../core/config/impl/ConfigurationImplTest.java    |  76 +++++
 .../config/impl/FileConfigurationParserTest.java   |  44 +++
 docs/user-manual/_book.adoc                        |   1 +
 .../_diagrams/lock-coordination-example.odg        | Bin 0 -> 25555 bytes
 .../images/lock-coordination-example.png           | Bin 0 -> 28759 bytes
 docs/user-manual/lock-coordination.adoc            | 164 +++++++++++
 docs/user-manual/restart-sequence.adoc             |  12 +-
 .../dualMirrorSingleAcceptor/ZK/A/broker.xml       | 205 +++++++++++++
 .../dualMirrorSingleAcceptor/ZK/B/broker.xml       | 205 +++++++++++++
 .../dualMirrorSingleAcceptor/file/A/broker.xml     | 186 ++++++++++++
 .../dualMirrorSingleAcceptor/file/B/broker.xml     | 187 ++++++++++++
 .../DualMirrorSingleAcceptorRunningTest.java       | 275 +++++++++++++++++
 .../smoke/lockmanager/LockCoordinatorTest.java     | 325 +++++++++++++++++++++
 .../tests/smoke/lockmanager/ZookeeperCluster.java  |  77 +++++
 .../ZookeeperLockManagerSinglePairTest.java        |  41 +--
 30 files changed, 2437 insertions(+), 39 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index eb6f25025c..4a85234417 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable {
 
    private String name;
 
+   private String lockCoordinator;
+
    private String factoryClassName = "null";
 
    private Map<String, Object> params;
@@ -413,6 +415,15 @@ public class TransportConfiguration implements 
Serializable {
       }
    }
 
+   public String getLockCoordinator() {
+      return lockCoordinator;
+   }
+
+   public TransportConfiguration setLockCoordinator(String lockCoordinator) {
+      this.lockCoordinator = lockCoordinator;
+      return this;
+   }
+
    private static String replaceWildcardChars(final String str) {
       return str.replace('.', '-');
    }
diff --git 
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
 
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
index dcd5b19d88..3329b88bf7 100644
--- 
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
+++ 
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
@@ -32,8 +32,15 @@ import org.apache.activemq.artemis.lockmanager.MutableLong;
 import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
 
 /**
- * This is an implementation suitable to be used just on unit tests and it 
won't attempt to manage nor purge existing
- * stale locks files. It's part of the tests life-cycle to properly set-up and 
tear-down the environment.
+ * file-based distributed lock manager.
+ * <p>
+ * This implementation uses the file system to manage distributed locks
+ * <p>
+ * Valid configuration parameters:
+ * <ul>
+ *   <li><b>locks-folder</b> (required): Path to the directory where lock 
files will be created and managed.
+ *   The directory must be created in advance before using this lock 
manager.</li>
+ * </ul>
  */
 public class FileBasedLockManager implements DistributedLockManager {
 
diff --git 
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
 
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
index 6b5fd5f210..e6aa6689ca 100644
--- 
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
+++ 
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
@@ -44,6 +44,20 @@ import org.apache.curator.utils.DebugUtils;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
 
+/**
+ * ZooKeeper-based distributed lock manager using Apache Curator.
+ * <p>
+ * Valid configuration parameters:
+ * <ul>
+ *   <li><b>connect-string</b> (required): ZooKeeper connection string (e.g., 
"localhost:2181" or "host1:2181,host2:2181,host3:2181")</li>
+ *   <li><b>namespace</b> (required): Namespace prefix for all ZooKeeper paths 
to isolate lock manager data</li>
+ *   <li><b>session-ms</b> (optional, default: 18000): Session timeout in 
milliseconds</li>
+ *   <li><b>session-percent</b> (optional, default: 33): Percentage of session 
timeout to use for lock operations</li>
+ *   <li><b>connection-ms</b> (optional, default: 8000): Connection timeout in 
milliseconds</li>
+ *   <li><b>retries</b> (optional, default: 1): Number of retry attempts for 
failed operations</li>
+ *   <li><b>retries-ms</b> (optional, default: 1000): Delay in milliseconds 
between retry attempts</li>
+ * </ul>
+ */
 public class CuratorDistributedLockManager implements DistributedLockManager, 
ConnectionStateListener {
 
    enum PrimitiveType {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index dfbc499c08..440abab376 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config;
 
 import java.io.File;
 import java.net.URL;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -1368,6 +1369,10 @@ public interface Configuration {
 
    void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin);
 
+   Collection<LockCoordinatorConfiguration> getLockCoordinatorConfigurations();
+
+   void addLockCoordinatorConfiguration(LockCoordinatorConfiguration 
configuration);
+
    List<ActiveMQServerBasePlugin> getBrokerPlugins();
 
    List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
new file mode 100644
index 0000000000..1c742e2c6e
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LockCoordinatorConfiguration {
+
+   String name;
+   String lockId;
+   String className;
+   int checkPeriod;
+   Map<String, String> properties;
+
+   public LockCoordinatorConfiguration() {
+      properties = new HashMap<>();
+   }
+
+   public LockCoordinatorConfiguration(Map<String, String> properties) {
+      this.properties = properties;
+   }
+
+   public String getName() {
+      return name;
+   }
+
+   public LockCoordinatorConfiguration setName(String name) {
+      if (name == null || name.trim().isEmpty()) {
+         throw new IllegalArgumentException("LockCoordinator name cannot be 
null or empty");
+      }
+      this.name = name;
+      return this;
+   }
+
+   public String getLockId() {
+      return lockId;
+   }
+
+   public LockCoordinatorConfiguration setLockId(String lockId) {
+      if (lockId == null || lockId.trim().isEmpty()) {
+         throw new IllegalArgumentException("LockCoordinator lockId cannot be 
null or empty");
+      }
+      this.lockId = lockId;
+      return this;
+   }
+
+   public String getClassName() {
+      return className;
+   }
+
+   public LockCoordinatorConfiguration setClassName(String className) {
+      this.className = className;
+      return this;
+   }
+
+   public int getCheckPeriod() {
+      return checkPeriod;
+   }
+
+   public LockCoordinatorConfiguration setCheckPeriod(int checkPeriod) {
+      if (checkPeriod <= 0) {
+         throw new IllegalArgumentException("LockCoordinator checkPeriod must 
be positive, got: " + checkPeriod);
+      }
+      this.checkPeriod = checkPeriod;
+      return this;
+   }
+
+   public Map<String, String> getProperties() {
+      return properties;
+   }
+
+   @Override
+   public String toString() {
+      return "LockCoordinatorConfiguration{" + "name='" + name + '\'' + ", 
lockId='" + lockId + '\'' + ", className='" + className + '\'' + ", 
checkPeriod=" + checkPeriod + ", properties=" + properties + '}';
+   }
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index b74f6596df..4bb27eeef8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -88,6 +88,7 @@ import 
org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.JaasAppConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
 import org.apache.activemq.artemis.core.config.MetricsConfiguration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@@ -227,6 +228,8 @@ public class ConfigurationImpl extends 
javax.security.auth.login.Configuration i
 
    private boolean persistIDCache = 
ActiveMQDefaultConfiguration.isDefaultPersistIdCache();
 
+   private Set<LockCoordinatorConfiguration> lockCoordinatorConfigurations = 
new HashSet<>();
+
    private List<String> incomingInterceptorClassNames = new ArrayList<>();
 
    private List<String> outgoingInterceptorClassNames = new ArrayList<>();
@@ -3509,6 +3512,16 @@ public class ConfigurationImpl extends 
javax.security.auth.login.Configuration i
       return this;
    }
 
+   @Override
+   public Set<LockCoordinatorConfiguration> getLockCoordinatorConfigurations() 
{
+      return lockCoordinatorConfigurations;
+   }
+
+   @Override
+   public void addLockCoordinatorConfiguration(LockCoordinatorConfiguration 
configuration) {
+      lockCoordinatorConfigurations.add(configuration);
+   }
+
    // extend property utils with ability to auto-fill and locate from 
collections
    // collection entries are identified by the name() property
    private static class CollectionAutoFillPropertiesUtil extends 
PropertyUtilsBean {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index f6ad71f36d..81d48d9ffb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -51,6 +51,7 @@ import 
org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
 import org.apache.activemq.artemis.core.config.MetricsConfiguration;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.TransformerConfiguration;
@@ -97,6 +98,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import 
org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.routing.KeyType;
@@ -737,6 +739,22 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          }
       }
 
+      NodeList lockCoordinators = e.getElementsByTagName("lock-coordinators");
+
+      if (lockCoordinators != null) {
+         for (int i = 0; i < lockCoordinators.getLength(); i++) {
+            Element lockCoordinatorElement = (Element) 
lockCoordinators.item(i);
+
+            for (int j = 0; j < 
lockCoordinatorElement.getChildNodes().getLength(); ++j) {
+               Node node = lockCoordinatorElement.getChildNodes().item(j);
+
+               if (node.getNodeName().equalsIgnoreCase("lock-coordinator")) {
+                  parseLockCoordinator((Element) node, config);
+               }
+            }
+         }
+      }
+
       // Persistence config
 
       config.setLargeMessagesDirectory(getString(e, 
"large-messages-directory", config.getLargeMessagesDirectory(), 
NOT_NULL_OR_EMPTY));
@@ -916,6 +934,31 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
       }
    }
 
+   private void parseLockCoordinator(final Element lockCoordinatorElement, 
final Configuration mainConfig) throws Exception {
+      String name = lockCoordinatorElement.getAttribute("name");
+      String lockId = getString(lockCoordinatorElement, "lock-id", name, 
NO_CHECK);
+      String className = getString(lockCoordinatorElement, "class-name", null, 
NOT_NULL_OR_EMPTY);
+      int checkPeriod = getInteger(lockCoordinatorElement, "check-period", 
LockCoordinator.DEFAULT_CHECK_PERIOD, NO_CHECK);
+
+      HashMap<String, String> properties = new HashMap<>();
+
+      if (parameterExists(lockCoordinatorElement, "properties")) {
+         final NodeList propertyNodeList = 
lockCoordinatorElement.getElementsByTagName("property");
+         final int propertiesCount = propertyNodeList.getLength();
+         properties = new HashMap<>(propertiesCount);
+         for (int i = 0; i < propertiesCount; i++) {
+            final Element propertyNode = (Element) propertyNodeList.item(i);
+            final String propertyName = 
propertyNode.getAttributeNode("key").getValue();
+            final String propertyValue = 
propertyNode.getAttributeNode("value").getValue();
+            properties.put(propertyName, propertyValue);
+         }
+      }
+
+      LockCoordinatorConfiguration lockCoordinatorConfiguration = new 
LockCoordinatorConfiguration(properties).setName(name).setLockId(lockId).setClassName(className).setCheckPeriod(checkPeriod);
+      mainConfig.addLockCoordinatorConfiguration(lockCoordinatorConfiguration);
+   }
+
+
 
    private void parseJournalRetention(final Element e, final Configuration 
config) {
       NodeList retention = 
e.getElementsByTagName("journal-retention-directory");
@@ -1621,13 +1664,21 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
                                                                       final 
Configuration mainConfig) throws Exception {
       Node nameNode = e.getAttributes().getNamedItem("name");
 
+      String lockCoordinator = e.getAttribute("lock-coordinator");
+
+
       String name = nameNode != null ? nameNode.getNodeValue() : null;
 
       String uri = e.getChildNodes().item(0).getNodeValue();
 
       List<TransportConfiguration> configurations = 
ConfigurationUtils.parseAcceptorURI(name, uri);
+      TransportConfiguration transportConfiguration = configurations.get(0);
 
-      Map<String, Object> params = configurations.get(0).getParams();
+      Map<String, Object> params = transportConfiguration.getParams();
+
+      if (lockCoordinator != null) {
+         transportConfiguration.setLockCoordinator(lockCoordinator);
+      }
 
       if (mainConfig.isMaskPassword() != null) {
          params.put(ActiveMQDefaultConfiguration.getPropMaskPassword(), 
mainConfig.isMaskPassword());
@@ -1637,7 +1688,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          }
       }
 
-      return configurations.get(0);
+      return transportConfiguration;
    }
 
    private TransportConfiguration parseConnectorTransportConfiguration(final 
Element e,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 1a093ea558..b15b0afb7e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -85,6 +85,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
@@ -125,6 +126,8 @@ public class NettyAcceptor extends AbstractAcceptor {
       }
    }
 
+   LockCoordinator lockCoordinator;
+
    //just for debug
    private final String protocolsString;
 
@@ -441,6 +444,17 @@ public class NettyAcceptor extends AbstractAcceptor {
       }
    }
 
+   @Override
+   public LockCoordinator getLockCoordinator() {
+      return lockCoordinator;
+   }
+
+   @Override
+   public NettyAcceptor setLockCoordinator(LockCoordinator lockCoordinator) {
+      this.lockCoordinator = lockCoordinator;
+      return this;
+   }
+
    public int getTcpReceiveBufferSize() {
       return tcpReceiveBufferSize;
    }
@@ -451,6 +465,15 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    @Override
    public synchronized void start() throws Exception {
+      if (lockCoordinator == null) {
+         internalStart();
+      } else {
+         lockCoordinator.onLockAcquired(this::internalStart);
+         lockCoordinator.onLockReleased(this::internalStop);
+      }
+   }
+
+   private void internalStart() throws Exception {
       if (channelClazz != null) {
          // Already started
          return;
@@ -770,6 +793,14 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    @Override
    public void stop() throws Exception {
+      if (lockCoordinator != null) {
+         lockCoordinator.stop();
+      } else {
+         internalStop();
+      }
+   }
+
+   private void internalStop() throws Exception {
       CountDownLatch latch = new CountDownLatch(1);
       asyncStop(latch::countDown);
       latch.await();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index c4ec118c1c..932a5fb256 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -71,6 +71,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServiceRegistry;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.logs.AuditLogger;
@@ -284,6 +285,14 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
          }
 
          acceptor = factory.createAcceptor(info.getName(), clusterConnection, 
info.getParams(), new DelegatingBufferHandler(), this, threadPool, 
scheduledThreadPool, selectedProtocols, server.getThreadGroupName("remoting-" + 
info.getName()), server.getMetricsManager());
+         if (info.getLockCoordinator() != null) {
+            LockCoordinator lockCoordinator = 
server.getLockCoordinator(info.getLockCoordinator());
+            if (lockCoordinator == null) {
+               
ActiveMQServerLogger.LOGGER.lockCoordinatorNotFoundOnAcceptor(info.getLockCoordinator(),
 acceptor.getName());
+            } else {
+               acceptor.setLockCoordinator(lockCoordinator);
+            }
+         }
 
          if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) 
{
             acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d6b0662a7b..3158483591 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -56,6 +56,7 @@ import 
org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
@@ -177,6 +178,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    CriticalAnalyzer getCriticalAnalyzer();
 
+   LockCoordinator getLockCoordinator(String name);
+
    void updateStatus(String component, String statusJson);
 
    /**
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index f4ed4d905d..ec019c1ace 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1518,4 +1518,16 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224153, value = "Unable to find page {} on Address {} 
while reloading ACKNOWLEDGE_CURSOR, deleting record {}.", level = 
LogMessage.Level.INFO)
    void cannotFindPageFileDuringPageAckReload(long pageNr, Object address, 
long id);
+
+   @LogMessage(id = 224154, value = "Invalid className configured on 
LockCoordinator {}, {} does not exist", level = LogMessage.Level.WARN)
+   void invalidTypeLockCoordinator(String name, String type, Throwable t);
+
+   @LogMessage(id = 224155, value = "LockCoordinator {} not found on acceptor 
{}", level = LogMessage.Level.WARN)
+   void lockCoordinatorNotFoundOnAcceptor(String lockName, String 
acceptorName);
+
+   @LogMessage(id = 224156, value = "LockCoordinator {} starting with 
className={} and lockID={} with checkPeriod={} milliseconds", level = 
LogMessage.Level.INFO)
+   void lockCoordinatorStarting(String lockName, String className, String 
lockID, int checkPeriod);
+
+   @LogMessage(id = 224157, value = "At least one of the components failed to 
start under the lockCoordinator {}. A retry will be executed", level = 
LogMessage.Level.INFO)
+   void retryLockCoordinator(String name);
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 0d43413323..336b81f43a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -73,6 +73,7 @@ import 
org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationBrokerPlugin;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.config.impl.LegacyJMSConfiguration;
@@ -161,6 +162,7 @@ import 
org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
 import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
 import 
org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import 
org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
 import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
@@ -195,6 +197,7 @@ import 
org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
 import org.apache.activemq.artemis.core.version.Version;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
 import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -291,6 +294,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private ReplayManager replayManager;
 
+   private ConcurrentHashMap<String, LockCoordinator> lockCoordinators = new 
ConcurrentHashMap<>();
+
    /**
     * Certain management operations shouldn't use more than one thread. this 
semaphore is used to guarantee a single
     * thread used.
@@ -552,6 +557,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return this.rebuildCounters;
    }
 
+   @Override
+   public LockCoordinator getLockCoordinator(String name) {
+      return lockCoordinators.get(name);
+   }
 
    @Override
    public void replay(Date start, Date end, String address, String target, 
String filter) throws Exception {
@@ -735,6 +744,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          ActiveMQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? 
"Backup" : "Primary"), configuration);
 
+         startLockCoordinators();
+
          final boolean wasPrimary = !haPolicy.isBackup();
          if (!haPolicy.isBackup()) {
             activation = haPolicy.createActivation(this, false, 
activationParams, ioCriticalErrorListener);
@@ -793,6 +804,35 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
    }
 
+   private void startLockCoordinators() {
+      for (LockCoordinatorConfiguration lockCoordinatorConfiguration : 
configuration.getLockCoordinatorConfigurations()) {
+         String className = lockCoordinatorConfiguration.getClassName();
+         String name = lockCoordinatorConfiguration.getName();
+         String lockId = lockCoordinatorConfiguration.getLockId();
+         int checkPeriod = lockCoordinatorConfiguration.getCheckPeriod();
+
+         DistributedLockManager lockManager;
+
+         try {
+            lockManager = DistributedLockManager.newInstanceOf(className, 
lockCoordinatorConfiguration.getProperties());
+         } catch (Exception e) {
+            
ActiveMQServerLogger.LOGGER.invalidTypeLockCoordinator(lockCoordinatorConfiguration.getName(),
 className, e);
+            continue;
+         }
+
+         LockCoordinator lockCoordinator = new LockCoordinator(scheduledPool, 
executorFactory.getExecutor(), checkPeriod, lockManager, lockId, name);
+         lockCoordinators.put(name, lockCoordinator);
+         ActiveMQServerLogger.LOGGER.lockCoordinatorStarting(name, className, 
lockId, checkPeriod);
+         lockCoordinator.start();
+      }
+   }
+
+   private void stopLockCoordinators() {
+      if (lockCoordinators != null) {
+         lockCoordinators.values().forEach(LockCoordinator::stop);
+         lockCoordinators.clear();
+      }
+   }
 
    private void takingLongToStart(Object criticalComponent) {
       ActiveMQServerLogger.LOGGER.tooLongToStart(criticalComponent);
@@ -1381,6 +1421,8 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
          
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
 t);
       }
 
+      stopLockCoordinators();
+
       stopComponent(pagingManager);
 
       if (!criticalIOError && pagingManager != null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
new file mode 100644
index 0000000000..b614cb6ce8
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.lock;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
+import org.apache.activemq.artemis.utils.RunnableEx;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages distributed locks a pluggable distributed lock mechanism.
+ * <p>
+ * The LockMonitor periodically attempts to acquire a distributed lock. When 
the lock
+ * is acquired, registered "acquired" callbacks are executed. When the lock is 
lost
+ * or released, "released" callbacks are executed.
+ *
+ * @see org.apache.activemq.artemis.lockmanager.DistributedLockManager
+ */
+public class LockCoordinator extends ActiveMQScheduledComponent {
+
+   /** Default period (in milliseconds) for checking lock status */
+   public static final int DEFAULT_CHECK_PERIOD = 5000;
+
+   String debugInfo;
+
+   public String getDebugInfo() {
+      return debugInfo;
+   }
+
+   public LockCoordinator setDebugInfo(String debugInfo) {
+      this.debugInfo = debugInfo;
+      return this;
+   }
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final ArrayList<RunnableEx> lockAcquiredCallback = new 
ArrayList<>();
+   private final ArrayList<RunnableEx> lockReleasedCallback = new 
ArrayList<>();
+   private final long checkPeriod;
+   private final String name;
+   private final String lockID;
+
+   DistributedLockManager lockManager;
+   DistributedLock distributedLock;
+   volatile boolean locked;
+
+   public DistributedLockManager getLockManager() {
+      return lockManager;
+   }
+
+   /**
+    * Registers a callback to be executed when lock is acquired.
+    * If the lock is already held when this method is called, the callback
+    * will be executed immediately (on the executor thread).
+    *
+    * Also In case the runnable throws any exceptions, the lock will be 
released, any previously added callback will be called for stop
+    * and the monitor will retry the locks
+    *
+    * @param runnable the callback to execute when lock is acquired
+    */
+   public void onLockAcquired(RunnableEx runnable) {
+      this.lockAcquiredCallback.add(runnable);
+      // if it's locked we run the runnable being added,
+      // however we must check this inside the executor
+      // or within a global locking
+      executor.execute(() -> runIfLocked(runnable));
+   }
+
+   /**
+    * Registers a callback to be executed when lock is released or lost.
+    *
+    * @param runnable the callback to execute when lock is released
+    */
+   public void onLockReleased(RunnableEx runnable) {
+      this.lockReleasedCallback.add(runnable);
+   }
+
+   /**
+    * Stops the lock coordinator, releasing any held locks and cleaning up 
resources.
+    * This method blocks until all cleanup is complete.
+    */
+   @Override
+   public void stop() {
+      super.stop();
+      SimpleFutureImpl<Void> simpleFuture = new SimpleFutureImpl<>();
+      executor.execute(() -> {
+         if (locked) {
+            fireLockChanged(false);
+         }
+         if (distributedLock != null) {
+            try {
+               distributedLock.unlock();
+            } catch (Exception e) {
+               logger.debug("Error unlocking during stop", e);
+            }
+            try {
+               distributedLock.close();
+            } catch (Exception e) {
+               logger.debug("Error closing lock during stop", e);
+            }
+            distributedLock = null;
+         }
+         if (lockManager != null) {
+            try {
+               lockManager.stop();
+            } catch (Exception e) {
+               logger.debug("Error stopping lock manager during stop", e);
+            }
+            lockManager = null;
+         }
+         simpleFuture.set(null);
+      });
+      try {
+         simpleFuture.get();
+      } catch (Exception e) {
+         logger.debug("Error waiting for stop to complete", e);
+      }
+   }
+
+   /**
+    * Returns whether this instance currently holds the lock.
+    *
+    * @return true if lock is currently held, false otherwise
+    */
+   public boolean isLocked() {
+      return locked;
+   }
+
+   /**
+    * Constructs a new LockCoordinator.
+    *
+    * @param scheduledExecutor the executor for scheduling periodic lock checks
+    * @param executor the executor for running callbacks
+    * @param checkPeriod how often to check lock status (in milliseconds)
+    * @param lockManager the distributed lock manager implementation to use
+    * @param lockID the unique identifier for the lock
+    * @param name a descriptive name for this lock coordinator
+    */
+   public LockCoordinator(ScheduledExecutorService scheduledExecutor, Executor 
executor, long checkPeriod, DistributedLockManager lockManager, String lockID, 
String name) {
+      super(scheduledExecutor, executor, checkPeriod, checkPeriod, 
TimeUnit.MILLISECONDS, false);
+      assert executor != null;
+      this.lockManager = lockManager;
+      this.checkPeriod = checkPeriod;
+      this.lockID = lockID;
+      this.name = name;
+   }
+
+   private void fireLockChanged(boolean locked) {
+      this.locked = locked;
+      if (locked) {
+         AtomicBoolean treatErrors = new AtomicBoolean(false);
+         lockAcquiredCallback.forEach(r -> doRunTreatingErrors(r, 
treatErrors));
+         if (treatErrors.get()) {
+            retryLock();
+         }
+      } else {
+         lockReleasedCallback.forEach(this::doRunWithLogException);
+      }
+   }
+
+   private void retryLock() {
+      ActiveMQServerLogger.LOGGER.retryLockCoordinator(name);
+      // Release lock and retry on next scheduled run if callbacks failed
+      executor.execute(this::executeRetryLock);
+   }
+
+   // to be used as a runnable on the executor
+   private void executeRetryLock() {
+      if (locked) {
+         logger.debug("Unlocking to retry the callback");
+         fireLockChanged(false);
+         if (distributedLock != null) {
+            try {
+               distributedLock.unlock();
+               distributedLock.close();
+            } catch (Exception e) {
+               logger.debug(e.getMessage(), e);
+            }
+            distributedLock = null;
+         }
+         if (lockManager != null) {
+            try {
+               lockManager.stop();
+            } catch (Exception e) {
+               logger.debug(e.getMessage(), e);
+            }
+         }
+      }
+   }
+
+   private void runIfLocked(RunnableEx checkBeingAdded) {
+      if (locked) {
+         try {
+            doRun(checkBeingAdded);
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+            retryLock();
+         }
+      }
+   }
+
+   private void doRunTreatingErrors(RunnableEx r, AtomicBoolean errorOnStart) {
+      try {
+         r.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         errorOnStart.set(true);
+      }
+   }
+
+   private void doRun(RunnableEx r) throws Exception  {
+      r.run();
+   }
+
+   private void doRunWithLogException(RunnableEx r) {
+      try {
+         r.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public void run() {
+      try {
+         if (!locked) {
+            if (!lockManager.isStarted()) {
+               lockManager.start();
+            }
+            DistributedLock lock = lockManager.getDistributedLock(lockID);
+            if (lock.tryLock(1, TimeUnit.SECONDS)) {
+               logger.debug("Succeeded on locking {}, lockID={}", name, 
lockID);
+               this.distributedLock = lock;
+               fireLockChanged(true);
+            } else {
+               logger.debug("Not able to lock {}, lockID={}", name, lockID);
+               lock.close();
+               lockManager.stop();
+            }
+         } else {
+            if (!distributedLock.isHeldByCaller()) {
+               fireLockChanged(false);
+               distributedLock.close();
+               distributedLock = null;
+               lockManager.stop();
+            }
+         }
+      } catch (Exception e) {
+         fireLockChanged(false);
+         if (distributedLock != null) {
+            try {
+               distributedLock.close();
+            } catch (Exception closeEx) {
+               logger.debug("Error closing lock", closeEx);
+            }
+            distributedLock = null;
+         }
+         if (lockManager != null) {
+            try {
+               lockManager.stop();
+            } catch (Exception stopEx) {
+               logger.debug("Error stopping lock manager", stopEx);
+            }
+         }
+         logger.warn(e.getMessage(), e);
+      }
+   }
+}
+
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
index 5914c8c59e..baa6fd7b22 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
@@ -24,6 +24,7 @@ import 
org.apache.activemq.artemis.core.protocol.ProtocolHandler;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 
 import static 
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_AUTO_START;
@@ -39,6 +40,14 @@ public interface Acceptor extends ActiveMQComponent {
     */
    String getName();
 
+   default Acceptor setLockCoordinator(LockCoordinator lockCoordinator) {
+      return this;
+   }
+
+   default LockCoordinator getLockCoordinator() {
+      return null;
+   }
+
    /**
     * Pause the acceptor and stop it from receiving client requests.
     */
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0cf7e94fa3..5c12855f29 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -551,6 +551,8 @@
             </xsd:complexType>
          </xsd:element>
 
+         <xsd:element ref="lock-coordinators" maxOccurs="1" minOccurs="0"/>
+
          <xsd:element ref="bridges" maxOccurs="1" minOccurs="0"/>
 
          <xsd:element ref="federations" maxOccurs="1" minOccurs="0"/>
@@ -3224,6 +3226,78 @@
       </xsd:complexType>
    </xsd:element>
 
+
+   <xsd:complexType name="lock-coordinatorType">
+      <xsd:all>
+         <xsd:element name="lock-id" type="xsd:string" maxOccurs="1" 
minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The lockID the implementation will use to reach the lock 
provider.
+                  This is different from the lock-coordinator name, but if 
lock-id is omitted, we will use name of the lock-coordinator as a value.
+                  Notice this feature is in tech preview and its configuration 
is subjected to changes.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="class-name" type="xsd:string" maxOccurs="1" 
minOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The className of lockManager being used for coordination.
+                  Notice this is a pluggable functionality so a provider may 
introduce additional options.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="check-period" type="xsd:integer" maxOccurs="1" 
minOccurs="0" default="5000">
+            <xsd:annotation>
+               <xsd:documentation>
+                  A period used to verify if the lock still valid and renew it 
if needed.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="properties" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  A list of options for the distributed-primitive-manager
+               </xsd:documentation>
+            </xsd:annotation>
+            <xsd:complexType>
+               <xsd:sequence>
+                  <xsd:element name="property" type="propertyType" 
minOccurs="1" maxOccurs="unbounded">
+                     <xsd:annotation>
+                        <xsd:documentation>
+                           A key-value pair option for the 
distributed-primitive-manager
+                        </xsd:documentation>
+                     </xsd:annotation>
+                  </xsd:element>
+               </xsd:sequence>
+            </xsd:complexType>
+         </xsd:element>
+      </xsd:all>
+
+      <xsd:attribute name="name" type="xsd:ID" use="required">
+         <xsd:annotation>
+            <xsd:documentation>
+               unique name for the lock coordinator
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
+
+   </xsd:complexType>
+
+   <xsd:element name="lock-coordinators">
+      <xsd:annotation>
+         <xsd:documentation>
+            a list of lock coordinators
+         </xsd:documentation>
+      </xsd:annotation>
+      <xsd:complexType>
+         <xsd:sequence>
+            <xsd:element name="lock-coordinator" type="lock-coordinatorType" 
maxOccurs="unbounded" minOccurs="0"/>
+         </xsd:sequence>
+         <xsd:attributeGroup ref="xml:specialAttrs"/>
+      </xsd:complexType>
+   </xsd:element>
+
+
    <xsd:complexType name="distributed-primitive-manager">
       <xsd:all>
          <xsd:element name="class-name" type="xsd:string" maxOccurs="1" 
minOccurs="0">
@@ -4901,6 +4975,8 @@
          <xsd:extension base="xsd:string">
             <xsd:attribute name="name" type="xsd:string">
             </xsd:attribute>
+            <xsd:attribute name="lock-coordinator" type="xsd:string">
+            </xsd:attribute>
          </xsd:extension>
       </xsd:simpleContent>
    </xsd:complexType>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 8987868d02..7bf060f156 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.beans.PropertyDescriptor;
+import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.PrintWriter;
 import java.io.StringReader;
@@ -62,6 +64,7 @@ import 
org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ConfigurationUtils;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeAddressPolicyElement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeBrokerConnectionElement;
@@ -3044,6 +3047,79 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertDoesNotThrow(() -> configuration.exportAsProperties(fileOutput));
    }
 
+   /**
+    * Verifies the lock coordinator configuration parsing and export process:
+    * <ul>
+    * <li>Creates a configuration from broker properties</li>
+    * <li>Validates the configuration output</li>
+    * <li>Exports the configuration back to a new {@link 
java.util.Properties}</li>
+    * <li>Verifies that the new output contains all initially specified 
properties</li>
+    * </ul>
+    */
+   @Test
+   public void testParseLockCoordinator() throws Exception {
+      Properties properties = new Properties();
+
+      properties.put("lockCoordinatorConfigurations.hello.checkPeriod", "123");
+      properties.put("lockCoordinatorConfigurations.hello.lockId", "lock-id");
+      properties.put("lockCoordinatorConfigurations.hello.name", "hello");
+      properties.put("lockCoordinatorConfigurations.hello.className", 
"some.class.somewhere");
+      for (int i = 0; i < 10; i++) {
+         properties.put("lockCoordinatorConfigurations.hello.properties.k" + 
i, "v" + i);
+      }
+
+      properties.put("acceptorConfigurations.netty.factoryClassName", "netty");
+      properties.put("acceptorConfigurations.netty.lockCoordinator", "hello");
+      properties.put("acceptorConfigurations.netty.name", "netty");
+      properties.put("acceptorConfigurations.netty.params.port", "8888");
+      properties.put("acceptorConfigurations.netty.params.host", "localhost");
+
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.parsePrefixedProperties(properties, null);
+
+      assertEquals(1, configuration.getAcceptorConfigurations().size());
+      TransportConfiguration acceptorConfig = null;
+      for (TransportConfiguration t 
:configuration.getAcceptorConfigurations()) {
+         acceptorConfig = t;
+      }
+      // I am not going to validate all the parameters from netty since this 
is already tested elsewhere
+      assertEquals("hello", acceptorConfig.getLockCoordinator());
+      assertEquals("netty", acceptorConfig.getFactoryClassName());
+
+      assertEquals(1, configuration.getLockCoordinatorConfigurations().size());
+
+      LockCoordinatorConfiguration lockCoordinatorConfiguration = null;
+
+      for (LockCoordinatorConfiguration t : 
configuration.getLockCoordinatorConfigurations()) {
+         lockCoordinatorConfiguration = t;
+      }
+      Map<String, String> lockProperties = 
lockCoordinatorConfiguration.getProperties();
+      for (int i = 0; i < 10; i++) {
+         assertEquals("v" + i, lockProperties.get("k" + i));
+      }
+
+      assertEquals(123, lockCoordinatorConfiguration.getCheckPeriod());
+      assertEquals("lock-id", lockCoordinatorConfiguration.getLockId());
+      assertEquals("hello", lockCoordinatorConfiguration.getName());
+      assertEquals("some.class.somewhere", 
lockCoordinatorConfiguration.getClassName());
+
+      File outputProperty = new File(getTestDirfile(), "broker.properties");
+      configuration.exportAsProperties(outputProperty);
+
+      Properties brokerProperties = new Properties();
+
+      try (FileInputStream is = new FileInputStream(outputProperty)) {
+         BufferedInputStream bis = new BufferedInputStream(is);
+         brokerProperties.load(bis);
+      }
+
+      properties.forEach((k, v) -> {
+         logger.debug("Validating {} = {}", k, v);
+         assertEquals(v, brokerProperties.get(k));
+      });
+
+   }
+
    /**
     * To test ARTEMIS-926
     */
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 53b07421d7..fdca23ef83 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -26,7 +26,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@@ -55,10 +58,14 @@ import 
org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
 import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
 import org.apache.activemq.artemis.utils.StringPrintStream;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXParseException;
 
 public class FileConfigurationParserTest extends ServerTestBase {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private static final String PURGE_FOLDER_FALSE = """
       <configuration>
          <core>
@@ -90,6 +97,7 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
             <acceptor name="netty">tcp://localhost:5545</acceptor>
             <acceptor name="netty-throughput">tcp://localhost:5545</acceptor>
             <acceptor name="in-vm">vm://0</acceptor>
+            <acceptor name="netty-with-lock" 
lock-coordinator="my-lock">tcp://localhost:5545</acceptor>
          </acceptors>
          <security-settings>
             <security-setting match="#">
@@ -130,6 +138,21 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
          </bridge>
       </bridges>""";
 
+
+   private static final String LOCK_COORDINATOR_PART = """
+      <lock-coordinators>
+         <lock-coordinator name="my-lock">
+            <lock-id>sausage-factory</lock-id>
+            <class-name>some.class.somewhere</class-name>
+            <check-period>333</check-period>
+            <properties>
+               <property key='test1' value='value1'/>
+               <property key='test2' value='value2'/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>""";
+
+
    /**
     * These "InvalidConfigurationTest*.xml" files are modified copies of 
{@literal ConfigurationTest-full-config.xml},
     * so just diff it for changes, e.g.
@@ -423,6 +446,27 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
       assertEquals("helloworld", bconfig.getPassword());
    }
 
+   @Test
+   public void testLockCoordinatorParse() throws Exception {
+      FileConfigurationParser parser = new FileConfigurationParser();
+      String configStr = FIRST_PART + LOCK_COORDINATOR_PART + LAST_PART;
+      Configuration configuration = parser.parseMainConfig(new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)));
+
+      Collection<LockCoordinatorConfiguration> lockConfigurations = 
configuration.getLockCoordinatorConfigurations();
+      lockConfigurations.forEach(f -> logger.info("lockConfiguration={}", f));
+      assertEquals(1, lockConfigurations.size());
+      for (LockCoordinatorConfiguration lockConfiguration : 
lockConfigurations) {
+         assertEquals("my-lock", lockConfiguration.getName());
+         assertEquals("sausage-factory", lockConfiguration.getLockId());
+         assertEquals("some.class.somewhere", 
lockConfiguration.getClassName());
+         Map<String, String> properties = lockConfiguration.getProperties();
+         assertEquals(2, properties.size());
+         assertEquals("value1", properties.get("test1"));
+         assertEquals("value2", properties.get("test2"));
+      }
+      configuration.getAcceptorConfigurations().stream().filter(f -> 
f.getName().equals("netty-with-lock")).forEach(f -> assertEquals("my-lock", 
f.getLockCoordinator()));
+   }
+
    @Test
    public void testDefaultBridgeProducerWindowSize() throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
diff --git a/docs/user-manual/_book.adoc b/docs/user-manual/_book.adoc
index d32db5faa8..bf048d4c29 100644
--- a/docs/user-manual/_book.adoc
+++ b/docs/user-manual/_book.adoc
@@ -65,6 +65,7 @@ include::network-isolation.adoc[leveloffset=1]
 include::restart-sequence.adoc[leveloffset=1]
 include::activation-tools.adoc[leveloffset=1]
 include::amqp-broker-connections.adoc[leveloffset=1]
+include::lock-coordination.adoc[leveloffset=1]
 include::federation.adoc[leveloffset=1]
 include::federation-address.adoc[leveloffset=1]
 include::federation-queue.adoc[leveloffset=1]
diff --git a/docs/user-manual/_diagrams/lock-coordination-example.odg 
b/docs/user-manual/_diagrams/lock-coordination-example.odg
new file mode 100644
index 0000000000..d82f5b5c72
Binary files /dev/null and 
b/docs/user-manual/_diagrams/lock-coordination-example.odg differ
diff --git a/docs/user-manual/images/lock-coordination-example.png 
b/docs/user-manual/images/lock-coordination-example.png
new file mode 100644
index 0000000000..752c2909d3
Binary files /dev/null and 
b/docs/user-manual/images/lock-coordination-example.png differ
diff --git a/docs/user-manual/lock-coordination.adoc 
b/docs/user-manual/lock-coordination.adoc
new file mode 100644
index 0000000000..c5ca2c7466
--- /dev/null
+++ b/docs/user-manual/lock-coordination.adoc
@@ -0,0 +1,164 @@
+= Lock Coordination
+:idprefix:
+:idseparator: -
+:docinfo: shared
+
+The Lock Coordinator provides pluggable distributed lock mechanism monitoring.
+It allows multiple broker instances to coordinate the activation of specific 
configuration elements, ensuring that only one broker instance activates a 
particular element at any given time.
+
+When a broker acquires a lock through a distributed lock, the associated 
configuration elements are activated.
+If the lock is lost or released, those elements are deactivated.
+
+In the current version, the Lock Coordinator can be applied to control the 
startup and shutdown of acceptors.
+When an acceptor is associated with a lock coordinator, it will only start 
accepting connections when the broker successfully acquires the distributed 
lock.
+If lock is lost for any reason, the acceptor automatically stops accepting new 
connections.
+
+The same pattern used on acceptors may eventually be applied to other 
configuration elements.
+If you have ideas for additional use cases where this pattern could be 
applied, please file a JIRA issue.
+
+WARNING: This feature is in technical preview and its configuration elements 
are subject to possible modifications.
+
+== Configuration
+
+It is possible to specify multiple lock-coordinators and associate them with 
other broker elements.
+
+The broker element associated with a lock-coordinator (e.g., an acceptor) will 
only be started if the distributed lock can be acquired.
+If the lock cannot be acquired or is lost, the associated element will be 
stopped.
+
+This pattern can be used to ensure clients connect to only one of your 
mirrored brokers at a time, preventing split-brain scenarios and duplicate 
message processing.
+
+Depending on the provider selector, multiple configuration options can be 
provided.
+Please consult the javadoc for your lock implementation.
+A simple table will be provided in this chapter for the two reference 
implementations we provide, but this could be a plugin being added to your 
broker.
+
+In this next example, we configure a broker with:
+
+* Two acceptors: one for mirroring traffic (`for-mirroring-only`) and one for 
client connections (`for-clients-only`)
+* A File-based lock-coordinator named `clients-lock`
+* The client acceptor associated with the lock-coordinator, so it only 
activates when the distributed lock is acquired
+* A mirror connection to another broker for data replication
+
+[,xml]
+----
+<acceptors>
+   <acceptor 
name="for-mirroring-only">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+   <acceptor name="for-clients-only" 
lock-coordinator="clients-lock">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+</acceptors>
+
+<lock-coordinators>
+   <lock-coordinator name="clients-lock">
+      
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+      <lock-id>mirror-cluster-clients</lock-id>
+      <check-period>1000</check-period> <!-- how often to check if the lock is 
still valid, in milliseconds -->
+
+      <properties>
+         <property key="locks-folder" value="/usr/somewhere/existing-folder"/>
+      </properties>
+   </lock-coordinator>
+</lock-coordinators>
+
+<broker-connections>
+   <amqp-connection uri="tcp://otherBroker:61000" name="mirror" 
retry-interval="2000">
+      <mirror sync="false"/>
+   </amqp-connection>
+</broker-connections>
+
+
+----
+
+In the previous configuration, the broker will use a file lock, and the 
acceptor will only be active if it can hold the distributed lock between the 
mirrored brokers.
+
+image:images/lock-coordination-example.png[HA with mirroring]
+
+You can find a 
https://github.com/apache/artemis-examples/tree/main/examples/features/broker-connection/ha-with-mirroring[working
 example] on how to run HA with Mirroring.
+
+== Configuration Options
+
+=== Common Configuration
+
+The following elements are configured on lock-coordinator
+
+[cols="1,1,1,3"]
+|===
+|Element |Required |Default |Description
+
+|name
+|Yes
+|None
+|Unique identifier for this lock-coordinator instance, used to reference it 
from other configuration elements
+
+|class-name
+|Yes
+|None
+|The lock provider implementation (e.g., 
`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager` or 
`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`)
+
+|lock-id
+|Yes
+|None
+|Unique identifier for the distributed lock. All brokers competing for the 
same distributed lock must use the same lock-id
+
+|check-period
+|No
+|5000
+|How often to check if the lock is still valid, in milliseconds
+|===
+
+=== File
+
+The file-based lock uses the file system to manage distributed locks.
+It is provided by the 
class-name=`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+
+[cols="1,1,1,3"]
+|===
+|Property |Required |Default |Description
+
+|locks-folder
+|Yes
+|None
+|Path to the directory where lock files will be created and managed. The 
directory must be created in advance before using this lock.
+|===
+
+=== ZooKeeper
+
+The ZooKeeper-based lock uses Apache Curator to manage distributed locks via 
ZooKeeper.
+It is provided by the 
class-name=`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
+
+[cols="1,1,1,3"]
+|===
+|Property |Required |Default |Description
+
+|connect-string
+|Yes
+|None
+|ZooKeeper connection string (e.g., "localhost:2181" or 
"host1:2181,host2:2181,host3:2181")
+
+|namespace
+|Yes
+|None
+|Namespace prefix for all ZooKeeper paths to isolate data
+
+|session-ms
+|No
+|18000
+|Session timeout in milliseconds
+
+|session-percent
+|No
+|33
+|Percentage of session timeout to use for lock operations
+
+|connection-ms
+|No
+|8000
+|Connection timeout in milliseconds
+
+|retries
+|No
+|1
+|Number of retry attempts for failed operations
+
+|retries-ms
+|No
+|1000
+|Delay in milliseconds between retry attempts
+|===
diff --git a/docs/user-manual/restart-sequence.adoc 
b/docs/user-manual/restart-sequence.adoc
index fa535aee76..dc1f90e8d7 100644
--- a/docs/user-manual/restart-sequence.adoc
+++ b/docs/user-manual/restart-sequence.adoc
@@ -1,11 +1,15 @@
-= Restart Sequence
+= Restart Sequence if using Journal replication
 :idprefix:
 :idseparator: -
 :docinfo: shared
 
-{project-name-full} ships with 2 architectures for providing HA features.
-The primary and backup brokers can be configured either using network 
replication or using shared storage.
-This document will share restart sequences for the brokers under various 
circumstances when the client applications are  connected to it.
+{project-name-full} ships with 3 possibilities for providing HA:
+
+- Shared storage
+- Network Journal Replication
+- AMQP Broker Connection Mirroring
+
+This page will cover steps to restart the broker while using journal 
replication.
 
 == Restarting 1 broker at a time
 
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
new file mode 100644
index 0000000000..4d3edf8ce8
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
@@ -0,0 +1,205 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and if any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="internal">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="artemis" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <properties>
+               <property key="connect-string" value="localhost:2181"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61001" name="mirror" 
retry-interval="2000">
+            <mirror sync="false"/>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
new file mode 100644
index 0000000000..918c417197
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
@@ -0,0 +1,205 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and if any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="internal">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="artemis" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <properties>
+               <property key="connect-string" value="localhost:2181"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirror" 
retry-interval="2000">
+            <mirror sync="false"/>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
new file mode 100644
index 0000000000..f5f50f7836
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
@@ -0,0 +1,186 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and if any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+         <acceptor 
name="internal">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <!-- acceptor artemis will be created from broker properties -->
+      </acceptors>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61001" name="mirror" 
retry-interval="2000">
+            <mirror sync="false"/>
+         </amqp-connection>
+      </broker-connections>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
new file mode 100644
index 0000000000..8b9b054733
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
@@ -0,0 +1,187 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and if any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+         <acceptor 
name="internal">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <!-- acceptor artemis will be created from broker properties -->
+      </acceptors>
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirror" 
retry-interval="2000">
+            <mirror sync="false"/>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
new file mode 100644
index 0000000000..750ad87ebd
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final String SERVER_NAME_WITH_ZK_A = 
"lockmanager/dualMirrorSingleAcceptor/ZK/A";
+   public static final String SERVER_NAME_WITH_ZK_B = 
"lockmanager/dualMirrorSingleAcceptor/ZK/B";
+
+   public static final String SERVER_NAME_WITH_FILE_A = 
"lockmanager/dualMirrorSingleAcceptor/file/A";
+   public static final String SERVER_NAME_WITH_FILE_B = 
"lockmanager/dualMirrorSingleAcceptor/file/B";
+
+   // Test constants
+   private static final int ALTERNATING_TEST_ITERATIONS = 2;
+   private static final int MESSAGES_SENT_PER_ITERATION = 100;
+   private static final int MESSAGES_CONSUMED_PER_ITERATION = 17;
+   private static final int MESSAGES_REMAINING_PER_ITERATION = 
MESSAGES_SENT_PER_ITERATION - MESSAGES_CONSUMED_PER_ITERATION;
+   private static final int EXPECTED_FINAL_MESSAGE_COUNT = 
ALTERNATING_TEST_ITERATIONS * MESSAGES_REMAINING_PER_ITERATION;
+
+   private static final int ZK_BASE_PORT = 2181;
+
+   Process processA;
+   Process processB;
+
+   private static void customizeFileServer(File serverLocation, File fileLock) 
{
+      try {
+         FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), 
"CHANGEME", fileLock.getAbsolutePath());
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   private static void createServerPair(String serverNameA, String serverNameB,
+                                         String configPathA, String 
configPathB,
+                                         Consumer<File> customizeServer) 
throws Exception {
+      File serverLocationA = getFileServerLocation(serverNameA);
+      File serverLocationB = getFileServerLocation(serverNameB);
+      deleteDirectory(serverLocationB);
+      deleteDirectory(serverLocationA);
+
+      createSingleServer(serverLocationA, configPathA, "A", customizeServer);
+      createSingleServer(serverLocationB, configPathB, "B", customizeServer);
+   }
+
+   private static void createSingleServer(File serverLocation, String 
configPath,
+                                           String userAndPassword, 
Consumer<File> customizeServer) throws Exception {
+      HelperCreate cliCreateServer = helperCreate();
+      cliCreateServer.setAllowAnonymous(true)
+                     .setUser(userAndPassword)
+                     .setPassword(userAndPassword)
+                     .setNoWeb(true)
+                     .setConfiguration(configPath)
+                     .setArtemisInstance(serverLocation);
+      cliCreateServer.createServer();
+
+      if (customizeServer != null) {
+         customizeServer.accept(serverLocation);
+      }
+   }
+
+   @BeforeEach
+   public void prepareServers() throws Exception {
+
+   }
+
+   @Test
+   public void testAlternatingZK() throws Throwable {
+      {
+         createServerPair(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B,
+                          
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A",
+                          
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B",
+                          null);
+
+         cleanupData(SERVER_NAME_WITH_ZK_A);
+         cleanupData(SERVER_NAME_WITH_ZK_B);
+      }
+
+      // starting zookeeper
+      ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, 
ZK_BASE_PORT, 100);
+      zkCluster.start();
+      runAfter(zkCluster::stop);
+
+      testAlternating(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, null, 
null);
+   }
+
+   @Test
+   public void testAlternatingFile() throws Throwable {
+      File fileLock = new File("./target/serverLock");
+      fileLock.mkdirs();
+
+      {
+         createServerPair(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B,
+                          
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A",
+                          
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B",
+                          s -> customizeFileServer(s, fileLock));
+
+         cleanupData(SERVER_NAME_WITH_FILE_A);
+         cleanupData(SERVER_NAME_WITH_FILE_B);
+      }
+
+      Properties properties = new Properties();
+
+      properties.put("acceptorConfigurations.artemis.extraParams.amqpCredits", 
"1000");
+      
properties.put("acceptorConfigurations.artemis.extraParams.amqpLowCredits", 
"300");
+      properties.put("acceptorConfigurations.artemis.factoryClassName", 
"org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory");
+      properties.put("acceptorConfigurations.artemis.lockCoordinator", 
"failover");
+      properties.put("acceptorConfigurations.artemis.name", "artemis");
+      properties.put("acceptorConfigurations.artemis.params.scheme", "tcp");
+      
properties.put("acceptorConfigurations.artemis.params.tcpReceiveBufferSize", 
"1048576");
+      properties.put("acceptorConfigurations.artemis.params.port", "61616");
+      properties.put("acceptorConfigurations.artemis.params.host", 
"localhost");
+      properties.put("acceptorConfigurations.artemis.params.protocols", 
"CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE");
+      properties.put("acceptorConfigurations.artemis.params.useEpoll", "true");
+      
properties.put("acceptorConfigurations.artemis.params.tcpSendBufferSize", 
"1048576");
+
+      properties.put("lockCoordinatorConfigurations.failover.checkPeriod", 
"5000");
+      properties.put("lockCoordinatorConfigurations.failover.className", 
"org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager");
+      properties.put("lockCoordinatorConfigurations.failover.lockId", "fail");
+      properties.put("lockCoordinatorConfigurations.failover.name", 
"failover");
+      
properties.put("lockCoordinatorConfigurations.failover.properties.locks-folder",
 fileLock.getAbsolutePath());
+
+      try (FileOutputStream fileOutputStream = new FileOutputStream(new 
File(getServerLocation(SERVER_NAME_WITH_FILE_A), "broker.properties"))) {
+         properties.store(fileOutputStream, null);
+      }
+
+      try (FileOutputStream fileOutputStream = new FileOutputStream(new 
File(getServerLocation(SERVER_NAME_WITH_FILE_B), "broker.properties"))) {
+         properties.store(fileOutputStream, null);
+      }
+
+         // I'm using broker properties in one of the tests, to help 
validating it
+      File propertiesA = new File(getServerLocation(SERVER_NAME_WITH_FILE_A), 
"broker.properties");
+      File propertiesB = new File(getServerLocation(SERVER_NAME_WITH_FILE_B), 
"broker.properties");
+
+      testAlternating(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B, 
propertiesA, propertiesB);
+   }
+
+   public void testAlternating(String nameServerA, String nameServerB, File 
brokerPropertiesA, File brokerPropertiesB) throws Throwable {
+      processA = startServer(nameServerA, 0, -1, brokerPropertiesA);
+      waitForXToStart();
+      processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
+      ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp", 
"tcp://localhost:61616");
+
+      for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) {
+         logger.info("Iteration {}: Server {} active", i, (i % 2 == 0) ? "A" : 
"B");
+
+         if (i % 2 == 0) {
+            // Even iteration: Server A active, kill Server B
+            killServer(processB);
+            waitForXToStart();
+         } else {
+            // Odd iteration: Server B active, kill Server A
+            killServer(processA);
+            waitForXToStart();
+         }
+
+         // Send messages through the shared acceptor
+         cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
+         sendMessages(cfX, MESSAGES_SENT_PER_ITERATION);
+
+         // Consume some messages
+         receiveMessages(cfX, MESSAGES_CONSUMED_PER_ITERATION);
+
+         // Restart the killed server
+         if (i % 2 == 0) {
+            processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
+         } else {
+            processA = startServer(nameServerA, 0, -1, brokerPropertiesA);
+         }
+      }
+
+      // Verify they both have the expected message count (iterations × (sent 
- consumed))
+      assertMessageCount("tcp://localhost:61000", "myQueue", 
EXPECTED_FINAL_MESSAGE_COUNT);
+      assertMessageCount("tcp://localhost:61001", "myQueue", 
EXPECTED_FINAL_MESSAGE_COUNT);
+   }
+
+   private static void sendMessages(ConnectionFactory cfX, int nmessages) 
throws JMSException {
+      try (Connection connectionX = cfX.createConnection("A", "A")) {
+         Session sessionX = connectionX.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = sessionX.createQueue("myQueue");
+         MessageProducer producerX = sessionX.createProducer(queue);
+         for (int i = 0; i < nmessages; i++) {
+            producerX.send(sessionX.createTextMessage("hello " + i));
+         }
+         sessionX.commit();
+      }
+   }
+
+   private static void receiveMessages(ConnectionFactory cfX, int nmessages) 
throws JMSException {
+      try (Connection connectionX = cfX.createConnection("A", "A")) {
+         connectionX.start();
+         Session sessionX = connectionX.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = sessionX.createQueue("myQueue");
+         MessageConsumer consumerX = sessionX.createConsumer(queue);
+         for (int i = 0; i < nmessages; i++) {
+            TextMessage message = (TextMessage) consumerX.receive(5000);
+            assertNotNull(message, "Expected message " + i + " but got null");
+         }
+         sessionX.commit();
+      }
+   }
+
+   private void waitForXToStart() {
+      for (int i = 0; i < 20; i++) {
+         try {
+            ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:61616");
+            Connection connection = factory.createConnection();
+            connection.close();
+            return;
+         } catch (Throwable e) {
+            logger.debug(e.getMessage(), e);
+            try {
+               Thread.sleep(500);
+            } catch (Throwable ignored) {
+            }
+         }
+      }
+   }
+
+   protected void assertMessageCount(String uri, String queueName, int count) 
throws Exception {
+      SimpleManagement simpleManagement = new SimpleManagement(uri, null, 
null);
+      Wait.assertEquals(count, () -> {
+         try {
+            return simpleManagement.getMessageCountOnQueue(queueName);
+         } catch (Throwable e) {
+            return -1;
+         }
+      });
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
new file mode 100644
index 0000000000..2af144d8fb
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager;
+import 
org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * This test needs external dependencies. It follows the same pattern 
described at {@link DualMirrorSingleAcceptorRunningTest}.
+ * please read the documentation from that test for more detail on how to run 
this test.
+ */
+public class LockCoordinatorTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int ZK_BASE_PORT = 2181;
+   private static final String ZK_ENDPOINTS = "127.0.0.1:2181";
+   private static final long KEEP_ALIVE_INTERVAL_MS = 200;
+   private static final int NUM_THREADS = 10;
+
+   private ExecutorService executorService;
+   private ScheduledExecutorService scheduledExecutor;
+   private AtomicInteger lockHolderCount;
+   private AtomicInteger lockChanged;
+   private OrderedExecutorFactory executorFactory;
+
+   @BeforeEach
+   @Override
+   public void setUp() {
+      disableCheckThread();
+      scheduledExecutor = Executors.newScheduledThreadPool(NUM_THREADS);
+      executorService = Executors.newFixedThreadPool(NUM_THREADS * 2);
+      executorFactory = new OrderedExecutorFactory(executorService);
+      lockHolderCount = new AtomicInteger(0);
+      lockChanged = new AtomicInteger(0);
+   }
+
+   @AfterEach
+   @Override
+   public void tearDown() {
+      scheduledExecutor.shutdownNow();
+      executorService.shutdownNow();
+   }
+
+   @Test
+   public void testWithFile() throws Exception {
+      internalTest(i -> getFileCoordinators(i));
+   }
+
+   @Test
+   public void testWithZK() throws Exception {
+      ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, 
ZK_BASE_PORT, 100);
+      zkCluster.start();
+      runAfter(zkCluster::stop);
+      assertEquals(ZK_ENDPOINTS, zkCluster.getConnectString());
+      internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString()));
+   }
+
+   private void internalTest(Function<Integer, List<LockCoordinator>> 
lockCoordinatorSupplier) throws Exception {
+      testOnlyOneLockHolderAtATime(lockCoordinatorSupplier.apply(NUM_THREADS));
+      testAddAfterLocked(lockCoordinatorSupplier.apply(1).get(0));
+      testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0));
+      testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0));
+
+      {
+         List<LockCoordinator> list = lockCoordinatorSupplier.apply(2);
+         testNoRetryWhileNotAcquired(list.get(0), list.get(1));
+      }
+   }
+
+   private void testAddAfterLocked(LockCoordinator lockCoordinator) throws 
Exception {
+      lockHolderCount.set(0);
+      lockChanged.set(0);
+
+      try {
+         lockCoordinator.start();
+         Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+         AtomicInteger afterRunning = new AtomicInteger(0);
+         assertTrue(lockCoordinator.isLocked());
+         lockCoordinator.onLockAcquired(afterRunning::incrementAndGet);
+
+         Wait.assertEquals(1, afterRunning::get);
+
+         assertEquals(1, lockHolderCount.get());
+      } finally {
+         lockCoordinator.stop();
+      }
+   }
+
+   private void testRetryAfterError(LockCoordinator lockCoordinator) throws 
Exception {
+      lockHolderCount.set(0);
+      lockChanged.set(0);
+
+      AtomicBoolean succeeded = new AtomicBoolean(false);
+      AtomicInteger numberOfTries = new AtomicInteger(0);
+      try {
+         lockCoordinator.onLockAcquired(() -> {
+            if (numberOfTries.incrementAndGet() < 5) {
+               throw new IOException("please retry");
+            }
+            succeeded.set(true);
+         });
+         lockCoordinator.start();
+
+         Wait.assertTrue(succeeded::get, 5000, 100);
+         Wait.assertEquals(1, lockHolderCount::get);
+      } finally {
+         lockCoordinator.stop();
+      }
+   }
+
+   private void testRetryAfterErrorWithDelayAdd(LockCoordinator 
lockCoordinator) throws Exception {
+      lockHolderCount.set(0);
+      lockChanged.set(0);
+
+      AtomicBoolean succeeded = new AtomicBoolean(false);
+      AtomicInteger numberOfTries = new AtomicInteger(0);
+      try {
+         lockCoordinator.start();
+         Wait.assertEquals(1, lockHolderCount::get);
+
+         lockCoordinator.onLockAcquired(() -> {
+            if (numberOfTries.incrementAndGet() < 5) {
+               throw new RuntimeException("please retry");
+            }
+            succeeded.set(true);
+         });
+
+         Wait.assertTrue(succeeded::get, 5000, 100);
+         Wait.assertEquals(1, lockHolderCount::get);
+      } finally {
+         lockCoordinator.stop();
+      }
+   }
+
+   // validate that no retry would happen since the lock wasn't held in the 
secondLock
+   private void testNoRetryWhileNotAcquired(LockCoordinator firstLock, 
LockCoordinator secondLock) throws Exception {
+      lockHolderCount.set(0);
+      lockChanged.set(0);
+      AtomicBoolean throwError = new AtomicBoolean(true);
+      AtomicBoolean errorHappened = new AtomicBoolean(false);
+
+      AtomicBoolean succeeded = new AtomicBoolean(false);
+      try {
+         firstLock.start();
+         Wait.assertEquals(1, lockHolderCount::get);
+         assertTrue(firstLock.isLocked());
+         secondLock.start();
+         assertFalse(secondLock.isLocked());
+
+         secondLock.onLockAcquired(() -> {
+            if (throwError.get()) {
+               errorHappened.set(true);
+               throw new RuntimeException("please retry");
+            }
+            succeeded.set(true);
+         });
+
+         assertFalse(succeeded.get());
+         assertFalse(errorHappened.get());
+         firstLock.stop();
+         Wait.assertTrue(errorHappened::get, 5000, 100);
+         throwError.set(false);
+         Wait.assertTrue(succeeded::get, 5000, 100);
+         Wait.assertEquals(1, lockHolderCount::get);
+      } finally {
+         firstLock.stop();
+         secondLock.stop();
+      }
+   }
+
+
+   private void testOnlyOneLockHolderAtATime(List<LockCoordinator> 
lockCoordinators) throws Exception {
+      try {
+
+         lockCoordinators.forEach(LockCoordinator::start);
+
+         Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+         long value = RandomUtil.randomPositiveLong();
+
+         boolean first = true;
+
+         for (LockCoordinator lockCoordinator : lockCoordinators) {
+            MutableLong mutableLong = 
lockCoordinator.getLockManager().getMutableLong("mutableLong");
+            if (first) {
+               mutableLong.set(value);
+               first = false;
+            } else {
+               assertEquals(value, mutableLong.get());
+            }
+            mutableLong.close();
+         }
+
+         logger.info("Stopping 
********************************************************************************");
+
+         // We keep stopping lockManager that is holding the lock
+         // we do this until we stop every one of the locks
+         while (!lockCoordinators.isEmpty()) {
+            if (!Wait.waitFor(() -> lockHolderCount.get() == 1, 15000, 100)) {
+               for (LockCoordinator lock : lockCoordinators) {
+                  logger.info("lock {} is holdingLock={}", 
lock.getDebugInfo(), lock.isLocked());
+               }
+            }
+            Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+            for (LockCoordinator lock : lockCoordinators) {
+               if (lock.isLocked()) {
+                  long changed = lockChanged.get();
+                  lock.stop();
+                  lockCoordinators.remove(lock);
+                  //Wait.assertTrue(() -> lockChanged.get() != changed, 5000, 
100);
+                  break;
+               }
+            }
+         }
+
+         // Verify that no locks are held after stopping
+         Wait.assertEquals(0, () -> lockHolderCount.get(), 15000, 100);
+      } finally {
+         try {
+            lockCoordinators.forEach(LockCoordinator::stop);
+         } catch (Throwable ignored) {
+         }
+      }
+   }
+
+   private List<LockCoordinator> getFileCoordinators(int numberOfCoordinators) 
{
+      File file = new File(getTemporaryDir() + "/lockFolder");
+      file.mkdirs();
+      HashMap<String, String> parameters = new HashMap<>();
+      parameters.put("locks-folder", file.getAbsolutePath());
+      return getLockCoordinators(numberOfCoordinators, 
FileBasedLockManager.class.getName(), parameters);
+   }
+
+   private List<LockCoordinator> getZKCoordinators(int numberOfCoordinators, 
String connectString) {
+      HashMap<String, String> parameters = new HashMap<>();
+      parameters.put("connect-string", connectString);
+      return getLockCoordinators(numberOfCoordinators, 
CuratorDistributedLockManager.class.getName(), parameters);
+   }
+
+   private List<LockCoordinator> getLockCoordinators(int numberOfCoordinators, 
String factoryName, HashMap<String, String> parameters) {
+      return getLockCoordinators(numberOfCoordinators, () -> {
+         try {
+            return DistributedLockManager.newInstanceOf(factoryName, 
parameters);
+         } catch (Exception e) {
+            fail(e.getMessage(), e);
+            return null;
+         }
+      });
+   }
+
+   private List<LockCoordinator> getLockCoordinators(int numberOfCoordinators, 
Supplier<DistributedLockManager> lockManagerSupplier) {
+      List<LockCoordinator> locks = new ArrayList<>();
+      String lockName = "lock-test-" + RandomUtil.randomUUIDString();
+      for (int i = 0; i < numberOfCoordinators; i++) {
+         DistributedLockManager lockManager = lockManagerSupplier.get();
+
+         LockCoordinator lockCoordinator = new 
LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), 
KEEP_ALIVE_INTERVAL_MS, lockManager, lockName, lockName);
+         lockCoordinator.onLockAcquired(() -> lock(lockCoordinator));
+         lockCoordinator.onLockReleased(() -> unlock(lockCoordinator));
+         lockCoordinator.onLockReleased(() -> lockChanged.incrementAndGet());
+         lockCoordinator.onLockAcquired(() -> lockChanged.incrementAndGet());
+         lockCoordinator.setDebugInfo("ID" + i);
+         locks.add(lockCoordinator);
+      }
+      return locks;
+   }
+
+   private void lock(LockCoordinator lockCoordinator) {
+      logger.info("++Lock {} lock", lockCoordinator.getDebugInfo());
+      lockHolderCount.incrementAndGet();
+   }
+
+   private void unlock(LockCoordinator lockCoordinator) {
+      logger.info("--Lock {} unlocking", lockCoordinator.getDebugInfo());
+      lockHolderCount.decrementAndGet();
+   }
+
+}
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
new file mode 100644
index 0000000000..2634b06060
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingZooKeeperServer;
+
+/**
+ * This is encapsulating Zookeeper instances for tests
+ * */
+public class ZookeeperCluster {
+   private TestingCluster testingServer;
+   private InstanceSpec[] clusterSpecs;
+   private int nodes;
+   private final File root;
+
+   public ZookeeperCluster(File root, int nodes, int basePort, int 
serverTickMS) throws IOException {
+      this.root = root;
+      this.nodes = nodes;
+      clusterSpecs = new InstanceSpec[nodes];
+      for (int i = 0; i < nodes; i++) {
+         clusterSpecs[i] = new InstanceSpec(newFolder(root, "node" + i), 
basePort + i, -1, -1, true, -1, serverTickMS, -1);
+      }
+      testingServer = new TestingCluster(clusterSpecs);
+   }
+
+   public void start() throws Exception {
+      testingServer.start();
+   }
+
+   public void stop() throws Exception {
+      ThreadLeakCheckExtension.addKownThread("ListenerHandler-");
+      testingServer.stop();
+   }
+
+   private static File newFolder(File root, String... subDirs) throws 
IOException {
+      String subFolder = String.join("/", subDirs);
+      File result = new File(root, subFolder);
+      if (!result.mkdirs()) {
+         throw new IOException("Couldn't create folders " + root);
+      }
+      return result;
+   }
+
+   public String getConnectString() {
+      return testingServer.getConnectString();
+   }
+
+   public List<TestingZooKeeperServer> getServers() {
+      return testingServer.getServers();
+   }
+
+   public int getNodes() {
+      return nodes;
+   }
+}
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
index 634c6a80f0..1fea82cda4 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
@@ -22,10 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension;
 import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -33,8 +30,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
 //Parameters set in super class
@@ -46,34 +41,27 @@ public class ZookeeperLockManagerSinglePairTest extends 
LockManagerSinglePairTes
    // Beware: the server tick must be small enough that to let the session to 
be correctly expired
    private static final int SERVER_TICK_MS = 100;
 
-   private TestingCluster testingServer;
-   private InstanceSpec[] clusterSpecs;
-   private int nodes;
+   ZookeeperCluster zookeeperCluster;
 
    @BeforeEach
    @Override
    public void setup() throws Exception {
       super.setup();
-      nodes = 3;
-      clusterSpecs = new InstanceSpec[nodes];
-      for (int i = 0; i < nodes; i++) {
-         clusterSpecs[i] = new InstanceSpec(newFolder(temporaryFolder, "node" 
+ i), BASE_SERVER_PORT + i, -1, -1, true, -1, SERVER_TICK_MS, -1);
-      }
-      testingServer = new TestingCluster(clusterSpecs);
-      testingServer.start();
-      assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668", 
testingServer.getConnectString());
-      logger.info("Cluster of {} nodes on: {}", 3, 
testingServer.getConnectString());
+
+      zookeeperCluster = new ZookeeperCluster(temporaryFolder, 3, 
BASE_SERVER_PORT, SERVER_TICK_MS);
+      zookeeperCluster.start();
+      assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668", 
zookeeperCluster.getConnectString());
+      logger.info("Cluster of {} nodes on: {}", 3, 
zookeeperCluster.getConnectString());
    }
 
    @Override
    @AfterEach
    public void after() throws Exception {
       // zk bits that leak from servers
-      ThreadLeakCheckExtension.addKownThread("ListenerHandler-");
       try {
          super.after();
       } finally {
-         testingServer.close();
+         zookeeperCluster.stop();
       }
    }
 
@@ -88,8 +76,8 @@ public class ZookeeperLockManagerSinglePairTest extends 
LockManagerSinglePairTes
 
    @Override
    protected int[] stopMajority() throws Exception {
-      List<TestingZooKeeperServer> followers = testingServer.getServers();
-      final int quorum = (nodes / 2) + 1;
+      List<TestingZooKeeperServer> followers = zookeeperCluster.getServers();
+      final int quorum = (zookeeperCluster.getNodes() / 2) + 1;
       final int[] stopped = new int[quorum];
       for (int i = 0; i < quorum; i++) {
          followers.get(i).stop();
@@ -100,18 +88,9 @@ public class ZookeeperLockManagerSinglePairTest extends 
LockManagerSinglePairTes
 
    @Override
    protected void restart(int[] nodes) throws Exception {
-      List<TestingZooKeeperServer> servers = testingServer.getServers();
+      List<TestingZooKeeperServer> servers = zookeeperCluster.getServers();
       for (int nodeIndex : nodes) {
          servers.get(nodeIndex).restart();
       }
    }
-
-   private static File newFolder(File root, String... subDirs) throws 
IOException {
-      String subFolder = String.join("/", subDirs);
-      File result = new File(root, subFolder);
-      if (!result.mkdirs()) {
-         throw new IOException("Couldn't create folders " + root);
-      }
-      return result;
-   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to