This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4cfaadc113 ARTEMIS-5852 Lock coordination for acceptors
4cfaadc113 is described below
commit 4cfaadc11375cfc02eccd0e8f5b975ac00c81a5c
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jan 29 12:15:21 2026 -0500
ARTEMIS-5852 Lock coordination for acceptors
I'm adding a LockCoordinator to the broker, that will use DistributedLock
to help start and stop acceptors.
You can associate the LockCoordinator with acceptors and an acceptor
serving only clients would then be activated in only one of the brokers.
---
.../artemis/api/core/TransportConfiguration.java | 11 +
.../lockmanager/file/FileBasedLockManager.java | 11 +-
.../zookeeper/CuratorDistributedLockManager.java | 14 +
.../artemis/core/config/Configuration.java | 5 +
.../core/config/LockCoordinatorConfiguration.java | 92 ++++++
.../core/config/impl/ConfigurationImpl.java | 13 +
.../deployers/impl/FileConfigurationParser.java | 55 +++-
.../core/remoting/impl/netty/NettyAcceptor.java | 31 ++
.../remoting/server/impl/RemotingServiceImpl.java | 9 +
.../artemis/core/server/ActiveMQServer.java | 3 +
.../artemis/core/server/ActiveMQServerLogger.java | 12 +
.../core/server/impl/ActiveMQServerImpl.java | 42 +++
.../artemis/core/server/lock/LockCoordinator.java | 295 +++++++++++++++++++
.../artemis/spi/core/remoting/Acceptor.java | 9 +
.../resources/schema/artemis-configuration.xsd | 76 +++++
.../core/config/impl/ConfigurationImplTest.java | 76 +++++
.../config/impl/FileConfigurationParserTest.java | 44 +++
docs/user-manual/_book.adoc | 1 +
.../_diagrams/lock-coordination-example.odg | Bin 0 -> 25555 bytes
.../images/lock-coordination-example.png | Bin 0 -> 28759 bytes
docs/user-manual/lock-coordination.adoc | 164 +++++++++++
docs/user-manual/restart-sequence.adoc | 12 +-
.../dualMirrorSingleAcceptor/ZK/A/broker.xml | 205 +++++++++++++
.../dualMirrorSingleAcceptor/ZK/B/broker.xml | 205 +++++++++++++
.../dualMirrorSingleAcceptor/file/A/broker.xml | 186 ++++++++++++
.../dualMirrorSingleAcceptor/file/B/broker.xml | 187 ++++++++++++
.../DualMirrorSingleAcceptorRunningTest.java | 275 +++++++++++++++++
.../smoke/lockmanager/LockCoordinatorTest.java | 325 +++++++++++++++++++++
.../tests/smoke/lockmanager/ZookeeperCluster.java | 77 +++++
.../ZookeeperLockManagerSinglePairTest.java | 41 +--
30 files changed, 2437 insertions(+), 39 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index eb6f25025c..4a85234417 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable {
private String name;
+ private String lockCoordinator;
+
private String factoryClassName = "null";
private Map<String, Object> params;
@@ -413,6 +415,15 @@ public class TransportConfiguration implements
Serializable {
}
}
+ public String getLockCoordinator() {
+ return lockCoordinator;
+ }
+
+ public TransportConfiguration setLockCoordinator(String lockCoordinator) {
+ this.lockCoordinator = lockCoordinator;
+ return this;
+ }
+
private static String replaceWildcardChars(final String str) {
return str.replace('.', '-');
}
diff --git
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
index dcd5b19d88..3329b88bf7 100644
---
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
+++
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
@@ -32,8 +32,15 @@ import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
/**
- * This is an implementation suitable to be used just on unit tests and it
won't attempt to manage nor purge existing
- * stale locks files. It's part of the tests life-cycle to properly set-up and
tear-down the environment.
+ * file-based distributed lock manager.
+ * <p>
+ * This implementation uses the file system to manage distributed locks
+ * <p>
+ * Valid configuration parameters:
+ * <ul>
+ * <li><b>locks-folder</b> (required): Path to the directory where lock
files will be created and managed.
+ * The directory must be created in advance before using this lock
manager.</li>
+ * </ul>
*/
public class FileBasedLockManager implements DistributedLockManager {
diff --git
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
index 6b5fd5f210..e6aa6689ca 100644
---
a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
+++
b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
@@ -44,6 +44,20 @@ import org.apache.curator.utils.DebugUtils;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
+/**
+ * ZooKeeper-based distributed lock manager using Apache Curator.
+ * <p>
+ * Valid configuration parameters:
+ * <ul>
+ * <li><b>connect-string</b> (required): ZooKeeper connection string (e.g.,
"localhost:2181" or "host1:2181,host2:2181,host3:2181")</li>
+ * <li><b>namespace</b> (required): Namespace prefix for all ZooKeeper paths
to isolate lock manager data</li>
+ * <li><b>session-ms</b> (optional, default: 18000): Session timeout in
milliseconds</li>
+ * <li><b>session-percent</b> (optional, default: 33): Percentage of session
timeout to use for lock operations</li>
+ * <li><b>connection-ms</b> (optional, default: 8000): Connection timeout in
milliseconds</li>
+ * <li><b>retries</b> (optional, default: 1): Number of retry attempts for
failed operations</li>
+ * <li><b>retries-ms</b> (optional, default: 1000): Delay in milliseconds
between retry attempts</li>
+ * </ul>
+ */
public class CuratorDistributedLockManager implements DistributedLockManager,
ConnectionStateListener {
enum PrimitiveType {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index dfbc499c08..440abab376 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config;
import java.io.File;
import java.net.URL;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -1368,6 +1369,10 @@ public interface Configuration {
void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin);
+ Collection<LockCoordinatorConfiguration> getLockCoordinatorConfigurations();
+
+ void addLockCoordinatorConfiguration(LockCoordinatorConfiguration
configuration);
+
List<ActiveMQServerBasePlugin> getBrokerPlugins();
List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
new file mode 100644
index 0000000000..1c742e2c6e
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LockCoordinatorConfiguration {
+
+ String name;
+ String lockId;
+ String className;
+ int checkPeriod;
+ Map<String, String> properties;
+
+ public LockCoordinatorConfiguration() {
+ properties = new HashMap<>();
+ }
+
+ public LockCoordinatorConfiguration(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public LockCoordinatorConfiguration setName(String name) {
+ if (name == null || name.trim().isEmpty()) {
+ throw new IllegalArgumentException("LockCoordinator name cannot be
null or empty");
+ }
+ this.name = name;
+ return this;
+ }
+
+ public String getLockId() {
+ return lockId;
+ }
+
+ public LockCoordinatorConfiguration setLockId(String lockId) {
+ if (lockId == null || lockId.trim().isEmpty()) {
+ throw new IllegalArgumentException("LockCoordinator lockId cannot be
null or empty");
+ }
+ this.lockId = lockId;
+ return this;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public LockCoordinatorConfiguration setClassName(String className) {
+ this.className = className;
+ return this;
+ }
+
+ public int getCheckPeriod() {
+ return checkPeriod;
+ }
+
+ public LockCoordinatorConfiguration setCheckPeriod(int checkPeriod) {
+ if (checkPeriod <= 0) {
+ throw new IllegalArgumentException("LockCoordinator checkPeriod must
be positive, got: " + checkPeriod);
+ }
+ this.checkPeriod = checkPeriod;
+ return this;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public String toString() {
+ return "LockCoordinatorConfiguration{" + "name='" + name + '\'' + ",
lockId='" + lockId + '\'' + ", className='" + className + '\'' + ",
checkPeriod=" + checkPeriod + ", properties=" + properties + '}';
+ }
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index b74f6596df..4bb27eeef8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -88,6 +88,7 @@ import
org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.JaasAppConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@@ -227,6 +228,8 @@ public class ConfigurationImpl extends
javax.security.auth.login.Configuration i
private boolean persistIDCache =
ActiveMQDefaultConfiguration.isDefaultPersistIdCache();
+ private Set<LockCoordinatorConfiguration> lockCoordinatorConfigurations =
new HashSet<>();
+
private List<String> incomingInterceptorClassNames = new ArrayList<>();
private List<String> outgoingInterceptorClassNames = new ArrayList<>();
@@ -3509,6 +3512,16 @@ public class ConfigurationImpl extends
javax.security.auth.login.Configuration i
return this;
}
+ @Override
+ public Set<LockCoordinatorConfiguration> getLockCoordinatorConfigurations()
{
+ return lockCoordinatorConfigurations;
+ }
+
+ @Override
+ public void addLockCoordinatorConfiguration(LockCoordinatorConfiguration
configuration) {
+ lockCoordinatorConfigurations.add(configuration);
+ }
+
// extend property utils with ability to auto-fill and locate from
collections
// collection entries are identified by the name() property
private static class CollectionAutoFillPropertiesUtil extends
PropertyUtilsBean {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index f6ad71f36d..81d48d9ffb 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -51,6 +51,7 @@ import
org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
@@ -97,6 +98,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import
org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.routing.KeyType;
@@ -737,6 +739,22 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
}
}
+ NodeList lockCoordinators = e.getElementsByTagName("lock-coordinators");
+
+ if (lockCoordinators != null) {
+ for (int i = 0; i < lockCoordinators.getLength(); i++) {
+ Element lockCoordinatorElement = (Element)
lockCoordinators.item(i);
+
+ for (int j = 0; j <
lockCoordinatorElement.getChildNodes().getLength(); ++j) {
+ Node node = lockCoordinatorElement.getChildNodes().item(j);
+
+ if (node.getNodeName().equalsIgnoreCase("lock-coordinator")) {
+ parseLockCoordinator((Element) node, config);
+ }
+ }
+ }
+ }
+
// Persistence config
config.setLargeMessagesDirectory(getString(e,
"large-messages-directory", config.getLargeMessagesDirectory(),
NOT_NULL_OR_EMPTY));
@@ -916,6 +934,31 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
}
}
+ private void parseLockCoordinator(final Element lockCoordinatorElement,
final Configuration mainConfig) throws Exception {
+ String name = lockCoordinatorElement.getAttribute("name");
+ String lockId = getString(lockCoordinatorElement, "lock-id", name,
NO_CHECK);
+ String className = getString(lockCoordinatorElement, "class-name", null,
NOT_NULL_OR_EMPTY);
+ int checkPeriod = getInteger(lockCoordinatorElement, "check-period",
LockCoordinator.DEFAULT_CHECK_PERIOD, NO_CHECK);
+
+ HashMap<String, String> properties = new HashMap<>();
+
+ if (parameterExists(lockCoordinatorElement, "properties")) {
+ final NodeList propertyNodeList =
lockCoordinatorElement.getElementsByTagName("property");
+ final int propertiesCount = propertyNodeList.getLength();
+ properties = new HashMap<>(propertiesCount);
+ for (int i = 0; i < propertiesCount; i++) {
+ final Element propertyNode = (Element) propertyNodeList.item(i);
+ final String propertyName =
propertyNode.getAttributeNode("key").getValue();
+ final String propertyValue =
propertyNode.getAttributeNode("value").getValue();
+ properties.put(propertyName, propertyValue);
+ }
+ }
+
+ LockCoordinatorConfiguration lockCoordinatorConfiguration = new
LockCoordinatorConfiguration(properties).setName(name).setLockId(lockId).setClassName(className).setCheckPeriod(checkPeriod);
+ mainConfig.addLockCoordinatorConfiguration(lockCoordinatorConfiguration);
+ }
+
+
private void parseJournalRetention(final Element e, final Configuration
config) {
NodeList retention =
e.getElementsByTagName("journal-retention-directory");
@@ -1621,13 +1664,21 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
final
Configuration mainConfig) throws Exception {
Node nameNode = e.getAttributes().getNamedItem("name");
+ String lockCoordinator = e.getAttribute("lock-coordinator");
+
+
String name = nameNode != null ? nameNode.getNodeValue() : null;
String uri = e.getChildNodes().item(0).getNodeValue();
List<TransportConfiguration> configurations =
ConfigurationUtils.parseAcceptorURI(name, uri);
+ TransportConfiguration transportConfiguration = configurations.get(0);
- Map<String, Object> params = configurations.get(0).getParams();
+ Map<String, Object> params = transportConfiguration.getParams();
+
+ if (lockCoordinator != null) {
+ transportConfiguration.setLockCoordinator(lockCoordinator);
+ }
if (mainConfig.isMaskPassword() != null) {
params.put(ActiveMQDefaultConfiguration.getPropMaskPassword(),
mainConfig.isMaskPassword());
@@ -1637,7 +1688,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
}
}
- return configurations.get(0);
+ return transportConfiguration;
}
private TransportConfiguration parseConnectorTransportConfiguration(final
Element e,
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 1a093ea558..b15b0afb7e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -85,6 +85,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
@@ -125,6 +126,8 @@ public class NettyAcceptor extends AbstractAcceptor {
}
}
+ LockCoordinator lockCoordinator;
+
//just for debug
private final String protocolsString;
@@ -441,6 +444,17 @@ public class NettyAcceptor extends AbstractAcceptor {
}
}
+ @Override
+ public LockCoordinator getLockCoordinator() {
+ return lockCoordinator;
+ }
+
+ @Override
+ public NettyAcceptor setLockCoordinator(LockCoordinator lockCoordinator) {
+ this.lockCoordinator = lockCoordinator;
+ return this;
+ }
+
public int getTcpReceiveBufferSize() {
return tcpReceiveBufferSize;
}
@@ -451,6 +465,15 @@ public class NettyAcceptor extends AbstractAcceptor {
@Override
public synchronized void start() throws Exception {
+ if (lockCoordinator == null) {
+ internalStart();
+ } else {
+ lockCoordinator.onLockAcquired(this::internalStart);
+ lockCoordinator.onLockReleased(this::internalStop);
+ }
+ }
+
+ private void internalStart() throws Exception {
if (channelClazz != null) {
// Already started
return;
@@ -770,6 +793,14 @@ public class NettyAcceptor extends AbstractAcceptor {
@Override
public void stop() throws Exception {
+ if (lockCoordinator != null) {
+ lockCoordinator.stop();
+ } else {
+ internalStop();
+ }
+ }
+
+ private void internalStop() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
asyncStop(latch::countDown);
latch.await();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index c4ec118c1c..932a5fb256 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -71,6 +71,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.logs.AuditLogger;
@@ -284,6 +285,14 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
}
acceptor = factory.createAcceptor(info.getName(), clusterConnection,
info.getParams(), new DelegatingBufferHandler(), this, threadPool,
scheduledThreadPool, selectedProtocols, server.getThreadGroupName("remoting-" +
info.getName()), server.getMetricsManager());
+ if (info.getLockCoordinator() != null) {
+ LockCoordinator lockCoordinator =
server.getLockCoordinator(info.getLockCoordinator());
+ if (lockCoordinator == null) {
+
ActiveMQServerLogger.LOGGER.lockCoordinatorNotFoundOnAcceptor(info.getLockCoordinator(),
acceptor.getName());
+ } else {
+ acceptor.setLockCoordinator(lockCoordinator);
+ }
+ }
if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable())
{
acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d6b0662a7b..3158483591 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -56,6 +56,7 @@ import
org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
@@ -177,6 +178,8 @@ public interface ActiveMQServer extends ServiceComponent {
CriticalAnalyzer getCriticalAnalyzer();
+ LockCoordinator getLockCoordinator(String name);
+
void updateStatus(String component, String statusJson);
/**
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index f4ed4d905d..ec019c1ace 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1518,4 +1518,16 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224153, value = "Unable to find page {} on Address {}
while reloading ACKNOWLEDGE_CURSOR, deleting record {}.", level =
LogMessage.Level.INFO)
void cannotFindPageFileDuringPageAckReload(long pageNr, Object address,
long id);
+
+ @LogMessage(id = 224154, value = "Invalid className configured on
LockCoordinator {}, {} does not exist", level = LogMessage.Level.WARN)
+ void invalidTypeLockCoordinator(String name, String type, Throwable t);
+
+ @LogMessage(id = 224155, value = "LockCoordinator {} not found on acceptor
{}", level = LogMessage.Level.WARN)
+ void lockCoordinatorNotFoundOnAcceptor(String lockName, String
acceptorName);
+
+ @LogMessage(id = 224156, value = "LockCoordinator {} starting with
className={} and lockID={} with checkPeriod={} milliseconds", level =
LogMessage.Level.INFO)
+ void lockCoordinatorStarting(String lockName, String className, String
lockID, int checkPeriod);
+
+ @LogMessage(id = 224157, value = "At least one of the components failed to
start under the lockCoordinator {}. A retry will be executed", level =
LogMessage.Level.INFO)
+ void retryLockCoordinator(String name);
}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 0d43413323..336b81f43a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -73,6 +73,7 @@ import
org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationBrokerPlugin;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.LegacyJMSConfiguration;
@@ -161,6 +162,7 @@ import
org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import
org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import
org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
@@ -195,6 +197,7 @@ import
org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
import org.apache.activemq.artemis.core.version.Version;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -291,6 +294,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private ReplayManager replayManager;
+ private ConcurrentHashMap<String, LockCoordinator> lockCoordinators = new
ConcurrentHashMap<>();
+
/**
* Certain management operations shouldn't use more than one thread. this
semaphore is used to guarantee a single
* thread used.
@@ -552,6 +557,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return this.rebuildCounters;
}
+ @Override
+ public LockCoordinator getLockCoordinator(String name) {
+ return lockCoordinators.get(name);
+ }
@Override
public void replay(Date start, Date end, String address, String target,
String filter) throws Exception {
@@ -735,6 +744,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
ActiveMQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ?
"Backup" : "Primary"), configuration);
+ startLockCoordinators();
+
final boolean wasPrimary = !haPolicy.isBackup();
if (!haPolicy.isBackup()) {
activation = haPolicy.createActivation(this, false,
activationParams, ioCriticalErrorListener);
@@ -793,6 +804,35 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
+ private void startLockCoordinators() {
+ for (LockCoordinatorConfiguration lockCoordinatorConfiguration :
configuration.getLockCoordinatorConfigurations()) {
+ String className = lockCoordinatorConfiguration.getClassName();
+ String name = lockCoordinatorConfiguration.getName();
+ String lockId = lockCoordinatorConfiguration.getLockId();
+ int checkPeriod = lockCoordinatorConfiguration.getCheckPeriod();
+
+ DistributedLockManager lockManager;
+
+ try {
+ lockManager = DistributedLockManager.newInstanceOf(className,
lockCoordinatorConfiguration.getProperties());
+ } catch (Exception e) {
+
ActiveMQServerLogger.LOGGER.invalidTypeLockCoordinator(lockCoordinatorConfiguration.getName(),
className, e);
+ continue;
+ }
+
+ LockCoordinator lockCoordinator = new LockCoordinator(scheduledPool,
executorFactory.getExecutor(), checkPeriod, lockManager, lockId, name);
+ lockCoordinators.put(name, lockCoordinator);
+ ActiveMQServerLogger.LOGGER.lockCoordinatorStarting(name, className,
lockId, checkPeriod);
+ lockCoordinator.start();
+ }
+ }
+
+ private void stopLockCoordinators() {
+ if (lockCoordinators != null) {
+ lockCoordinators.values().forEach(LockCoordinator::stop);
+ lockCoordinators.clear();
+ }
+ }
private void takingLongToStart(Object criticalComponent) {
ActiveMQServerLogger.LOGGER.tooLongToStart(criticalComponent);
@@ -1381,6 +1421,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
t);
}
+ stopLockCoordinators();
+
stopComponent(pagingManager);
if (!criticalIOError && pagingManager != null) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
new file mode 100644
index 0000000000..b614cb6ce8
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.lock;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
+import org.apache.activemq.artemis.utils.RunnableEx;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages distributed locks a pluggable distributed lock mechanism.
+ * <p>
+ * The LockMonitor periodically attempts to acquire a distributed lock. When
the lock
+ * is acquired, registered "acquired" callbacks are executed. When the lock is
lost
+ * or released, "released" callbacks are executed.
+ *
+ * @see org.apache.activemq.artemis.lockmanager.DistributedLockManager
+ */
+public class LockCoordinator extends ActiveMQScheduledComponent {
+
+ /** Default period (in milliseconds) for checking lock status */
+ public static final int DEFAULT_CHECK_PERIOD = 5000;
+
+ String debugInfo;
+
+ public String getDebugInfo() {
+ return debugInfo;
+ }
+
+ public LockCoordinator setDebugInfo(String debugInfo) {
+ this.debugInfo = debugInfo;
+ return this;
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final ArrayList<RunnableEx> lockAcquiredCallback = new
ArrayList<>();
+ private final ArrayList<RunnableEx> lockReleasedCallback = new
ArrayList<>();
+ private final long checkPeriod;
+ private final String name;
+ private final String lockID;
+
+ DistributedLockManager lockManager;
+ DistributedLock distributedLock;
+ volatile boolean locked;
+
+ public DistributedLockManager getLockManager() {
+ return lockManager;
+ }
+
+ /**
+ * Registers a callback to be executed when lock is acquired.
+ * If the lock is already held when this method is called, the callback
+ * will be executed immediately (on the executor thread).
+ *
+ * Also In case the runnable throws any exceptions, the lock will be
released, any previously added callback will be called for stop
+ * and the monitor will retry the locks
+ *
+ * @param runnable the callback to execute when lock is acquired
+ */
+ public void onLockAcquired(RunnableEx runnable) {
+ this.lockAcquiredCallback.add(runnable);
+ // if it's locked we run the runnable being added,
+ // however we must check this inside the executor
+ // or within a global locking
+ executor.execute(() -> runIfLocked(runnable));
+ }
+
+ /**
+ * Registers a callback to be executed when lock is released or lost.
+ *
+ * @param runnable the callback to execute when lock is released
+ */
+ public void onLockReleased(RunnableEx runnable) {
+ this.lockReleasedCallback.add(runnable);
+ }
+
+ /**
+ * Stops the lock coordinator, releasing any held locks and cleaning up
resources.
+ * This method blocks until all cleanup is complete.
+ */
+ @Override
+ public void stop() {
+ super.stop();
+ SimpleFutureImpl<Void> simpleFuture = new SimpleFutureImpl<>();
+ executor.execute(() -> {
+ if (locked) {
+ fireLockChanged(false);
+ }
+ if (distributedLock != null) {
+ try {
+ distributedLock.unlock();
+ } catch (Exception e) {
+ logger.debug("Error unlocking during stop", e);
+ }
+ try {
+ distributedLock.close();
+ } catch (Exception e) {
+ logger.debug("Error closing lock during stop", e);
+ }
+ distributedLock = null;
+ }
+ if (lockManager != null) {
+ try {
+ lockManager.stop();
+ } catch (Exception e) {
+ logger.debug("Error stopping lock manager during stop", e);
+ }
+ lockManager = null;
+ }
+ simpleFuture.set(null);
+ });
+ try {
+ simpleFuture.get();
+ } catch (Exception e) {
+ logger.debug("Error waiting for stop to complete", e);
+ }
+ }
+
+ /**
+ * Returns whether this instance currently holds the lock.
+ *
+ * @return true if lock is currently held, false otherwise
+ */
+ public boolean isLocked() {
+ return locked;
+ }
+
+ /**
+ * Constructs a new LockCoordinator.
+ *
+ * @param scheduledExecutor the executor for scheduling periodic lock checks
+ * @param executor the executor for running callbacks
+ * @param checkPeriod how often to check lock status (in milliseconds)
+ * @param lockManager the distributed lock manager implementation to use
+ * @param lockID the unique identifier for the lock
+ * @param name a descriptive name for this lock coordinator
+ */
+ public LockCoordinator(ScheduledExecutorService scheduledExecutor, Executor
executor, long checkPeriod, DistributedLockManager lockManager, String lockID,
String name) {
+ super(scheduledExecutor, executor, checkPeriod, checkPeriod,
TimeUnit.MILLISECONDS, false);
+ assert executor != null;
+ this.lockManager = lockManager;
+ this.checkPeriod = checkPeriod;
+ this.lockID = lockID;
+ this.name = name;
+ }
+
+ private void fireLockChanged(boolean locked) {
+ this.locked = locked;
+ if (locked) {
+ AtomicBoolean treatErrors = new AtomicBoolean(false);
+ lockAcquiredCallback.forEach(r -> doRunTreatingErrors(r,
treatErrors));
+ if (treatErrors.get()) {
+ retryLock();
+ }
+ } else {
+ lockReleasedCallback.forEach(this::doRunWithLogException);
+ }
+ }
+
+ private void retryLock() {
+ ActiveMQServerLogger.LOGGER.retryLockCoordinator(name);
+ // Release lock and retry on next scheduled run if callbacks failed
+ executor.execute(this::executeRetryLock);
+ }
+
+ // to be used as a runnable on the executor
+ private void executeRetryLock() {
+ if (locked) {
+ logger.debug("Unlocking to retry the callback");
+ fireLockChanged(false);
+ if (distributedLock != null) {
+ try {
+ distributedLock.unlock();
+ distributedLock.close();
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ distributedLock = null;
+ }
+ if (lockManager != null) {
+ try {
+ lockManager.stop();
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ private void runIfLocked(RunnableEx checkBeingAdded) {
+ if (locked) {
+ try {
+ doRun(checkBeingAdded);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ retryLock();
+ }
+ }
+ }
+
+ private void doRunTreatingErrors(RunnableEx r, AtomicBoolean errorOnStart) {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errorOnStart.set(true);
+ }
+ }
+
+ private void doRun(RunnableEx r) throws Exception {
+ r.run();
+ }
+
+ private void doRunWithLogException(RunnableEx r) {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (!locked) {
+ if (!lockManager.isStarted()) {
+ lockManager.start();
+ }
+ DistributedLock lock = lockManager.getDistributedLock(lockID);
+ if (lock.tryLock(1, TimeUnit.SECONDS)) {
+ logger.debug("Succeeded on locking {}, lockID={}", name,
lockID);
+ this.distributedLock = lock;
+ fireLockChanged(true);
+ } else {
+ logger.debug("Not able to lock {}, lockID={}", name, lockID);
+ lock.close();
+ lockManager.stop();
+ }
+ } else {
+ if (!distributedLock.isHeldByCaller()) {
+ fireLockChanged(false);
+ distributedLock.close();
+ distributedLock = null;
+ lockManager.stop();
+ }
+ }
+ } catch (Exception e) {
+ fireLockChanged(false);
+ if (distributedLock != null) {
+ try {
+ distributedLock.close();
+ } catch (Exception closeEx) {
+ logger.debug("Error closing lock", closeEx);
+ }
+ distributedLock = null;
+ }
+ if (lockManager != null) {
+ try {
+ lockManager.stop();
+ } catch (Exception stopEx) {
+ logger.debug("Error stopping lock manager", stopEx);
+ }
+ }
+ logger.warn(e.getMessage(), e);
+ }
+ }
+}
+
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
index 5914c8c59e..baa6fd7b22 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
@@ -24,6 +24,7 @@ import
org.apache.activemq.artemis.core.protocol.ProtocolHandler;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_AUTO_START;
@@ -39,6 +40,14 @@ public interface Acceptor extends ActiveMQComponent {
*/
String getName();
+ default Acceptor setLockCoordinator(LockCoordinator lockCoordinator) {
+ return this;
+ }
+
+ default LockCoordinator getLockCoordinator() {
+ return null;
+ }
+
/**
* Pause the acceptor and stop it from receiving client requests.
*/
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0cf7e94fa3..5c12855f29 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -551,6 +551,8 @@
</xsd:complexType>
</xsd:element>
+ <xsd:element ref="lock-coordinators" maxOccurs="1" minOccurs="0"/>
+
<xsd:element ref="bridges" maxOccurs="1" minOccurs="0"/>
<xsd:element ref="federations" maxOccurs="1" minOccurs="0"/>
@@ -3224,6 +3226,78 @@
</xsd:complexType>
</xsd:element>
+
+ <xsd:complexType name="lock-coordinatorType">
+ <xsd:all>
+ <xsd:element name="lock-id" type="xsd:string" maxOccurs="1"
minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ The lockID the implementation will use to reach the lock
provider.
+ This is different from the lock-coordinator name, but if
lock-id is omitted, we will use name of the lock-coordinator as a value.
+ Notice this feature is in tech preview and its configuration
is subjected to changes.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="class-name" type="xsd:string" maxOccurs="1"
minOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ The className of lockManager being used for coordination.
+ Notice this is a pluggable functionality so a provider may
introduce additional options.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="check-period" type="xsd:integer" maxOccurs="1"
minOccurs="0" default="5000">
+ <xsd:annotation>
+ <xsd:documentation>
+ A period used to verify if the lock still valid and renew it
if needed.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="properties" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ A list of options for the distributed-primitive-manager
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="property" type="propertyType"
minOccurs="1" maxOccurs="unbounded">
+ <xsd:annotation>
+ <xsd:documentation>
+ A key-value pair option for the
distributed-primitive-manager
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:all>
+
+ <xsd:attribute name="name" type="xsd:ID" use="required">
+ <xsd:annotation>
+ <xsd:documentation>
+ unique name for the lock coordinator
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+
+ </xsd:complexType>
+
+ <xsd:element name="lock-coordinators">
+ <xsd:annotation>
+ <xsd:documentation>
+ a list of lock coordinators
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="lock-coordinator" type="lock-coordinatorType"
maxOccurs="unbounded" minOccurs="0"/>
+ </xsd:sequence>
+ <xsd:attributeGroup ref="xml:specialAttrs"/>
+ </xsd:complexType>
+ </xsd:element>
+
+
<xsd:complexType name="distributed-primitive-manager">
<xsd:all>
<xsd:element name="class-name" type="xsd:string" maxOccurs="1"
minOccurs="0">
@@ -4901,6 +4975,8 @@
<xsd:extension base="xsd:string">
<xsd:attribute name="name" type="xsd:string">
</xsd:attribute>
+ <xsd:attribute name="lock-coordinator" type="xsd:string">
+ </xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 8987868d02..7bf060f156 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.beans.PropertyDescriptor;
+import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.io.StringReader;
@@ -62,6 +64,7 @@ import
org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeAddressPolicyElement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeBrokerConnectionElement;
@@ -3044,6 +3047,79 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
assertDoesNotThrow(() -> configuration.exportAsProperties(fileOutput));
}
+ /**
+ * Verifies the lock coordinator configuration parsing and export process:
+ * <ul>
+ * <li>Creates a configuration from broker properties</li>
+ * <li>Validates the configuration output</li>
+ * <li>Exports the configuration back to a new {@link
java.util.Properties}</li>
+ * <li>Verifies that the new output contains all initially specified
properties</li>
+ * </ul>
+ */
+ @Test
+ public void testParseLockCoordinator() throws Exception {
+ Properties properties = new Properties();
+
+ properties.put("lockCoordinatorConfigurations.hello.checkPeriod", "123");
+ properties.put("lockCoordinatorConfigurations.hello.lockId", "lock-id");
+ properties.put("lockCoordinatorConfigurations.hello.name", "hello");
+ properties.put("lockCoordinatorConfigurations.hello.className",
"some.class.somewhere");
+ for (int i = 0; i < 10; i++) {
+ properties.put("lockCoordinatorConfigurations.hello.properties.k" +
i, "v" + i);
+ }
+
+ properties.put("acceptorConfigurations.netty.factoryClassName", "netty");
+ properties.put("acceptorConfigurations.netty.lockCoordinator", "hello");
+ properties.put("acceptorConfigurations.netty.name", "netty");
+ properties.put("acceptorConfigurations.netty.params.port", "8888");
+ properties.put("acceptorConfigurations.netty.params.host", "localhost");
+
+ ConfigurationImpl configuration = new ConfigurationImpl();
+ configuration.parsePrefixedProperties(properties, null);
+
+ assertEquals(1, configuration.getAcceptorConfigurations().size());
+ TransportConfiguration acceptorConfig = null;
+ for (TransportConfiguration t
:configuration.getAcceptorConfigurations()) {
+ acceptorConfig = t;
+ }
+ // I am not going to validate all the parameters from netty since this
is already tested elsewhere
+ assertEquals("hello", acceptorConfig.getLockCoordinator());
+ assertEquals("netty", acceptorConfig.getFactoryClassName());
+
+ assertEquals(1, configuration.getLockCoordinatorConfigurations().size());
+
+ LockCoordinatorConfiguration lockCoordinatorConfiguration = null;
+
+ for (LockCoordinatorConfiguration t :
configuration.getLockCoordinatorConfigurations()) {
+ lockCoordinatorConfiguration = t;
+ }
+ Map<String, String> lockProperties =
lockCoordinatorConfiguration.getProperties();
+ for (int i = 0; i < 10; i++) {
+ assertEquals("v" + i, lockProperties.get("k" + i));
+ }
+
+ assertEquals(123, lockCoordinatorConfiguration.getCheckPeriod());
+ assertEquals("lock-id", lockCoordinatorConfiguration.getLockId());
+ assertEquals("hello", lockCoordinatorConfiguration.getName());
+ assertEquals("some.class.somewhere",
lockCoordinatorConfiguration.getClassName());
+
+ File outputProperty = new File(getTestDirfile(), "broker.properties");
+ configuration.exportAsProperties(outputProperty);
+
+ Properties brokerProperties = new Properties();
+
+ try (FileInputStream is = new FileInputStream(outputProperty)) {
+ BufferedInputStream bis = new BufferedInputStream(is);
+ brokerProperties.load(bis);
+ }
+
+ properties.forEach((k, v) -> {
+ logger.debug("Validating {} = {}", k, v);
+ assertEquals(v, brokerProperties.get(k));
+ });
+
+ }
+
/**
* To test ARTEMIS-926
*/
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 53b07421d7..fdca23ef83 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -26,7 +26,9 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayInputStream;
import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@@ -55,10 +58,14 @@ import
org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xml.sax.SAXParseException;
public class FileConfigurationParserTest extends ServerTestBase {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final String PURGE_FOLDER_FALSE = """
<configuration>
<core>
@@ -90,6 +97,7 @@ public class FileConfigurationParserTest extends
ServerTestBase {
<acceptor name="netty">tcp://localhost:5545</acceptor>
<acceptor name="netty-throughput">tcp://localhost:5545</acceptor>
<acceptor name="in-vm">vm://0</acceptor>
+ <acceptor name="netty-with-lock"
lock-coordinator="my-lock">tcp://localhost:5545</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
@@ -130,6 +138,21 @@ public class FileConfigurationParserTest extends
ServerTestBase {
</bridge>
</bridges>""";
+
+ private static final String LOCK_COORDINATOR_PART = """
+ <lock-coordinators>
+ <lock-coordinator name="my-lock">
+ <lock-id>sausage-factory</lock-id>
+ <class-name>some.class.somewhere</class-name>
+ <check-period>333</check-period>
+ <properties>
+ <property key='test1' value='value1'/>
+ <property key='test2' value='value2'/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>""";
+
+
/**
* These "InvalidConfigurationTest*.xml" files are modified copies of
{@literal ConfigurationTest-full-config.xml},
* so just diff it for changes, e.g.
@@ -423,6 +446,27 @@ public class FileConfigurationParserTest extends
ServerTestBase {
assertEquals("helloworld", bconfig.getPassword());
}
+ @Test
+ public void testLockCoordinatorParse() throws Exception {
+ FileConfigurationParser parser = new FileConfigurationParser();
+ String configStr = FIRST_PART + LOCK_COORDINATOR_PART + LAST_PART;
+ Configuration configuration = parser.parseMainConfig(new
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)));
+
+ Collection<LockCoordinatorConfiguration> lockConfigurations =
configuration.getLockCoordinatorConfigurations();
+ lockConfigurations.forEach(f -> logger.info("lockConfiguration={}", f));
+ assertEquals(1, lockConfigurations.size());
+ for (LockCoordinatorConfiguration lockConfiguration :
lockConfigurations) {
+ assertEquals("my-lock", lockConfiguration.getName());
+ assertEquals("sausage-factory", lockConfiguration.getLockId());
+ assertEquals("some.class.somewhere",
lockConfiguration.getClassName());
+ Map<String, String> properties = lockConfiguration.getProperties();
+ assertEquals(2, properties.size());
+ assertEquals("value1", properties.get("test1"));
+ assertEquals("value2", properties.get("test2"));
+ }
+ configuration.getAcceptorConfigurations().stream().filter(f ->
f.getName().equals("netty-with-lock")).forEach(f -> assertEquals("my-lock",
f.getLockCoordinator()));
+ }
+
@Test
public void testDefaultBridgeProducerWindowSize() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
diff --git a/docs/user-manual/_book.adoc b/docs/user-manual/_book.adoc
index d32db5faa8..bf048d4c29 100644
--- a/docs/user-manual/_book.adoc
+++ b/docs/user-manual/_book.adoc
@@ -65,6 +65,7 @@ include::network-isolation.adoc[leveloffset=1]
include::restart-sequence.adoc[leveloffset=1]
include::activation-tools.adoc[leveloffset=1]
include::amqp-broker-connections.adoc[leveloffset=1]
+include::lock-coordination.adoc[leveloffset=1]
include::federation.adoc[leveloffset=1]
include::federation-address.adoc[leveloffset=1]
include::federation-queue.adoc[leveloffset=1]
diff --git a/docs/user-manual/_diagrams/lock-coordination-example.odg
b/docs/user-manual/_diagrams/lock-coordination-example.odg
new file mode 100644
index 0000000000..d82f5b5c72
Binary files /dev/null and
b/docs/user-manual/_diagrams/lock-coordination-example.odg differ
diff --git a/docs/user-manual/images/lock-coordination-example.png
b/docs/user-manual/images/lock-coordination-example.png
new file mode 100644
index 0000000000..752c2909d3
Binary files /dev/null and
b/docs/user-manual/images/lock-coordination-example.png differ
diff --git a/docs/user-manual/lock-coordination.adoc
b/docs/user-manual/lock-coordination.adoc
new file mode 100644
index 0000000000..c5ca2c7466
--- /dev/null
+++ b/docs/user-manual/lock-coordination.adoc
@@ -0,0 +1,164 @@
+= Lock Coordination
+:idprefix:
+:idseparator: -
+:docinfo: shared
+
+The Lock Coordinator provides pluggable distributed lock mechanism monitoring.
+It allows multiple broker instances to coordinate the activation of specific
configuration elements, ensuring that only one broker instance activates a
particular element at any given time.
+
+When a broker acquires a lock through a distributed lock, the associated
configuration elements are activated.
+If the lock is lost or released, those elements are deactivated.
+
+In the current version, the Lock Coordinator can be applied to control the
startup and shutdown of acceptors.
+When an acceptor is associated with a lock coordinator, it will only start
accepting connections when the broker successfully acquires the distributed
lock.
+If lock is lost for any reason, the acceptor automatically stops accepting new
connections.
+
+The same pattern used on acceptors may eventually be applied to other
configuration elements.
+If you have ideas for additional use cases where this pattern could be
applied, please file a JIRA issue.
+
+WARNING: This feature is in technical preview and its configuration elements
are subject to possible modifications.
+
+== Configuration
+
+It is possible to specify multiple lock-coordinators and associate them with
other broker elements.
+
+The broker element associated with a lock-coordinator (e.g., an acceptor) will
only be started if the distributed lock can be acquired.
+If the lock cannot be acquired or is lost, the associated element will be
stopped.
+
+This pattern can be used to ensure clients connect to only one of your
mirrored brokers at a time, preventing split-brain scenarios and duplicate
message processing.
+
+Depending on the provider selector, multiple configuration options can be
provided.
+Please consult the javadoc for your lock implementation.
+A simple table will be provided in this chapter for the two reference
implementations we provide, but this could be a plugin being added to your
broker.
+
+In this next example, we configure a broker with:
+
+* Two acceptors: one for mirroring traffic (`for-mirroring-only`) and one for
client connections (`for-clients-only`)
+* A File-based lock-coordinator named `clients-lock`
+* The client acceptor associated with the lock-coordinator, so it only
activates when the distributed lock is acquired
+* A mirror connection to another broker for data replication
+
+[,xml]
+----
+<acceptors>
+ <acceptor
name="for-mirroring-only">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="for-clients-only"
lock-coordinator="clients-lock">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+</acceptors>
+
+<lock-coordinators>
+ <lock-coordinator name="clients-lock">
+
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+ <lock-id>mirror-cluster-clients</lock-id>
+ <check-period>1000</check-period> <!-- how often to check if the lock is
still valid, in milliseconds -->
+
+ <properties>
+ <property key="locks-folder" value="/usr/somewhere/existing-folder"/>
+ </properties>
+ </lock-coordinator>
+</lock-coordinators>
+
+<broker-connections>
+ <amqp-connection uri="tcp://otherBroker:61000" name="mirror"
retry-interval="2000">
+ <mirror sync="false"/>
+ </amqp-connection>
+</broker-connections>
+
+
+----
+
+In the previous configuration, the broker will use a file lock, and the
acceptor will only be active if it can hold the distributed lock between the
mirrored brokers.
+
+image:images/lock-coordination-example.png[HA with mirroring]
+
+You can find a
https://github.com/apache/artemis-examples/tree/main/examples/features/broker-connection/ha-with-mirroring[working
example] on how to run HA with Mirroring.
+
+== Configuration Options
+
+=== Common Configuration
+
+The following elements are configured on lock-coordinator
+
+[cols="1,1,1,3"]
+|===
+|Element |Required |Default |Description
+
+|name
+|Yes
+|None
+|Unique identifier for this lock-coordinator instance, used to reference it
from other configuration elements
+
+|class-name
+|Yes
+|None
+|The lock provider implementation (e.g.,
`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager` or
`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`)
+
+|lock-id
+|Yes
+|None
+|Unique identifier for the distributed lock. All brokers competing for the
same distributed lock must use the same lock-id
+
+|check-period
+|No
+|5000
+|How often to check if the lock is still valid, in milliseconds
+|===
+
+=== File
+
+The file-based lock uses the file system to manage distributed locks.
+It is provided by the
class-name=`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+
+[cols="1,1,1,3"]
+|===
+|Property |Required |Default |Description
+
+|locks-folder
+|Yes
+|None
+|Path to the directory where lock files will be created and managed. The
directory must be created in advance before using this lock.
+|===
+
+=== ZooKeeper
+
+The ZooKeeper-based lock uses Apache Curator to manage distributed locks via
ZooKeeper.
+It is provided by the
class-name=`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
+
+[cols="1,1,1,3"]
+|===
+|Property |Required |Default |Description
+
+|connect-string
+|Yes
+|None
+|ZooKeeper connection string (e.g., "localhost:2181" or
"host1:2181,host2:2181,host3:2181")
+
+|namespace
+|Yes
+|None
+|Namespace prefix for all ZooKeeper paths to isolate data
+
+|session-ms
+|No
+|18000
+|Session timeout in milliseconds
+
+|session-percent
+|No
+|33
+|Percentage of session timeout to use for lock operations
+
+|connection-ms
+|No
+|8000
+|Connection timeout in milliseconds
+
+|retries
+|No
+|1
+|Number of retry attempts for failed operations
+
+|retries-ms
+|No
+|1000
+|Delay in milliseconds between retry attempts
+|===
diff --git a/docs/user-manual/restart-sequence.adoc
b/docs/user-manual/restart-sequence.adoc
index fa535aee76..dc1f90e8d7 100644
--- a/docs/user-manual/restart-sequence.adoc
+++ b/docs/user-manual/restart-sequence.adoc
@@ -1,11 +1,15 @@
-= Restart Sequence
+= Restart Sequence if using Journal replication
:idprefix:
:idseparator: -
:docinfo: shared
-{project-name-full} ships with 2 architectures for providing HA features.
-The primary and backup brokers can be configured either using network
replication or using shared storage.
-This document will share restart sequences for the brokers under various
circumstances when the client applications are connected to it.
+{project-name-full} ships with 3 possibilities for providing HA:
+
+- Shared storage
+- Network Journal Replication
+- AMQP Broker Connection Mirroring
+
+This page will cover steps to restart the broker while using journal
replication.
== Restarting 1 broker at a time
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
new file mode 100644
index 0000000000..4d3edf8ce8
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
@@ -0,0 +1,205 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="internal">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="artemis"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <properties>
+ <property key="connect-string" value="localhost:2181"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61001" name="mirror"
retry-interval="2000">
+ <mirror sync="false"/>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
new file mode 100644
index 0000000000..918c417197
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
@@ -0,0 +1,205 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="internal">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="artemis"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <properties>
+ <property key="connect-string" value="localhost:2181"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirror"
retry-interval="2000">
+ <mirror sync="false"/>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
new file mode 100644
index 0000000000..f5f50f7836
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
@@ -0,0 +1,186 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+ <acceptor
name="internal">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <!-- acceptor artemis will be created from broker properties -->
+ </acceptors>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61001" name="mirror"
retry-interval="2000">
+ <mirror sync="false"/>
+ </amqp-connection>
+ </broker-connections>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
new file mode 100644
index 0000000000..8b9b054733
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
@@ -0,0 +1,187 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in
memory
+
+ The system will use half of the available memory (-Xmx) by default
for the global-max-size.
+ You may specify a different value here if you need to customize it
to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+ <acceptor
name="internal">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <!-- acceptor artemis will be created from broker properties -->
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirror"
retry-interval="2000">
+ <mirror sync="false"/>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <!-- this should be maxed from the default -->
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
new file mode 100644
index 0000000000..750ad87ebd
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String SERVER_NAME_WITH_ZK_A =
"lockmanager/dualMirrorSingleAcceptor/ZK/A";
+ public static final String SERVER_NAME_WITH_ZK_B =
"lockmanager/dualMirrorSingleAcceptor/ZK/B";
+
+ public static final String SERVER_NAME_WITH_FILE_A =
"lockmanager/dualMirrorSingleAcceptor/file/A";
+ public static final String SERVER_NAME_WITH_FILE_B =
"lockmanager/dualMirrorSingleAcceptor/file/B";
+
+ // Test constants
+ private static final int ALTERNATING_TEST_ITERATIONS = 2;
+ private static final int MESSAGES_SENT_PER_ITERATION = 100;
+ private static final int MESSAGES_CONSUMED_PER_ITERATION = 17;
+ private static final int MESSAGES_REMAINING_PER_ITERATION =
MESSAGES_SENT_PER_ITERATION - MESSAGES_CONSUMED_PER_ITERATION;
+ private static final int EXPECTED_FINAL_MESSAGE_COUNT =
ALTERNATING_TEST_ITERATIONS * MESSAGES_REMAINING_PER_ITERATION;
+
+ private static final int ZK_BASE_PORT = 2181;
+
+ Process processA;
+ Process processB;
+
+ private static void customizeFileServer(File serverLocation, File fileLock)
{
+ try {
+ FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"),
"CHANGEME", fileLock.getAbsolutePath());
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private static void createServerPair(String serverNameA, String serverNameB,
+ String configPathA, String
configPathB,
+ Consumer<File> customizeServer)
throws Exception {
+ File serverLocationA = getFileServerLocation(serverNameA);
+ File serverLocationB = getFileServerLocation(serverNameB);
+ deleteDirectory(serverLocationB);
+ deleteDirectory(serverLocationA);
+
+ createSingleServer(serverLocationA, configPathA, "A", customizeServer);
+ createSingleServer(serverLocationB, configPathB, "B", customizeServer);
+ }
+
+ private static void createSingleServer(File serverLocation, String
configPath,
+ String userAndPassword,
Consumer<File> customizeServer) throws Exception {
+ HelperCreate cliCreateServer = helperCreate();
+ cliCreateServer.setAllowAnonymous(true)
+ .setUser(userAndPassword)
+ .setPassword(userAndPassword)
+ .setNoWeb(true)
+ .setConfiguration(configPath)
+ .setArtemisInstance(serverLocation);
+ cliCreateServer.createServer();
+
+ if (customizeServer != null) {
+ customizeServer.accept(serverLocation);
+ }
+ }
+
+ @BeforeEach
+ public void prepareServers() throws Exception {
+
+ }
+
+ @Test
+ public void testAlternatingZK() throws Throwable {
+ {
+ createServerPair(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B,
+
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A",
+
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B",
+ null);
+
+ cleanupData(SERVER_NAME_WITH_ZK_A);
+ cleanupData(SERVER_NAME_WITH_ZK_B);
+ }
+
+ // starting zookeeper
+ ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1,
ZK_BASE_PORT, 100);
+ zkCluster.start();
+ runAfter(zkCluster::stop);
+
+ testAlternating(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, null,
null);
+ }
+
+ @Test
+ public void testAlternatingFile() throws Throwable {
+ File fileLock = new File("./target/serverLock");
+ fileLock.mkdirs();
+
+ {
+ createServerPair(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B,
+
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A",
+
"./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B",
+ s -> customizeFileServer(s, fileLock));
+
+ cleanupData(SERVER_NAME_WITH_FILE_A);
+ cleanupData(SERVER_NAME_WITH_FILE_B);
+ }
+
+ Properties properties = new Properties();
+
+ properties.put("acceptorConfigurations.artemis.extraParams.amqpCredits",
"1000");
+
properties.put("acceptorConfigurations.artemis.extraParams.amqpLowCredits",
"300");
+ properties.put("acceptorConfigurations.artemis.factoryClassName",
"org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory");
+ properties.put("acceptorConfigurations.artemis.lockCoordinator",
"failover");
+ properties.put("acceptorConfigurations.artemis.name", "artemis");
+ properties.put("acceptorConfigurations.artemis.params.scheme", "tcp");
+
properties.put("acceptorConfigurations.artemis.params.tcpReceiveBufferSize",
"1048576");
+ properties.put("acceptorConfigurations.artemis.params.port", "61616");
+ properties.put("acceptorConfigurations.artemis.params.host",
"localhost");
+ properties.put("acceptorConfigurations.artemis.params.protocols",
"CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE");
+ properties.put("acceptorConfigurations.artemis.params.useEpoll", "true");
+
properties.put("acceptorConfigurations.artemis.params.tcpSendBufferSize",
"1048576");
+
+ properties.put("lockCoordinatorConfigurations.failover.checkPeriod",
"5000");
+ properties.put("lockCoordinatorConfigurations.failover.className",
"org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager");
+ properties.put("lockCoordinatorConfigurations.failover.lockId", "fail");
+ properties.put("lockCoordinatorConfigurations.failover.name",
"failover");
+
properties.put("lockCoordinatorConfigurations.failover.properties.locks-folder",
fileLock.getAbsolutePath());
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(new
File(getServerLocation(SERVER_NAME_WITH_FILE_A), "broker.properties"))) {
+ properties.store(fileOutputStream, null);
+ }
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(new
File(getServerLocation(SERVER_NAME_WITH_FILE_B), "broker.properties"))) {
+ properties.store(fileOutputStream, null);
+ }
+
+ // I'm using broker properties in one of the tests, to help
validating it
+ File propertiesA = new File(getServerLocation(SERVER_NAME_WITH_FILE_A),
"broker.properties");
+ File propertiesB = new File(getServerLocation(SERVER_NAME_WITH_FILE_B),
"broker.properties");
+
+ testAlternating(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B,
propertiesA, propertiesB);
+ }
+
+ public void testAlternating(String nameServerA, String nameServerB, File
brokerPropertiesA, File brokerPropertiesB) throws Throwable {
+ processA = startServer(nameServerA, 0, -1, brokerPropertiesA);
+ waitForXToStart();
+ processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
+ ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp",
"tcp://localhost:61616");
+
+ for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) {
+ logger.info("Iteration {}: Server {} active", i, (i % 2 == 0) ? "A" :
"B");
+
+ if (i % 2 == 0) {
+ // Even iteration: Server A active, kill Server B
+ killServer(processB);
+ waitForXToStart();
+ } else {
+ // Odd iteration: Server B active, kill Server A
+ killServer(processA);
+ waitForXToStart();
+ }
+
+ // Send messages through the shared acceptor
+ cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
+ sendMessages(cfX, MESSAGES_SENT_PER_ITERATION);
+
+ // Consume some messages
+ receiveMessages(cfX, MESSAGES_CONSUMED_PER_ITERATION);
+
+ // Restart the killed server
+ if (i % 2 == 0) {
+ processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
+ } else {
+ processA = startServer(nameServerA, 0, -1, brokerPropertiesA);
+ }
+ }
+
+ // Verify they both have the expected message count (iterations × (sent
- consumed))
+ assertMessageCount("tcp://localhost:61000", "myQueue",
EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount("tcp://localhost:61001", "myQueue",
EXPECTED_FINAL_MESSAGE_COUNT);
+ }
+
+ private static void sendMessages(ConnectionFactory cfX, int nmessages)
throws JMSException {
+ try (Connection connectionX = cfX.createConnection("A", "A")) {
+ Session sessionX = connectionX.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = sessionX.createQueue("myQueue");
+ MessageProducer producerX = sessionX.createProducer(queue);
+ for (int i = 0; i < nmessages; i++) {
+ producerX.send(sessionX.createTextMessage("hello " + i));
+ }
+ sessionX.commit();
+ }
+ }
+
+ private static void receiveMessages(ConnectionFactory cfX, int nmessages)
throws JMSException {
+ try (Connection connectionX = cfX.createConnection("A", "A")) {
+ connectionX.start();
+ Session sessionX = connectionX.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = sessionX.createQueue("myQueue");
+ MessageConsumer consumerX = sessionX.createConsumer(queue);
+ for (int i = 0; i < nmessages; i++) {
+ TextMessage message = (TextMessage) consumerX.receive(5000);
+ assertNotNull(message, "Expected message " + i + " but got null");
+ }
+ sessionX.commit();
+ }
+ }
+
+ private void waitForXToStart() {
+ for (int i = 0; i < 20; i++) {
+ try {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61616");
+ Connection connection = factory.createConnection();
+ connection.close();
+ return;
+ } catch (Throwable e) {
+ logger.debug(e.getMessage(), e);
+ try {
+ Thread.sleep(500);
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ protected void assertMessageCount(String uri, String queueName, int count)
throws Exception {
+ SimpleManagement simpleManagement = new SimpleManagement(uri, null,
null);
+ Wait.assertEquals(count, () -> {
+ try {
+ return simpleManagement.getMessageCountOnQueue(queueName);
+ } catch (Throwable e) {
+ return -1;
+ }
+ });
+ }
+
+}
\ No newline at end of file
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
new file mode 100644
index 0000000000..2af144d8fb
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
+import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager;
+import
org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * This test needs external dependencies. It follows the same pattern
described at {@link DualMirrorSingleAcceptorRunningTest}.
+ * please read the documentation from that test for more detail on how to run
this test.
+ */
+public class LockCoordinatorTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int ZK_BASE_PORT = 2181;
+ private static final String ZK_ENDPOINTS = "127.0.0.1:2181";
+ private static final long KEEP_ALIVE_INTERVAL_MS = 200;
+ private static final int NUM_THREADS = 10;
+
+ private ExecutorService executorService;
+ private ScheduledExecutorService scheduledExecutor;
+ private AtomicInteger lockHolderCount;
+ private AtomicInteger lockChanged;
+ private OrderedExecutorFactory executorFactory;
+
+ @BeforeEach
+ @Override
+ public void setUp() {
+ disableCheckThread();
+ scheduledExecutor = Executors.newScheduledThreadPool(NUM_THREADS);
+ executorService = Executors.newFixedThreadPool(NUM_THREADS * 2);
+ executorFactory = new OrderedExecutorFactory(executorService);
+ lockHolderCount = new AtomicInteger(0);
+ lockChanged = new AtomicInteger(0);
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() {
+ scheduledExecutor.shutdownNow();
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void testWithFile() throws Exception {
+ internalTest(i -> getFileCoordinators(i));
+ }
+
+ @Test
+ public void testWithZK() throws Exception {
+ ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1,
ZK_BASE_PORT, 100);
+ zkCluster.start();
+ runAfter(zkCluster::stop);
+ assertEquals(ZK_ENDPOINTS, zkCluster.getConnectString());
+ internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString()));
+ }
+
+ private void internalTest(Function<Integer, List<LockCoordinator>>
lockCoordinatorSupplier) throws Exception {
+ testOnlyOneLockHolderAtATime(lockCoordinatorSupplier.apply(NUM_THREADS));
+ testAddAfterLocked(lockCoordinatorSupplier.apply(1).get(0));
+ testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0));
+ testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0));
+
+ {
+ List<LockCoordinator> list = lockCoordinatorSupplier.apply(2);
+ testNoRetryWhileNotAcquired(list.get(0), list.get(1));
+ }
+ }
+
+ private void testAddAfterLocked(LockCoordinator lockCoordinator) throws
Exception {
+ lockHolderCount.set(0);
+ lockChanged.set(0);
+
+ try {
+ lockCoordinator.start();
+ Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+ AtomicInteger afterRunning = new AtomicInteger(0);
+ assertTrue(lockCoordinator.isLocked());
+ lockCoordinator.onLockAcquired(afterRunning::incrementAndGet);
+
+ Wait.assertEquals(1, afterRunning::get);
+
+ assertEquals(1, lockHolderCount.get());
+ } finally {
+ lockCoordinator.stop();
+ }
+ }
+
+ private void testRetryAfterError(LockCoordinator lockCoordinator) throws
Exception {
+ lockHolderCount.set(0);
+ lockChanged.set(0);
+
+ AtomicBoolean succeeded = new AtomicBoolean(false);
+ AtomicInteger numberOfTries = new AtomicInteger(0);
+ try {
+ lockCoordinator.onLockAcquired(() -> {
+ if (numberOfTries.incrementAndGet() < 5) {
+ throw new IOException("please retry");
+ }
+ succeeded.set(true);
+ });
+ lockCoordinator.start();
+
+ Wait.assertTrue(succeeded::get, 5000, 100);
+ Wait.assertEquals(1, lockHolderCount::get);
+ } finally {
+ lockCoordinator.stop();
+ }
+ }
+
+ private void testRetryAfterErrorWithDelayAdd(LockCoordinator
lockCoordinator) throws Exception {
+ lockHolderCount.set(0);
+ lockChanged.set(0);
+
+ AtomicBoolean succeeded = new AtomicBoolean(false);
+ AtomicInteger numberOfTries = new AtomicInteger(0);
+ try {
+ lockCoordinator.start();
+ Wait.assertEquals(1, lockHolderCount::get);
+
+ lockCoordinator.onLockAcquired(() -> {
+ if (numberOfTries.incrementAndGet() < 5) {
+ throw new RuntimeException("please retry");
+ }
+ succeeded.set(true);
+ });
+
+ Wait.assertTrue(succeeded::get, 5000, 100);
+ Wait.assertEquals(1, lockHolderCount::get);
+ } finally {
+ lockCoordinator.stop();
+ }
+ }
+
+ // validate that no retry would happen since the lock wasn't held in the
secondLock
+ private void testNoRetryWhileNotAcquired(LockCoordinator firstLock,
LockCoordinator secondLock) throws Exception {
+ lockHolderCount.set(0);
+ lockChanged.set(0);
+ AtomicBoolean throwError = new AtomicBoolean(true);
+ AtomicBoolean errorHappened = new AtomicBoolean(false);
+
+ AtomicBoolean succeeded = new AtomicBoolean(false);
+ try {
+ firstLock.start();
+ Wait.assertEquals(1, lockHolderCount::get);
+ assertTrue(firstLock.isLocked());
+ secondLock.start();
+ assertFalse(secondLock.isLocked());
+
+ secondLock.onLockAcquired(() -> {
+ if (throwError.get()) {
+ errorHappened.set(true);
+ throw new RuntimeException("please retry");
+ }
+ succeeded.set(true);
+ });
+
+ assertFalse(succeeded.get());
+ assertFalse(errorHappened.get());
+ firstLock.stop();
+ Wait.assertTrue(errorHappened::get, 5000, 100);
+ throwError.set(false);
+ Wait.assertTrue(succeeded::get, 5000, 100);
+ Wait.assertEquals(1, lockHolderCount::get);
+ } finally {
+ firstLock.stop();
+ secondLock.stop();
+ }
+ }
+
+
+ private void testOnlyOneLockHolderAtATime(List<LockCoordinator>
lockCoordinators) throws Exception {
+ try {
+
+ lockCoordinators.forEach(LockCoordinator::start);
+
+ Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+ long value = RandomUtil.randomPositiveLong();
+
+ boolean first = true;
+
+ for (LockCoordinator lockCoordinator : lockCoordinators) {
+ MutableLong mutableLong =
lockCoordinator.getLockManager().getMutableLong("mutableLong");
+ if (first) {
+ mutableLong.set(value);
+ first = false;
+ } else {
+ assertEquals(value, mutableLong.get());
+ }
+ mutableLong.close();
+ }
+
+ logger.info("Stopping
********************************************************************************");
+
+ // We keep stopping lockManager that is holding the lock
+ // we do this until we stop every one of the locks
+ while (!lockCoordinators.isEmpty()) {
+ if (!Wait.waitFor(() -> lockHolderCount.get() == 1, 15000, 100)) {
+ for (LockCoordinator lock : lockCoordinators) {
+ logger.info("lock {} is holdingLock={}",
lock.getDebugInfo(), lock.isLocked());
+ }
+ }
+ Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+ for (LockCoordinator lock : lockCoordinators) {
+ if (lock.isLocked()) {
+ long changed = lockChanged.get();
+ lock.stop();
+ lockCoordinators.remove(lock);
+ //Wait.assertTrue(() -> lockChanged.get() != changed, 5000,
100);
+ break;
+ }
+ }
+ }
+
+ // Verify that no locks are held after stopping
+ Wait.assertEquals(0, () -> lockHolderCount.get(), 15000, 100);
+ } finally {
+ try {
+ lockCoordinators.forEach(LockCoordinator::stop);
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ private List<LockCoordinator> getFileCoordinators(int numberOfCoordinators)
{
+ File file = new File(getTemporaryDir() + "/lockFolder");
+ file.mkdirs();
+ HashMap<String, String> parameters = new HashMap<>();
+ parameters.put("locks-folder", file.getAbsolutePath());
+ return getLockCoordinators(numberOfCoordinators,
FileBasedLockManager.class.getName(), parameters);
+ }
+
+ private List<LockCoordinator> getZKCoordinators(int numberOfCoordinators,
String connectString) {
+ HashMap<String, String> parameters = new HashMap<>();
+ parameters.put("connect-string", connectString);
+ return getLockCoordinators(numberOfCoordinators,
CuratorDistributedLockManager.class.getName(), parameters);
+ }
+
+ private List<LockCoordinator> getLockCoordinators(int numberOfCoordinators,
String factoryName, HashMap<String, String> parameters) {
+ return getLockCoordinators(numberOfCoordinators, () -> {
+ try {
+ return DistributedLockManager.newInstanceOf(factoryName,
parameters);
+ } catch (Exception e) {
+ fail(e.getMessage(), e);
+ return null;
+ }
+ });
+ }
+
+ private List<LockCoordinator> getLockCoordinators(int numberOfCoordinators,
Supplier<DistributedLockManager> lockManagerSupplier) {
+ List<LockCoordinator> locks = new ArrayList<>();
+ String lockName = "lock-test-" + RandomUtil.randomUUIDString();
+ for (int i = 0; i < numberOfCoordinators; i++) {
+ DistributedLockManager lockManager = lockManagerSupplier.get();
+
+ LockCoordinator lockCoordinator = new
LockCoordinator(scheduledExecutor, executorFactory.getExecutor(),
KEEP_ALIVE_INTERVAL_MS, lockManager, lockName, lockName);
+ lockCoordinator.onLockAcquired(() -> lock(lockCoordinator));
+ lockCoordinator.onLockReleased(() -> unlock(lockCoordinator));
+ lockCoordinator.onLockReleased(() -> lockChanged.incrementAndGet());
+ lockCoordinator.onLockAcquired(() -> lockChanged.incrementAndGet());
+ lockCoordinator.setDebugInfo("ID" + i);
+ locks.add(lockCoordinator);
+ }
+ return locks;
+ }
+
+ private void lock(LockCoordinator lockCoordinator) {
+ logger.info("++Lock {} lock", lockCoordinator.getDebugInfo());
+ lockHolderCount.incrementAndGet();
+ }
+
+ private void unlock(LockCoordinator lockCoordinator) {
+ logger.info("--Lock {} unlocking", lockCoordinator.getDebugInfo());
+ lockHolderCount.decrementAndGet();
+ }
+
+}
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
new file mode 100644
index 0000000000..2634b06060
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingZooKeeperServer;
+
+/**
+ * This is encapsulating Zookeeper instances for tests
+ * */
+public class ZookeeperCluster {
+ private TestingCluster testingServer;
+ private InstanceSpec[] clusterSpecs;
+ private int nodes;
+ private final File root;
+
+ public ZookeeperCluster(File root, int nodes, int basePort, int
serverTickMS) throws IOException {
+ this.root = root;
+ this.nodes = nodes;
+ clusterSpecs = new InstanceSpec[nodes];
+ for (int i = 0; i < nodes; i++) {
+ clusterSpecs[i] = new InstanceSpec(newFolder(root, "node" + i),
basePort + i, -1, -1, true, -1, serverTickMS, -1);
+ }
+ testingServer = new TestingCluster(clusterSpecs);
+ }
+
+ public void start() throws Exception {
+ testingServer.start();
+ }
+
+ public void stop() throws Exception {
+ ThreadLeakCheckExtension.addKownThread("ListenerHandler-");
+ testingServer.stop();
+ }
+
+ private static File newFolder(File root, String... subDirs) throws
IOException {
+ String subFolder = String.join("/", subDirs);
+ File result = new File(root, subFolder);
+ if (!result.mkdirs()) {
+ throw new IOException("Couldn't create folders " + root);
+ }
+ return result;
+ }
+
+ public String getConnectString() {
+ return testingServer.getConnectString();
+ }
+
+ public List<TestingZooKeeperServer> getServers() {
+ return testingServer.getServers();
+ }
+
+ public int getNodes() {
+ return nodes;
+ }
+}
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
index 634c6a80f0..1fea82cda4 100644
---
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java
@@ -22,10 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension;
import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingZooKeeperServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -33,8 +30,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
//Parameters set in super class
@@ -46,34 +41,27 @@ public class ZookeeperLockManagerSinglePairTest extends
LockManagerSinglePairTes
// Beware: the server tick must be small enough that to let the session to
be correctly expired
private static final int SERVER_TICK_MS = 100;
- private TestingCluster testingServer;
- private InstanceSpec[] clusterSpecs;
- private int nodes;
+ ZookeeperCluster zookeeperCluster;
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
- nodes = 3;
- clusterSpecs = new InstanceSpec[nodes];
- for (int i = 0; i < nodes; i++) {
- clusterSpecs[i] = new InstanceSpec(newFolder(temporaryFolder, "node"
+ i), BASE_SERVER_PORT + i, -1, -1, true, -1, SERVER_TICK_MS, -1);
- }
- testingServer = new TestingCluster(clusterSpecs);
- testingServer.start();
- assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668",
testingServer.getConnectString());
- logger.info("Cluster of {} nodes on: {}", 3,
testingServer.getConnectString());
+
+ zookeeperCluster = new ZookeeperCluster(temporaryFolder, 3,
BASE_SERVER_PORT, SERVER_TICK_MS);
+ zookeeperCluster.start();
+ assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668",
zookeeperCluster.getConnectString());
+ logger.info("Cluster of {} nodes on: {}", 3,
zookeeperCluster.getConnectString());
}
@Override
@AfterEach
public void after() throws Exception {
// zk bits that leak from servers
- ThreadLeakCheckExtension.addKownThread("ListenerHandler-");
try {
super.after();
} finally {
- testingServer.close();
+ zookeeperCluster.stop();
}
}
@@ -88,8 +76,8 @@ public class ZookeeperLockManagerSinglePairTest extends
LockManagerSinglePairTes
@Override
protected int[] stopMajority() throws Exception {
- List<TestingZooKeeperServer> followers = testingServer.getServers();
- final int quorum = (nodes / 2) + 1;
+ List<TestingZooKeeperServer> followers = zookeeperCluster.getServers();
+ final int quorum = (zookeeperCluster.getNodes() / 2) + 1;
final int[] stopped = new int[quorum];
for (int i = 0; i < quorum; i++) {
followers.get(i).stop();
@@ -100,18 +88,9 @@ public class ZookeeperLockManagerSinglePairTest extends
LockManagerSinglePairTes
@Override
protected void restart(int[] nodes) throws Exception {
- List<TestingZooKeeperServer> servers = testingServer.getServers();
+ List<TestingZooKeeperServer> servers = zookeeperCluster.getServers();
for (int nodeIndex : nodes) {
servers.get(nodeIndex).restart();
}
}
-
- private static File newFolder(File root, String... subDirs) throws
IOException {
- String subFolder = String.join("/", subDirs);
- File result = new File(root, subFolder);
- if (!result.mkdirs()) {
- throw new IOException("Couldn't create folders " + root);
- }
- return result;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]