This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new bb08a573eb ARTEMIS-3640 Use client connectors for HA
bb08a573eb is described below

commit bb08a573ebdd414abcebad9223a599360443facf
Author: Domenico Francesco Bruscino <brus...@apache.org>
AuthorDate: Mon Mar 6 16:44:19 2023 +0100

    ARTEMIS-3640 Use client connectors for HA
---
 .../artemis/api/core/TransportConfiguration.java   |   2 +
 .../core/client/impl/ClientSessionFactoryImpl.java |  36 +++-
 .../core/client/impl/ServerLocatorImpl.java        |   2 +-
 .../connector/TCPTransportConfigurationSchema.java |  12 +-
 ...nnectorTransportConfigurationParserURITest.java |  22 +++
 .../artemis/tests/util/ActiveMQTestBase.java       |   9 +-
 docs/user-manual/en/ha.md                          |  41 +++++
 .../cluster/distribution/ClusterTestBase.java      |   7 +-
 .../failover/ClientConnectorFailoverTest.java      | 192 +++++++++++++++++++++
 .../StaticClusterWithBackupFailoverTest.java       |  12 ++
 10 files changed, 325 insertions(+), 10 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index 5a823a8369..048ea51ffc 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -47,6 +47,8 @@ public class TransportConfiguration implements Serializable {
 
    private static final long serialVersionUID = -3994528421527392679L;
 
+   public static final String NAME_PARAM = "name";
+
    public static final String EXTRA_PROPERTY_PREFIX = "$.EP.";
 
    private String name;
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index ac6b70c7f0..1f21d3351b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -82,7 +82,7 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
    private final ClientProtocolManager clientProtocolManager;
 
-   private final TransportConfiguration connectorConfig;
+   private TransportConfiguration connectorConfig;
 
    private TransportConfiguration previousConnectorConfig;
 
@@ -158,6 +158,8 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
    private final Object connectionReadyLock = new Object();
 
+   private final TransportConfiguration[] connectorConfigs;
+
    public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
                                    final TransportConfiguration 
connectorConfig,
                                    final ServerLocatorConfig locatorConfig,
@@ -171,6 +173,19 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                scheduledThreadPool, incomingInterceptors, 
outgoingInterceptors);
    }
 
+   ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
+      final Pair<TransportConfiguration, TransportConfiguration> 
connectorConfig,
+      final ServerLocatorConfig locatorConfig,
+      final int reconnectAttempts,
+      final Executor threadPool,
+      final ScheduledExecutorService scheduledThreadPool,
+      final List<Interceptor> incomingInterceptors,
+      final List<Interceptor> outgoingInterceptors) {
+      this(serverLocator, connectorConfig,
+         locatorConfig, reconnectAttempts, threadPool,
+         scheduledThreadPool, incomingInterceptors, outgoingInterceptors, 
null);
+   }
+
    ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
                           final Pair<TransportConfiguration, 
TransportConfiguration> connectorConfig,
                           final ServerLocatorConfig locatorConfig,
@@ -178,7 +193,8 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                           final Executor threadPool,
                           final ScheduledExecutorService scheduledThreadPool,
                           final List<Interceptor> incomingInterceptors,
-                          final List<Interceptor> outgoingInterceptors) {
+                          final List<Interceptor> outgoingInterceptors,
+                          final TransportConfiguration[] connectorConfigs) {
       createTrace = new Exception();
 
       this.serverLocator = serverLocator;
@@ -238,6 +254,8 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
       if (connectorConfig.getB() != null) {
          this.backupConnectorConfig = connectorConfig.getB();
       }
+
+      this.connectorConfigs = connectorConfigs;
    }
 
    @Override
