This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push: new b9bcd2f AMQ-7165 - ensure failover updated uris are additive such that statically configured uris are respected b9bcd2f is described below commit b9bcd2fcc37837cafa0ff4caa54ae5a04a26ab99 Author: gtully <gary.tu...@gmail.com> AuthorDate: Tue Mar 12 12:24:20 2019 +0000 AMQ-7165 - ensure failover updated uris are additive such that statically configured uris are respected --- .../transport/failover/FailoverTransport.java | 29 ++++++++++------- .../failover/FailoverClusterTestSupport.java | 20 +++++++++++- .../failover/FailoverComplexClusterTest.java | 38 +++++++++++++++++++++- .../failover/TwoBrokerFailoverClusterTest.java | 5 ++- 4 files changed, 77 insertions(+), 15 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 9c24828..6b6f518 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.StringTokenizer; @@ -277,7 +278,7 @@ public class FailoverTransport implements CompositeTransport { reconnectOk = true; } - LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}", + LOG.warn("Transport ({}) failed{} attempting to automatically reconnect", connectedTransportURI, (reconnectOk ? 
"," : ", not"), e); failedConnectTransportURI = connectedTransportURI; @@ -290,7 +291,6 @@ public class FailoverTransport implements CompositeTransport { transportListener.transportInterupted(); } - updated.remove(failedConnectTransportURI); reconnectTask.wakeup(); } else if (!isDisposed()) { propagateFailureToExceptionListener(e); @@ -791,14 +791,16 @@ public class FailoverTransport implements CompositeTransport { } private List<URI> getConnectList() { - if (!updated.isEmpty()) { - return updated; - } - ArrayList<URI> l = new ArrayList<URI>(uris); + // updated have precedence + LinkedHashSet<URI> uniqueUris = new LinkedHashSet<URI>(updated); + uniqueUris.addAll(uris); + boolean removed = false; if (failedConnectTransportURI != null) { - removed = l.remove(failedConnectTransportURI); + removed = uniqueUris.remove(failedConnectTransportURI); } + + ArrayList<URI> l = new ArrayList<URI>(uniqueUris); if (randomize) { // Randomly, reorder the list by random swapping for (int i = 0; i < l.size(); i++) { @@ -813,7 +815,7 @@ public class FailoverTransport implements CompositeTransport { l.add(failedConnectTransportURI); } - LOG.debug("urlList connectionList:{}, from: {}", l, uris); + LOG.debug("urlList connectionList:{}, from: {}", l, uniqueUris); return l; } @@ -926,7 +928,7 @@ public class FailoverTransport implements CompositeTransport { final boolean doReconnect() { Exception failure = null; synchronized (reconnectMutex) { - + List<URI> connectList = null; // First ensure we are up to date. 
doUpdateURIsFromDisk(); @@ -936,7 +938,7 @@ public class FailoverTransport implements CompositeTransport { if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { return false; } else { - List<URI> connectList = getConnectList(); + connectList = getConnectList(); if (connectList.isEmpty()) { failure = new IOException("No uris available to connect to."); } else { @@ -1077,7 +1079,7 @@ public class FailoverTransport implements CompositeTransport { connectFailures++; if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { - LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures); + LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures); connectionFailure = failure; // Make sure on initial startup, that the transportListener has been @@ -1098,7 +1100,7 @@ public class FailoverTransport implements CompositeTransport { int warnInterval = getWarnAfterReconnectAttempts(); if (warnInterval > 0 && (connectFailures == 1 || (connectFailures % warnInterval) == 0)) { LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.", - uris, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage())); + connectList, connectFailures, (failure == null ? "?" 
: failure.getLocalizedMessage())); } } @@ -1286,6 +1288,9 @@ public class FailoverTransport implements CompositeTransport { for (URI uri : updatedURIs) { if (uri != null && !updated.contains(uri)) { updated.add(uri); + if (failedConnectTransportURI != null && failedConnectTransportURI.equals(uri)) { + failedConnectTransportURI = null; + } } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java index 01dcce4..70d5b31 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java @@ -34,6 +34,7 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.PublishedAddressPolicy; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.Wait; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; public class FailoverClusterTestSupport extends TestCase { protected final Logger logger = LoggerFactory.getLogger(getClass()); - private static final int NUMBER_OF_CLIENTS = 30; + protected static final int NUMBER_OF_CLIENTS = 30; private String clientUrl; @@ -104,6 +105,22 @@ public class FailoverClusterTestSupport extends TestCase { } } + protected void assertAllConnected(final int expected) throws Exception { + assertTrue("All connections connected!", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + int connectedCount = 0; + for (ActiveMQConnection c : connections) { + if(c.getTransportChannel().isConnected()) { + connectedCount++; + } + } + 
logger.info("Found " + connectedCount + " of " + expected + " connected"); + return connectedCount == expected; + } + })); + } + protected void assertAllConnectedTo(String url) throws Exception { for (ActiveMQConnection c : connections) { assertEquals(url, c.getTransportChannel().getRemoteAddress()); @@ -175,6 +192,7 @@ public class FailoverClusterTestSupport extends TestCase { connector.setUpdateClusterClients(false); connector.setUpdateClusterClientsOnRemove(false); } + connector.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS); } protected void addNetworkBridge(BrokerService answer, String bridgeName, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java index a92ceec..5b0a945 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.failover; +import org.apache.activemq.broker.PublishedAddressPolicy; import org.apache.activemq.broker.TransportConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,7 +174,7 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport { initSingleTcBroker("", null, null); Thread.sleep(2000); - setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500"); + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500&randomize=false"); createClients(100); Thread.sleep(5000); @@ -236,6 +237,41 @@ public class FailoverComplexClusterTest extends 
FailoverClusterTestSupport { assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS); } + public void testStaticInfoAvailableAfterPattialUpdate() throws Exception { + + addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); + TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS); + connectorA.setName("openwire"); + connectorA.setRebalanceClusterClients(true); + connectorA.setUpdateClusterClients(true); + connectorA.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS); + + getBroker(BROKER_A_NAME).start(); + + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "?trace=true," + BROKER_B_CLIENT_TC_ADDRESS + "?trace=true)?useExponentialBackOff=false&initialReconnectDelay=500"); + createClients(1); + + assertAllConnectedTo(BROKER_A_CLIENT_TC_ADDRESS); + + getBroker(BROKER_A_NAME).stop(); + + + addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); + TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS); + connectorB.setName("openwire"); + connectorB.setRebalanceClusterClients(true); + connectorB.setUpdateClusterClients(true); + connectorB.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS); + + getBroker(BROKER_B_NAME).start(); + + getBroker(BROKER_B_NAME).waitUntilStarted(); + Thread.sleep(1000); + + // verify can connect? 
+ assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS); + } + /** * Runs a 3 Broker dynamic failover test: <br/> * <ul> diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java index 19addc3..32bfdc9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java @@ -31,7 +31,7 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport { getBroker(BROKER_B_NAME).waitUntilStarted(); Thread.sleep(2000); - setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&jms.watchTopicAdvisories=false"); createClients(); Thread.sleep(5000); @@ -46,14 +46,17 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport { Thread.sleep(1000); + assertAllConnected(NUMBER_OF_CLIENTS); assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS); Thread.sleep(5000); + logger.info("Restarting A"); createBrokerA(false, "", null, null); getBroker(BROKER_A_NAME).waitUntilStarted(); Thread.sleep(5000); + assertAllConnected(NUMBER_OF_CLIENTS); assertClientsConnectedToTwoBrokers(); assertClientsConnectionsEvenlyDistributed(.35); }