This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new bb08a573eb ARTEMIS-3640 Use client connectors for HA bb08a573eb is described below commit bb08a573ebdd414abcebad9223a599360443facf Author: Domenico Francesco Bruscino <brus...@apache.org> AuthorDate: Mon Mar 6 16:44:19 2023 +0100 ARTEMIS-3640 Use client connectors for HA --- .../artemis/api/core/TransportConfiguration.java | 2 + .../core/client/impl/ClientSessionFactoryImpl.java | 36 +++- .../core/client/impl/ServerLocatorImpl.java | 2 +- .../connector/TCPTransportConfigurationSchema.java | 12 +- ...nnectorTransportConfigurationParserURITest.java | 22 +++ .../artemis/tests/util/ActiveMQTestBase.java | 9 +- docs/user-manual/en/ha.md | 41 +++++ .../cluster/distribution/ClusterTestBase.java | 7 +- .../failover/ClientConnectorFailoverTest.java | 192 +++++++++++++++++++++ .../StaticClusterWithBackupFailoverTest.java | 12 ++ 10 files changed, 325 insertions(+), 10 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 5a823a8369..048ea51ffc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -47,6 +47,8 @@ public class TransportConfiguration implements Serializable { private static final long serialVersionUID = -3994528421527392679L; + public static final String NAME_PARAM = "name"; + public static final String EXTRA_PROPERTY_PREFIX = "$.EP."; private String name; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index ac6b70c7f0..1f21d3351b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -82,7 +82,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final ClientProtocolManager clientProtocolManager; - private final TransportConfiguration connectorConfig; + private TransportConfiguration connectorConfig; private TransportConfiguration previousConnectorConfig; @@ -158,6 +158,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final Object connectionReadyLock = new Object(); + private final TransportConfiguration[] connectorConfigs; + public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final TransportConfiguration connectorConfig, final ServerLocatorConfig locatorConfig, @@ -171,6 +173,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C scheduledThreadPool, incomingInterceptors, outgoingInterceptors); } + ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, + final Pair<TransportConfiguration, TransportConfiguration> connectorConfig, + final ServerLocatorConfig locatorConfig, + final int reconnectAttempts, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final List<Interceptor> incomingInterceptors, + final List<Interceptor> outgoingInterceptors) { + this(serverLocator, connectorConfig, + locatorConfig, reconnectAttempts, threadPool, + scheduledThreadPool, incomingInterceptors, outgoingInterceptors, null); + } + ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final Pair<TransportConfiguration, TransportConfiguration> connectorConfig, final ServerLocatorConfig locatorConfig, @@ -178,7 +193,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, final List<Interceptor> incomingInterceptors, - final List<Interceptor> outgoingInterceptors) { + final List<Interceptor> outgoingInterceptors, + final TransportConfiguration[] connectorConfigs) { createTrace = new Exception(); this.serverLocator = serverLocator; @@ -238,6 +254,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (connectorConfig.getB() != null) { this.backupConnectorConfig = connectorConfig.getB(); } + + this.connectorConfigs = connectorConfigs; } @Override @@ -1150,6 +1168,20 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (backupConnectorConfig != null) { + //Try to connect with the client connector that match the backup connector name + String backupConnectorName = backupConnectorConfig.getName(); + if (backupConnectorName != null && connectorConfigs != null) { + for (TransportConfiguration connectorConfig : connectorConfigs) { + if (backupConnectorName.equals(connectorConfig.getName())) { + //Try to connect with the backup connector configuration + transportConnection = createTransportConnection("backup", connectorConfig); + if (transportConnection != null) { + return transportConnection; + } + } + } + } + //Try to connect with the backup connector configuration transportConnection = createTransportConnection("backup", backupConnectorConfig); if (transportConnection != null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 127a3ea0f4..fa6b6b0dfb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -678,7 +678,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // try each factory in the list until we find one which works try { - factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors, initialConnectors); try { addToConnecting(factory); // We always try to connect here with only one attempt, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java index e962a5d211..99c8da1391 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java @@ -65,7 +65,11 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat BeanSupport.setData(uri, props, allowableProperties, query, extraProps); List<TransportConfiguration> transportConfigurations = new ArrayList<>(); - TransportConfiguration config = new TransportConfiguration(factoryName, props, name, extraProps); + String nameFromQuery = query.getOrDefault(TransportConfiguration.NAME_PARAM, name); + if (name != null && !name.equals(nameFromQuery)) { + throw new IllegalArgumentException("Name doesn't match query param"); + } + TransportConfiguration config = new TransportConfiguration(factoryName, props, nameFromQuery, extraProps); transportConfigurations.add(config); String connectors = uri.getFragment(); @@ -77,8 +81,10 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat HashMap<String, Object> newProps = new HashMap<>(); extraProps = new HashMap<>(); BeanSupport.setData(extraUri, newProps, allowableProperties, query, extraProps); - BeanSupport.setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null), extraProps); - transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), extraProps)); + Map<String, String> extraUriQuery = parseQuery(extraUri.getQuery(), null); + BeanSupport.setData(extraUri, newProps, allowableProperties, extraUriQuery, extraProps); + String extraUriNameFromQuery = extraUriQuery.getOrDefault(TransportConfiguration.NAME_PARAM, name + ":" + extraUri); + transportConfigurations.add(new TransportConfiguration(factoryName, newProps, extraUriNameFromQuery, extraProps)); } } return transportConfigurations; diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java index 8d82fbb577..6c8f5bc859 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java @@ -51,4 +51,26 @@ public class ConnectorTransportConfigurationParserURITest { Assert.assertEquals("3", objects.get(2).getParams().get("port")); } + @Test + public void testParseMultipleConnectorWithName() throws Exception { + ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser(false); + + URI transportURI = parser.expandURI("(tcp://live:1?name=live1,tcp://backupA:2?name=backupA2,tcp://backupB:3?name=backupB3"); + System.out.println(transportURI); + List<TransportConfiguration> objects = parser.newObject(transportURI, null); + if (logger.isInfoEnabled()) { + objects.forEach(t -> logger.info("transportConfig: {}", t)); + } + + Assert.assertEquals(3, objects.size()); + Assert.assertEquals("live1", objects.get(0).getName()); + Assert.assertEquals("live", objects.get(0).getParams().get("host")); + Assert.assertEquals("1", objects.get(0).getParams().get("port")); + Assert.assertEquals("backupA2", objects.get(1).getName()); + Assert.assertEquals("backupA", objects.get(1).getParams().get("host")); + Assert.assertEquals("2", objects.get(1).getParams().get("port")); + Assert.assertEquals("backupB3", objects.get(2).getName()); + Assert.assertEquals("backupB", objects.get(2).getParams().get("host")); + Assert.assertEquals("3", objects.get(2).getParams().get("port")); + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 95015ba719..4454d61b71 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1338,6 +1338,13 @@ public abstract class ActiveMQTestBase extends Assert { protected static final TransportConfiguration createTransportConfiguration(boolean netty, boolean acceptor, Map<String, Object> params) { + return createTransportConfiguration(UUIDGenerator.getInstance().generateStringUUID(), netty, acceptor, params); + } + + protected static final TransportConfiguration createTransportConfiguration(String name, + boolean netty, + boolean acceptor, + Map<String, Object> params) { String className; if (netty) { if (acceptor) { @@ -1354,7 +1361,7 @@ public abstract class ActiveMQTestBase extends Assert { } if (params == null) params = new HashMap<>(); - return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, Object>()); + return new TransportConfiguration(className, params, name, new HashMap<String, Object>()); } protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException { diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md index 694875c9f5..03f26fa30d 100644 --- a/docs/user-manual/en/ha.md +++ b/docs/user-manual/en/ha.md @@ -1150,6 +1150,47 @@ If you wish to provide *once and only once* delivery guarantees for non transacted sessions too, enable duplicate detection, and catch unblock exceptions as described in [Handling Blocking Calls During Failover](ha.md) +#### Use client connectors to fail over + +Apache ActiveMQ Artemis clients retrieve the backup connector from the +topology updates that the cluster brokers send. If the connection options +of the clients don't match the options of the cluster brokers the clients +can define a client connector that will be used in place of the connector +in the topology. To define a client connector it must have a name that matches +the name of the connector defined in the cluster connection of the broker, i.e. +supposing to have a live broker with the cluster connector name `node-0` +and a backup broker with the cluster connector name `node-1` the client +connection url must define 2 connectors with the names `node-0` and `node-1`: + +Live broker config +```xml +<connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="node-0">tcp://localhost:61616</connector> +</connectors> +<cluster-connections> +<cluster-connection name="my-cluster"> + <connector-ref>node-0</connector-ref> +... +``` + +Backup broker config +```xml +<connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="node-1">tcp://localhost:61617</connector> +</connectors> +<cluster-connections> +<cluster-connection name="my-cluster"> + <connector-ref>node-1</connector-ref> + ... +``` + +Client connection url +``` +(tcp://localhost:61616?name=node-0,tcp://localhost:61617?name=node-1)?ha=true&reconnectAttempts=-1 +``` + ### Getting Notified of Connection Failure JMS provides a standard mechanism for getting notified asynchronously of diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index a40083d402..0bfe403fc1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1957,13 +1957,14 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { throw new IllegalStateException("No server at node " + nodeFrom); } - TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom); + String connectorName = "node" + nodeFrom; + TransportConfiguration connectorFrom = createTransportConfiguration(connectorName, netty, false, generateParams(nodeFrom, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(connectorName, connectorFrom); List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); config.getClusterConfigurations().add(clusterConf); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java new file mode 100644 index 0000000000..7bdbea7a90 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQSession; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class ClientConnectorFailoverTest extends StaticClusterWithBackupFailoverTest { + + private static final String TEST_PARAM = "TEST"; + + @Override + protected boolean isNetty() { + return true; + } + + @Test + public void testConsumerAfterFailover() throws Exception { + setupCluster(); + startServers(getLiveServerIDs()); + startServers(getBackupServerIDs()); + + for (int i : getLiveServerIDs()) { + waitForTopology(servers[i], 3, 3); + } + + for (int i : getBackupServerIDs()) { + waitForFailoverTopology(i, 0, 1, 2); + } + + for (int i : getLiveServerIDs()) { + setupSessionFactory(i, i + 3, isNetty(), false); + createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + } + + List<TransportConfiguration> transportConfigList = new ArrayList<>(); + for (int i : getServerIDs()) { + Map<String, Object> params = generateParams(i, isNetty()); + TransportConfiguration serverToTC = createTransportConfiguration("node" + i, isNetty(), false, params); + serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM); + transportConfigList.add(serverToTC); + } + TransportConfiguration[] transportConfigs = transportConfigList.toArray(new TransportConfiguration[transportConfigList.size()]); + + try (ServerLocator serverLocator = new ServerLocatorImpl(true, transportConfigs)) { + serverLocator.setReconnectAttempts(-1); + try (ClientSessionFactory sessionFactory = serverLocator.createSessionFactory()) { + try (ClientSession clientSession = sessionFactory.createSession()) { + clientSession.start(); + + TransportConfiguration backupConnector = (TransportConfiguration) + ((ClientSessionFactoryImpl)sessionFactory).getBackupConnector(); + Assert.assertNotEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName()); + + int serverIdBeforeCrash = Integer.parseInt(sessionFactory. + getConnectorConfiguration().getName().substring(4)); + + try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) { + clientProducer.send(clientSession.createMessage(true)); + } + + crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession); + Assert.assertEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName()); + Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM)); + + int serverIdAfterCrash = Integer.parseInt(sessionFactory. + getConnectorConfiguration().getName().substring(4)); + Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash); + + try (ClientConsumer clientConsumer = clientSession.createConsumer(QUEUE_NAME)) { + Assert.assertNotNull(clientConsumer.receive(3000)); + } + + QueueControl testQueueControlAfterCrash = (QueueControl)getServer(serverIdAfterCrash). + getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME); + Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 0, 3000); + + clientSession.stop(); + } + } + } + } + + @Test + public void testJMSConsumerAfterFailover() throws Exception { + + setupCluster(); + startServers(getLiveServerIDs()); + startServers(getBackupServerIDs()); + + for (int i : getLiveServerIDs()) { + waitForTopology(servers[i], 3, 3); + } + + for (int i : getBackupServerIDs()) { + waitForFailoverTopology(i, 0, 1, 2); + } + + StringBuilder connectionURL = new StringBuilder(); + connectionURL.append("("); + for (int i : getServerIDs()) { + connectionURL.append("tcp://localhost:"); + connectionURL.append(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + i); + connectionURL.append("?name=node"); + connectionURL.append(i); + connectionURL.append("&"); + connectionURL.append(TEST_PARAM); + connectionURL.append("="); + connectionURL.append(TEST_PARAM); + connectionURL.append(","); + } + connectionURL.replace(connectionURL.length() - 1, connectionURL.length(), ")"); + connectionURL.append( "?ha=true&reconnectAttempts=-1"); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionURL.toString()); + + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + try (Session session = connection.createSession()) { + ClientSessionFactory sessionFactory = ((ActiveMQConnection)connection).getSessionFactory(); + TransportConfiguration backupConnector = (TransportConfiguration) + ((ClientSessionFactoryImpl)sessionFactory).getBackupConnector(); + Assert.assertNotEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName()); + + int serverIdBeforeCrash = Integer.parseInt(sessionFactory. + getConnectorConfiguration().getName().substring(4)); + + Queue testQueue = session.createQueue(QUEUE_NAME); + + try (MessageProducer producer = session.createProducer(testQueue)) { + producer.send(session.createTextMessage(TEST_PARAM)); + } + + ClientSession clientSession = ((ActiveMQSession)session).getCoreSession(); + crashAndWaitForFailure(getServer(serverIdBeforeCrash), clientSession); + Assert.assertEquals(backupConnector.getName(), sessionFactory.getConnectorConfiguration().getName()); + Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM)); + + int serverIdAfterCrash = Integer.parseInt(sessionFactory. + getConnectorConfiguration().getName().substring(4)); + Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash); + + try (MessageConsumer messageConsumer = session.createConsumer(testQueue)) { + Assert.assertNotNull(messageConsumer.receive(3000)); + } + + QueueControl testQueueControlAfterCrash = (QueueControl)getServer(serverIdAfterCrash). + getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME); + Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 0, 3000); + } + connection.stop(); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java index be5d841035..72c2f51f06 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java @@ -20,6 +20,18 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase { + protected int[] getServerIDs() { + return new int[]{0, 1, 2, 3, 4, 5}; + } + + protected int[] getLiveServerIDs() { + return new int[]{0, 1, 2}; + } + + protected int[] getBackupServerIDs() { + return new int[]{3, 4, 5}; + } + @Override protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});