This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bcd2f  AMQ-7165 - ensure failover updated uris are additive such 
that statically configured uris are respected
b9bcd2f is described below

commit b9bcd2fcc37837cafa0ff4caa54ae5a04a26ab99
Author: gtully <gary.tu...@gmail.com>
AuthorDate: Tue Mar 12 12:24:20 2019 +0000

    AMQ-7165 - ensure failover updated uris are additive such that statically 
configured uris are respected
---
 .../transport/failover/FailoverTransport.java      | 29 ++++++++++-------
 .../failover/FailoverClusterTestSupport.java       | 20 +++++++++++-
 .../failover/FailoverComplexClusterTest.java       | 38 +++++++++++++++++++++-
 .../failover/TwoBrokerFailoverClusterTest.java     |  5 ++-
 4 files changed, 77 insertions(+), 15 deletions(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 9c24828..6b6f518 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -277,7 +278,7 @@ public class FailoverTransport implements 
CompositeTransport {
                     reconnectOk = true;
                 }
 
-                LOG.warn("Transport ({}) failed {} attempting to automatically 
reconnect: {}",
+                LOG.warn("Transport ({}) failed{} attempting to automatically 
reconnect",
                          connectedTransportURI, (reconnectOk ? "," : ", not"), 
e);
 
                 failedConnectTransportURI = connectedTransportURI;
@@ -290,7 +291,6 @@ public class FailoverTransport implements 
CompositeTransport {
                         transportListener.transportInterupted();
                     }
 
-                    updated.remove(failedConnectTransportURI);
                     reconnectTask.wakeup();
                 } else if (!isDisposed()) {
                     propagateFailureToExceptionListener(e);
@@ -791,14 +791,16 @@ public class FailoverTransport implements 
CompositeTransport {
     }
 
     private List<URI> getConnectList() {
-        if (!updated.isEmpty()) {
-            return updated;
-        }
-        ArrayList<URI> l = new ArrayList<URI>(uris);
+        // updated have precedence
+        LinkedHashSet<URI> uniqueUris = new LinkedHashSet<URI>(updated);
+        uniqueUris.addAll(uris);
+
         boolean removed = false;
         if (failedConnectTransportURI != null) {
-            removed = l.remove(failedConnectTransportURI);
+            removed = uniqueUris.remove(failedConnectTransportURI);
         }
+
+        ArrayList<URI> l = new ArrayList<URI>(uniqueUris);
         if (randomize) {
             // Randomly, reorder the list by random swapping
             for (int i = 0; i < l.size(); i++) {
@@ -813,7 +815,7 @@ public class FailoverTransport implements 
CompositeTransport {
             l.add(failedConnectTransportURI);
         }
 
-        LOG.debug("urlList connectionList:{}, from: {}", l, uris);
+        LOG.debug("urlList connectionList:{}, from: {}", l, uniqueUris);
 
         return l;
     }
@@ -926,7 +928,7 @@ public class FailoverTransport implements 
CompositeTransport {
     final boolean doReconnect() {
         Exception failure = null;
         synchronized (reconnectMutex) {
-
+            List<URI> connectList = null;
             // First ensure we are up to date.
             doUpdateURIsFromDisk();
 
@@ -936,7 +938,7 @@ public class FailoverTransport implements 
CompositeTransport {
             if ((connectedTransport.get() != null && !doRebalance && 
!priorityBackupAvailable) || disposed || connectionFailure != null) {
                 return false;
             } else {
-                List<URI> connectList = getConnectList();
+                connectList = getConnectList();
                 if (connectList.isEmpty()) {
                     failure = new IOException("No uris available to connect 
to.");
                 } else {
@@ -1077,7 +1079,7 @@ public class FailoverTransport implements 
CompositeTransport {
 
             connectFailures++;
             if (reconnectLimit != INFINITE && connectFailures >= 
reconnectLimit) {
-                LOG.error("Failed to connect to {} after: {} attempt(s)", 
uris, connectFailures);
+                LOG.error("Failed to connect to {} after: {} attempt(s)", 
connectList, connectFailures);
                 connectionFailure = failure;
 
                 // Make sure on initial startup, that the transportListener 
has been
@@ -1098,7 +1100,7 @@ public class FailoverTransport implements 
CompositeTransport {
             int warnInterval = getWarnAfterReconnectAttempts();
             if (warnInterval > 0 && (connectFailures == 1 || (connectFailures 
% warnInterval) == 0)) {
                 LOG.warn("Failed to connect to {} after: {} attempt(s) with 
{}, continuing to retry.",
-                         uris, connectFailures, (failure == null ? "?" : 
failure.getLocalizedMessage()));
+                         connectList, connectFailures, (failure == null ? "?" 
: failure.getLocalizedMessage()));
             }
         }
 
@@ -1286,6 +1288,9 @@ public class FailoverTransport implements 
CompositeTransport {
                     for (URI uri : updatedURIs) {
                         if (uri != null && !updated.contains(uri)) {
                             updated.add(uri);
+                            if (failedConnectTransportURI != null && 
failedConnectTransportURI.equals(uri)) {
+                                failedConnectTransportURI = null;
+                            }
                         }
                     }
                 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
index 01dcce4..70d5b31 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.PublishedAddressPolicy;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.Wait;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
 public class FailoverClusterTestSupport extends TestCase {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private static final int NUMBER_OF_CLIENTS = 30;
+    protected static final int NUMBER_OF_CLIENTS = 30;
 
     private String clientUrl;
 
@@ -104,6 +105,22 @@ public class FailoverClusterTestSupport extends TestCase {
         }
     }
 
+    protected void assertAllConnected(final int expected) throws Exception {
+        assertTrue("All connections connected!", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                int connectedCount = 0;
+                for (ActiveMQConnection c : connections) {
+                    if(c.getTransportChannel().isConnected()) {
+                        connectedCount++;
+                    }
+                }
+                logger.info("Found " + connectedCount + " of " + expected + " 
connected");
+                return connectedCount == expected;
+            }
+        }));
+    }
+
     protected void assertAllConnectedTo(String url) throws Exception {
         for (ActiveMQConnection c : connections) {
             assertEquals(url, c.getTransportChannel().getRemoteAddress());
@@ -175,6 +192,7 @@ public class FailoverClusterTestSupport extends TestCase {
             connector.setUpdateClusterClients(false);
             connector.setUpdateClusterClientsOnRemove(false);
         }
+        
connector.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
     }
 
     protected void addNetworkBridge(BrokerService answer, String bridgeName,
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index a92ceec..5b0a945 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.failover;
 
+import org.apache.activemq.broker.PublishedAddressPolicy;
 import org.apache.activemq.broker.TransportConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,7 +174,7 @@ public class FailoverComplexClusterTest extends 
FailoverClusterTestSupport {
         initSingleTcBroker("", null, null);
 
         Thread.sleep(2000);
-        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + 
BROKER_B_CLIENT_TC_ADDRESS + 
")?useExponentialBackOff=false&initialReconnectDelay=500");
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + 
BROKER_B_CLIENT_TC_ADDRESS + 
")?useExponentialBackOff=false&initialReconnectDelay=500&randomize=false");
         createClients(100);
         Thread.sleep(5000);
 
@@ -236,6 +237,41 @@ public class FailoverComplexClusterTest extends 
FailoverClusterTestSupport {
         assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
     }
 
+    public void testStaticInfoAvailableAfterPattialUpdate() throws Exception {
+
+        addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+        TransportConnector connectorA = 
getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
+        connectorA.setName("openwire");
+        connectorA.setRebalanceClusterClients(true);
+        connectorA.setUpdateClusterClients(true);
+        
connectorA.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
+
+        getBroker(BROKER_A_NAME).start();
+
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + 
"?trace=true," + BROKER_B_CLIENT_TC_ADDRESS + 
"?trace=true)?useExponentialBackOff=false&initialReconnectDelay=500");
+        createClients(1);
+
+        assertAllConnectedTo(BROKER_A_CLIENT_TC_ADDRESS);
+
+        getBroker(BROKER_A_NAME).stop();
+
+
+        addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+        TransportConnector connectorB = 
getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
+        connectorB.setName("openwire");
+        connectorB.setRebalanceClusterClients(true);
+        connectorB.setUpdateClusterClients(true);
+        
connectorB.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
+
+        getBroker(BROKER_B_NAME).start();
+
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        Thread.sleep(1000);
+
+        // verify can connect?
+        assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+    }
+
     /**
      * Runs a 3 Broker dynamic failover test: <br/>
      * <ul>
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 19addc3..32bfdc9 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -31,7 +31,7 @@ public class TwoBrokerFailoverClusterTest extends 
FailoverClusterTestSupport {
         getBroker(BROKER_B_NAME).waitUntilStarted();
 
         Thread.sleep(2000);
-        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + 
BROKER_B_CLIENT_TC_ADDRESS + ")");
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + 
BROKER_B_CLIENT_TC_ADDRESS + 
")?randomize=false&jms.watchTopicAdvisories=false");
         createClients();
 
         Thread.sleep(5000);
@@ -46,14 +46,17 @@ public class TwoBrokerFailoverClusterTest extends 
FailoverClusterTestSupport {
 
         Thread.sleep(1000);
 
+        assertAllConnected(NUMBER_OF_CLIENTS);
         assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
 
         Thread.sleep(5000);
 
+        logger.info("Restarting A");
         createBrokerA(false, "", null, null);
         getBroker(BROKER_A_NAME).waitUntilStarted();
         Thread.sleep(5000);
 
+        assertAllConnected(NUMBER_OF_CLIENTS);
         assertClientsConnectedToTwoBrokers();
         assertClientsConnectionsEvenlyDistributed(.35);
     }

Reply via email to