@@ -1150,6 +1168,20 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
 
          if (backupConnectorConfig != null) {
+            //Try to connect with the client connector that matches the backup 
connector name
+            String backupConnectorName = backupConnectorConfig.getName();
+            if (backupConnectorName != null && connectorConfigs != null) {
+               for (TransportConfiguration connectorConfig : connectorConfigs) 
{
+                  if (backupConnectorName.equals(connectorConfig.getName())) {
+                     //Try to connect with the matching client connector configuration
+                     transportConnection = createTransportConnection("backup", 
connectorConfig);
+                     if (transportConnection != null) {
+                        return transportConnection;
+                     }
+                  }
+               }
+            }
+
             //Try to connect with the backup connector configuration
             transportConnection = createTransportConnection("backup", 
backupConnectorConfig);
             if (transportConnection != null) {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 127a3ea0f4..fa6b6b0dfb 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -678,7 +678,7 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
             // try each factory in the list until we find one which works
 
             try {
-               factory = new ClientSessionFactoryImpl(this, tc, config, 
config.reconnectAttempts, threadPool, scheduledThreadPool, 
incomingInterceptors, outgoingInterceptors);
+               factory = new ClientSessionFactoryImpl(this, tc, config, 
config.reconnectAttempts, threadPool, scheduledThreadPool, 
incomingInterceptors, outgoingInterceptors, initialConnectors);
                try {
                   addToConnecting(factory);
                   // We always try to connect here with only one attempt,
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
index e962a5d211..99c8da1391 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
@@ -65,7 +65,11 @@ public class TCPTransportConfigurationSchema extends 
AbstractTransportConfigurat
       BeanSupport.setData(uri, props, allowableProperties, query, extraProps);
       List<TransportConfiguration> transportConfigurations = new ArrayList<>();
 
-      TransportConfiguration config = new TransportConfiguration(factoryName, 
props, name, extraProps);
+      String nameFromQuery = 
query.getOrDefault(TransportConfiguration.NAME_PARAM, name);
+      if (name != null && !name.equals(nameFromQuery)) {
+         throw new IllegalArgumentException("Name doesn't match query param");
+      }
+      TransportConfiguration config = new TransportConfiguration(factoryName, 
props, nameFromQuery, extraProps);
 
       transportConfigurations.add(config);
       String connectors = uri.getFragment();
@@ -77,8 +81,10 @@ public class TCPTransportConfigurationSchema extends 
AbstractTransportConfigurat
             HashMap<String, Object> newProps = new HashMap<>();
             extraProps = new HashMap<>();
             BeanSupport.setData(extraUri, newProps, allowableProperties, 
query, extraProps);
-            BeanSupport.setData(extraUri, newProps, allowableProperties, 
parseQuery(extraUri.getQuery(), null), extraProps);
-            transportConfigurations.add(new 
TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), 
extraProps));
+            Map<String, String> extraUriQuery = 
parseQuery(extraUri.getQuery(), null);
+            BeanSupport.setData(extraUri, newProps, allowableProperties, 
extraUriQuery, extraProps);
+            String extraUriNameFromQuery = 
extraUriQuery.getOrDefault(TransportConfiguration.NAME_PARAM, name + ":" + 
extraUri);
+            transportConfigurations.add(new 
TransportConfiguration(factoryName, newProps, extraUriNameFromQuery, 
extraProps));
          }
       }
       return transportConfigurations;
diff --git 
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
index 8d82fbb577..6c8f5bc859 100644
--- 
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
+++ 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
@@ -51,4 +51,26 @@ public class ConnectorTransportConfigurationParserURITest {
       Assert.assertEquals("3", objects.get(2).getParams().get("port"));
    }
 
+   @Test
+   public void testParseMultipleConnectorWithName() throws Exception {
+      ConnectorTransportConfigurationParser parser = new 
ConnectorTransportConfigurationParser(false);
+
+      URI transportURI = 
parser.expandURI("(tcp://live:1?name=live1,tcp://backupA:2?name=backupA2,tcp://backupB:3?name=backupB3");
+      System.out.println(transportURI);
+      List<TransportConfiguration> objects = parser.newObject(transportURI, 
null);
+      if (logger.isInfoEnabled()) {
+         objects.forEach(t -> logger.info("transportConfig: {}", t));
+      }
+
+      Assert.assertEquals(3, objects.size());
+      Assert.assertEquals("live1", objects.get(0).getName());
+      Assert.assertEquals("live", objects.get(0).getParams().get("host"));
+      Assert.assertEquals("1", objects.get(0).getParams().get("port"));
+      Assert.assertEquals("backupA2", objects.get(1).getName());
+      Assert.assertEquals("backupA", objects.get(1).getParams().get("host"));
+      Assert.assertEquals("2", objects.get(1).getParams().get("port"));
+      Assert.assertEquals("backupB3", objects.get(2).getName());
+      Assert.assertEquals("backupB", objects.get(2).getParams().get("host"));
+      Assert.assertEquals("3", objects.get(2).getParams().get("port"));
+   }
 }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 95015ba719..4454d61b71 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1338,6 +1338,13 @@ public abstract class ActiveMQTestBase extends Assert {
    protected static final TransportConfiguration 
createTransportConfiguration(boolean netty,
                                                                               
boolean acceptor,
                                                                               
Map<String, Object> params) {
+      return 
createTransportConfiguration(UUIDGenerator.getInstance().generateStringUUID(), 
netty, acceptor, params);
+   }
+
+   protected static final TransportConfiguration 
createTransportConfiguration(String name,
+                                                                              
boolean netty,
+                                                                              
boolean acceptor,
+                                                                              
Map<String, Object> params) {
       String className;
       if (netty) {
          if (acceptor) {
@@ -1354,7 +1361,7 @@ public abstract class ActiveMQTestBase extends Assert {
       }
       if (params == null)
          params = new HashMap<>();
-      return new TransportConfiguration(className, params, 
UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, 
Object>());
+      return new TransportConfiguration(className, params, name, new 
HashMap<String, Object>());
    }
 
    protected void waitForServerToStart(ActiveMQServer server) throws 
InterruptedException {
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index 694875c9f5..03f26fa30d 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -1150,6 +1150,47 @@ If you wish to provide *once and only once* delivery 
guarantees for non
 transacted sessions too, enable duplicate detection, and catch unblock
 exceptions as described in [Handling Blocking Calls During Failover](ha.md)
 
+#### Use client connectors to fail over
+
+Apache ActiveMQ Artemis clients retrieve the backup connector from the
+topology updates that the cluster brokers send. If the connection options
+of the clients don't match the options of the cluster brokers, the clients
+can define a client connector that will be used in place of the connector
+in the topology. A client connector must have a name that matches the name
+of the connector referenced by the cluster connection of the broker. For
+example, given a live broker with the cluster connector name `node-0`
+and a backup broker with the cluster connector name `node-1`, the client
+connection URL must define 2 connectors with the names `node-0` and `node-1`:
+
+Live broker config
+```xml
+<connectors>
+    <!-- Connector used to be announced through cluster connections and 
notifications -->
+    <connector name="node-0">tcp://localhost:61616</connector>
+</connectors>
+<cluster-connections>
+<cluster-connection name="my-cluster">
+  <connector-ref>node-0</connector-ref>
+...
+```
+
+Backup broker config
+```xml
+<connectors>
+    <!-- Connector used to be announced through cluster connections and 
notifications -->
+    <connector name="node-1">tcp://localhost:61617</connector>
+</connectors>
+<cluster-connections>
+<cluster-connection name="my-cluster">
+  <connector-ref>node-1</connector-ref>
+  ...
+```
+
+Client connection url
+```
+(tcp://localhost:61616?name=node-0,tcp://localhost:61617?name=node-1)?ha=true&reconnectAttempts=-1
+```
+
 ### Getting Notified of Connection Failure
 
 JMS provides a standard mechanism for getting notified asynchronously of
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index a40083d402..0bfe403fc1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1957,13 +1957,14 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
          throw new IllegalStateException("No server at node " + nodeFrom);
       }
 
-      TransportConfiguration connectorFrom = 
createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
-      serverFrom.getConfiguration().getConnectorConfigurations().put(name, 
connectorFrom);
+      String connectorName = "node" + nodeFrom;
+      TransportConfiguration connectorFrom = 
createTransportConfiguration(connectorName, netty, false, 
generateParams(nodeFrom, netty));
+      
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorName, 
connectorFrom);
 
       List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, 
nodesTo);
       Configuration config = serverFrom.getConfiguration();
 
-      ClusterConnectionConfiguration clusterConf = new 
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+      ClusterConnectionConfiguration clusterConf = new 
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
 
       config.getClusterConfigurations().add(clusterConf);
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
new file mode 100644
index 0000000000..7bdbea7a90
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientConnectorFailoverTest extends 
StaticClusterWithBackupFailoverTest {
+
+   private static final String TEST_PARAM = "TEST";
+
+   @Override
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testConsumerAfterFailover() throws Exception {
+      setupCluster();
+      startServers(getLiveServerIDs());
+      startServers(getBackupServerIDs());
+
+      for (int i : getLiveServerIDs()) {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      for (int i : getBackupServerIDs()) {
+         waitForFailoverTopology(i, 0, 1, 2);
+      }
+
+      for (int i : getLiveServerIDs()) {
+         setupSessionFactory(i, i + 3, isNetty(), false);
+         createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+      }
+
+      List<TransportConfiguration> transportConfigList = new ArrayList<>();
+      for (int i : getServerIDs()) {
+         Map<String, Object> params = generateParams(i, isNetty());
+         TransportConfiguration serverToTC = 
createTransportConfiguration("node" + i, isNetty(), false, params);
+         serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
+         transportConfigList.add(serverToTC);
+      }
+      TransportConfiguration[] transportConfigs = 
transportConfigList.toArray(new 
TransportConfiguration[transportConfigList.size()]);
+
+      try (ServerLocator serverLocator = new ServerLocatorImpl(true, 
transportConfigs)) {
+         serverLocator.setReconnectAttempts(-1);
+         try (ClientSessionFactory sessionFactory = 
serverLocator.createSessionFactory()) {
+            try (ClientSession clientSession = sessionFactory.createSession()) 
{
+               clientSession.start();
+
+               TransportConfiguration backupConnector = 
(TransportConfiguration)
+                  
((ClientSessionFactoryImpl)sessionFactory).getBackupConnector();
+               Assert.assertNotEquals(backupConnector.getName(), 
sessionFactory.getConnectorConfiguration().getName());
+
+               int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+                  getConnectorConfiguration().getName().substring(4));
+
+               try (ClientProducer clientProducer = 
clientSession.createProducer(QUEUES_TESTADDRESS)) {
+                  clientProducer.send(clientSession.createMessage(true));
+               }
+
+               crashAndWaitForFailure(getServer(serverIdBeforeCrash), 
clientSession);
+               Assert.assertEquals(backupConnector.getName(), 
sessionFactory.getConnectorConfiguration().getName());
+               Assert.assertEquals(TEST_PARAM, 
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+               int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+                  getConnectorConfiguration().getName().substring(4));
+               Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
+
+               try (ClientConsumer clientConsumer = 
clientSession.createConsumer(QUEUE_NAME)) {
+                  Assert.assertNotNull(clientConsumer.receive(3000));
+               }
+
+               QueueControl testQueueControlAfterCrash = 
(QueueControl)getServer(serverIdAfterCrash).
+                  getManagementService().getResource(ResourceNames.QUEUE + 
QUEUE_NAME);
+               Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() 
== 0, 3000);
+
+               clientSession.stop();
+            }
+         }
+      }
+   }
+
+   @Test
+   public void testJMSConsumerAfterFailover() throws Exception {
+
+      setupCluster();
+      startServers(getLiveServerIDs());
+      startServers(getBackupServerIDs());
+
+      for (int i : getLiveServerIDs()) {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      for (int i : getBackupServerIDs()) {
+         waitForFailoverTopology(i, 0, 1, 2);
+      }
+
+      StringBuilder connectionURL = new StringBuilder();
+      connectionURL.append("(");
+      for (int i : getServerIDs()) {
+         connectionURL.append("tcp://localhost:");
+         
connectionURL.append(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
 + i);
+         connectionURL.append("?name=node");
+         connectionURL.append(i);
+         connectionURL.append("&");
+         connectionURL.append(TEST_PARAM);
+         connectionURL.append("=");
+         connectionURL.append(TEST_PARAM);
+         connectionURL.append(",");
+      }
+      connectionURL.replace(connectionURL.length() - 1, 
connectionURL.length(), ")");
+      connectionURL.append( "?ha=true&reconnectAttempts=-1");
+
+      ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(connectionURL.toString());
+
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession()) {
+            ClientSessionFactory sessionFactory = 
((ActiveMQConnection)connection).getSessionFactory();
+            TransportConfiguration backupConnector = (TransportConfiguration)
+               ((ClientSessionFactoryImpl)sessionFactory).getBackupConnector();
+            Assert.assertNotEquals(backupConnector.getName(), 
sessionFactory.getConnectorConfiguration().getName());
+
+            int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+               getConnectorConfiguration().getName().substring(4));
+
+            Queue testQueue = session.createQueue(QUEUE_NAME);
+
+            try (MessageProducer producer = session.createProducer(testQueue)) 
{
+               producer.send(session.createTextMessage(TEST_PARAM));
+            }
+
+            ClientSession clientSession = 
((ActiveMQSession)session).getCoreSession();
+            crashAndWaitForFailure(getServer(serverIdBeforeCrash), 
clientSession);
+            Assert.assertEquals(backupConnector.getName(), 
sessionFactory.getConnectorConfiguration().getName());
+            Assert.assertEquals(TEST_PARAM, 
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+            int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+               getConnectorConfiguration().getName().substring(4));
+            Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
+
+            try (MessageConsumer messageConsumer = 
session.createConsumer(testQueue)) {
+               Assert.assertNotNull(messageConsumer.receive(3000));
+            }
+
+            QueueControl testQueueControlAfterCrash = 
(QueueControl)getServer(serverIdAfterCrash).
+               getManagementService().getResource(ResourceNames.QUEUE + 
QUEUE_NAME);
+            Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 
0, 3000);
+         }
+         connection.stop();
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
index be5d841035..72c2f51f06 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
@@ -20,6 +20,18 @@ import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
 
 public class StaticClusterWithBackupFailoverTest extends 
ClusterWithBackupFailoverTestBase {
 
+   protected int[] getServerIDs() {
+      return new int[]{0, 1, 2, 3, 4, 5};
+   }
+
+   protected int[] getLiveServerIDs() {
+      return new int[]{0, 1, 2};
+   }
+
+   protected int[] getBackupServerIDs() {
+      return new int[]{3, 4, 5};
+   }
+
    @Override
    protected void setupCluster(final MessageLoadBalancingType 
messageLoadBalancingType) throws Exception {
       setupClusterConnectionWithBackups("cluster0", "queues", 
messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});

Reply via email to