Author: gtully
Date: Thu Oct 13 20:07:48 2011
New Revision: 1183062
URL: http://svn.apache.org/viewvc?rev=1183062&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3542 - Using failover: with static
discovery in a network connector to choose from a master/slave tuple leads to
hangs and invalid states. Issue with demand forward bridge reacting to failover
transport interrupt/resume leading to race conditions. Race condition with
tracking bridges and restarts. Change maxReconnectAttempts so that 0 means
none and -1 means infinite. Default behaviour is still infinite. To reference a
master/slave pair, use:
static:(failover:(a,b)?maxReconnectAttempts=0)?useExponentialBackOff=false. See
org.apache.activemq.network.FailoverStaticNetworkTest
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Oct 13 20:07:48 2011
@@ -116,7 +116,6 @@ public abstract class DemandForwardingBr
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
protected CountDownLatch remoteBrokerNameKnownLatch = new
CountDownLatch(1);
protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
- protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new
AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected NetworkBridgeFilterFactory filterFactory;
@@ -163,7 +162,7 @@ public abstract class DemandForwardingBr
serviceLocalException(error);
}
});
- remoteBroker.setTransportListener(new TransportListener() {
+ remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
Command command = (Command) o;
@@ -174,55 +173,6 @@ public abstract class DemandForwardingBr
serviceRemoteException(error);
}
- public void transportInterupted() {
- // clear any subscriptions - to try and prevent the bridge
- // from stalling the broker
- if (remoteInterupted.compareAndSet(false, true)) {
- LOG.info("Outbound transport to " + remoteBrokerName +
" interrupted.");
- if (localBridgeStarted.get()) {
- clearDownSubscriptions();
- synchronized (DemandForwardingBridgeSupport.this) {
- try {
-
localBroker.oneway(localConnectionInfo.createRemoveCommand());
- } catch (TransportDisposedIOException td) {
- LOG.debug("local broker is now disposed",
td);
- } catch (IOException e) {
- LOG.warn("Caught exception from local
start", e);
- }
- }
- }
- localBridgeStarted.set(false);
- remoteBridgeStarted.set(false);
- startedLatch = new CountDownLatch(2);
- localStartedLatch = new CountDownLatch(1);
- }
- }
-
- public void transportResumed() {
- if (remoteInterupted.compareAndSet(true, false)) {
- // We want to slow down false connects so that we don't
- // get in a busy loop.
- // False connects can occurr if you using SSH tunnels.
- if (!lastConnectSucceeded.get()) {
- try {
- LOG.debug("Previous connection was never fully
established. Sleeping for second to avoid busy loop.");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- lastConnectSucceeded.set(false);
- try {
- startLocalBridge();
- remoteBridgeStarted.set(true);
- startedLatch.countDown();
- LOG.info("Outbound transport to " +
remoteBrokerName + " resumed");
- } catch (Throwable e) {
- LOG.error("Caught exception from local start in
resume transport", e);
- serviceLocalException(e);
- }
- }
- }
});
localBroker.start();
@@ -260,7 +210,7 @@ public abstract class DemandForwardingBr
asyncTaskRunner.execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("StartRemotelBridge:
localBroker=" + localBroker);
+ Thread.currentThread().setName("StartRemoteBridge:
remoteBroker=" + remoteBroker);
try {
startRemoteBridge();
} catch (Exception e) {
@@ -782,14 +732,7 @@ public abstract class DemandForwardingBr
serviceLocalBrokerInfo(command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
- // Don't shut down the whole connector if the remote side
- // was interrupted.
- // the local transport is just shutting down temporarily
- // until the remote side
- // is restored.
- if (!remoteInterupted.get()) {
- stop();
- }
+ stop();
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Thu Oct 13 20:07:48 2011
@@ -90,9 +90,11 @@ public class DiscoveryNetworkConnector e
return;
}
// Should we try to connect to that URI?
- if( bridges.containsKey(uri) ) {
- LOG.debug("Discovery agent generated a duplicate onServiceAdd
event for: "+uri );
- return;
+ synchronized (bridges) {
+ if( bridges.containsKey(uri) ) {
+ LOG.debug("Discovery agent generated a duplicate
onServiceAdd event for: "+uri );
+ return;
+ }
}
if ( localURI.equals(uri) || (connectionFilter != null &&
!connectionFilter.connectTo(uri))) {
LOG.debug("not connecting loopback: " + uri);
@@ -132,7 +134,9 @@ public class DiscoveryNetworkConnector e
NetworkBridge bridge = createBridge(localTransport,
remoteTransport, event);
try {
bridge.start();
- bridges.put(uri, bridge);
+ synchronized (bridges) {
+ bridges.put(uri, bridge);
+ }
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
@@ -158,12 +162,13 @@ public class DiscoveryNetworkConnector e
return;
}
- NetworkBridge bridge = bridges.remove(uri);
- if (bridge == null) {
- return;
+ NetworkBridge bridge;
+ synchronized (bridges) {
+ bridge = bridges.remove(uri);
+ }
+ if (bridge != null) {
+ ServiceSupport.dispose(bridge);
}
-
- ServiceSupport.dispose(bridge);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Thu Oct 13 20:07:48 2011
@@ -58,6 +58,10 @@ public class SimpleDiscoveryAgent implem
super(service);
}
+ @Override
+ public String toString() {
+ return "[" + serviceName + ", failed:" + failed + ",
connectionFailures:" + connectFailures + "]";
+ }
}
public void setDiscoveryListener(DiscoveryListener listener) {
@@ -118,7 +122,7 @@ public class SimpleDiscoveryAgent implem
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures
>= maxReconnectAttempts) {
- LOG.warn("Reconnect attempts exceeded
"+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
+ LOG.warn("Reconnect attempts exceeded
"+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event);
return;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Thu Oct 13 20:07:48 2011
@@ -67,6 +67,7 @@ public class FailoverTransport implement
private static final Logger LOG =
LoggerFactory.getLogger(FailoverTransport.class);
private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
+ private static final int INFINITE = -1;
private TransportListener transportListener;
private boolean disposed;
private boolean connected;
@@ -89,11 +90,11 @@ public class FailoverTransport implement
private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private long maxReconnectDelay = 1000 * 30;
private double backOffMultiplier = 2d;
- private long timeout = -1;
+ private long timeout = INFINITE;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
- private int maxReconnectAttempts;
- private int startupMaxReconnectAttempts;
+ private int maxReconnectAttempts = INFINITE;
+ private int startupMaxReconnectAttempts = INFINITE;
private int connectFailures;
private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private Exception connectionFailure;
@@ -107,8 +108,6 @@ public class FailoverTransport implement
private int maxCacheSize = 128 * 1024;
private final TransportListener disposedListener = new
DefaultTransportListener() {
};
- //private boolean connectionInterruptProcessingComplete;
-
private final TransportListener myTransportListener =
createTransportListener();
private boolean updateURIsSupported = true;
private boolean reconnectSupported = true;
@@ -222,12 +221,12 @@ public class FailoverTransport implement
boolean reconnectOk = false;
synchronized (reconnectMutex) {
- if (started) {
- LOG.warn("Transport (" + transport.getRemoteAddress() + ")
failed to " + connectedTransportURI
- + " , attempting to automatically reconnect due
to: " + e);
- LOG.debug("Transport failed with the following
exception:", e);
+ if (canReconnect()) {
reconnectOk = true;
}
+ LOG.warn("Transport (" + transport.getRemoteAddress() + ")
failed, reason: " + e
+ + (reconnectOk ? "," : ", not") +" attempting to
automatically reconnect");
+
initialized = false;
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
@@ -240,11 +239,17 @@ public class FailoverTransport implement
if (reconnectOk) {
reconnectTask.wakeup();
+ } else {
+ propagateFailureToExceptionListener(e);
}
}
}
}
+ private boolean canReconnect() {
+ return started && 0 != calculateReconnectAttemptLimit();
+ }
+
public final void handleConnectionControl(ConnectionControl control) {
String reconnectStr = control.getReconnectTo();
if (reconnectStr != null) {
@@ -292,7 +297,9 @@ public class FailoverTransport implement
public void start() throws Exception {
synchronized (reconnectMutex) {
- LOG.debug("Started.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Started " + this);
+ }
if (started) {
return;
}
@@ -311,7 +318,9 @@ public class FailoverTransport implement
public void stop() throws Exception {
Transport transportToStop = null;
synchronized (reconnectMutex) {
- LOG.debug("Stopped.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopped " + this);
+ }
if (!started) {
return;
}
@@ -825,9 +834,7 @@ public class FailoverTransport implement
doRebalance = false;
}
- if (!useExponentialBackOff || reconnectDelay ==
DEFAULT_INITIAL_RECONNECT_DELAY) {
- reconnectDelay = initialReconnectDelay;
- }
+ resetReconnectDelay();
Transport transport = null;
URI uri = null;
@@ -845,7 +852,9 @@ public class FailoverTransport implement
// for the first time, or we were disposed for some reason.
if (transport == null && !firstConnection &&
(reconnectDelay > 0) && !disposed) {
synchronized (sleepMutex) {
- LOG.debug("Waiting " + reconnectDelay + " ms
before attempting connection. ");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting " + reconnectDelay + " ms
before attempting connection. ");
+ }
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
@@ -868,16 +877,18 @@ public class FailoverTransport implement
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Attempting connect to: " + uri);
+ LOG.debug("Attempting " + connectFailures +
"th connect to: " + uri);
}
transport.setTransportListener(myTransportListener);
transport.start();
- if (started) {
+ if (started && !firstConnection) {
restoreTransport(transport);
}
- LOG.debug("Connection established");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection established");
+ }
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
@@ -899,7 +910,9 @@ public class FailoverTransport implement
if (transportListener != null) {
transportListener.transportResumed();
} else {
- LOG.debug("transport resumed by transport
listener not set");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("transport resumed by transport
listener not set");
+ }
}
if (firstConnection) {
@@ -934,19 +947,10 @@ public class FailoverTransport implement
}
}
- int reconnectAttempts = 0;
- if (firstConnection) {
- if (this.startupMaxReconnectAttempts != 0) {
- reconnectAttempts = this.startupMaxReconnectAttempts;
- }
- }
-
- if (reconnectAttempts == 0) {
- reconnectAttempts = this.maxReconnectAttempts;
- }
+ int reconnectLimit = calculateReconnectAttemptLimit();
- if (reconnectAttempts > 0 && ++connectFailures >=
reconnectAttempts) {
- LOG.error("Failed to connect to transport after: " +
connectFailures + " attempt(s)");
+ if (reconnectLimit != INFINITE && ++connectFailures >=
reconnectLimit) {
+ LOG.error("Failed to connect to " + uris + " after: " +
connectFailures + " attempt(s)");
connectionFailure = failure;
// Make sure on initial startup, that the transportListener
has been
@@ -960,14 +964,7 @@ public class FailoverTransport implement
}
}
- if (transportListener != null) {
- if (connectionFailure instanceof IOException) {
- transportListener.onException((IOException)
connectionFailure);
- } else {
-
transportListener.onException(IOExceptionSupport.create(connectionFailure));
- }
- }
- reconnectMutex.notifyAll();
+ propagateFailureToExceptionListener(connectionFailure);
return false;
}
}
@@ -976,7 +973,9 @@ public class FailoverTransport implement
if (reconnectDelay > 0) {
synchronized (sleepMutex) {
- LOG.debug("Waiting " + reconnectDelay + " ms before
attempting connection. ");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting " + reconnectDelay + " ms before
attempting connection");
+ }
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
@@ -997,6 +996,34 @@ public class FailoverTransport implement
return !disposed;
}
+ private void resetReconnectDelay() {
+ if (!useExponentialBackOff || reconnectDelay ==
DEFAULT_INITIAL_RECONNECT_DELAY) {
+ reconnectDelay = initialReconnectDelay;
+ }
+ }
+
+ /*
+ * called with reconnectMutex held
+ */
+ private void propagateFailureToExceptionListener(Exception exception) {
+ if (transportListener != null) {
+ if (exception instanceof IOException) {
+ transportListener.onException((IOException)exception);
+ } else {
+
transportListener.onException(IOExceptionSupport.create(exception));
+ }
+ }
+ reconnectMutex.notifyAll();
+ }
+
+ private int calculateReconnectAttemptLimit() {
+ int maxReconnectValue = this.maxReconnectAttempts;
+ if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
+ maxReconnectValue = this.startupMaxReconnectAttempts;
+ }
+ return maxReconnectValue;
+ }
+
final boolean buildBackups() {
synchronized (backupMutex) {
if (!disposed && backup && backups.size() < backupPoolSize) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Thu Oct 13 20:07:48 2011
@@ -17,12 +17,15 @@
package org.apache.activemq.network;
import java.net.URI;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
@@ -31,6 +34,7 @@ import static org.junit.Assert.assertTru
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
@@ -42,8 +46,10 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,12 +76,16 @@ public class FailoverStaticNetworkTest {
protected BrokerService createBroker(String scheme, String listenPort,
String[] networkToPorts,
HashMap<String, String> networkProps)
throws Exception {
BrokerService broker = new BrokerService();
- //broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort);
- broker.addConnector(scheme + "://localhost:" + listenPort);
+ // lazy init listener on broker start
+ TransportConnector transportConnector = new TransportConnector();
+ transportConnector.setUri(new URI(scheme + "://localhost:" +
listenPort));
+ List<TransportConnector> transportConnectors = new
ArrayList<TransportConnector>();
+ transportConnectors.add(transportConnector);
+ broker.setTransportConnectors(transportConnectors);
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(" +
scheme + "://localhost:");
builder.append(networkToPorts[0]);
@@ -84,7 +94,7 @@ public class FailoverStaticNetworkTest {
}
// limit the reconnects in case of initial random connection to
slave
// leaving randomize on verifies that this config is picked up
- builder.append(")?maxReconnectAttempts=1)");
+
builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
NetworkConnector nc =
broker.addNetworkConnector(builder.toString());
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
@@ -309,11 +319,89 @@ public class FailoverStaticNetworkTest {
doTestNetworkSendReceive();
}
+ @Test
+ public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws
Exception {
+
+ brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"});
+ brokerB.start();
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (!done.get()) {
+ brokerA = createBroker("tcp", "61610", null);
+ brokerA.setBrokerName("Pair");
+ brokerA.setBrokerObjectName(new
ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" +
"BrokerName="
+ + JMXSupport.encodeObjectNamePart("A") + "," +
"Type=Broker"));
+
((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+ brokerA.start();
+ brokerA.waitUntilStopped();
+
+ // restart after peer taken over
+ brokerA1.waitUntilStarted();
+ }
+ } catch (Exception ignored) {
+ LOG.info("A create/start, unexpected: " + ignored,
ignored);
+ }
+ }
+ });
+
+ // start with brokerA as master
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerA != null && brokerA.waitUntilStarted();
+ }
+ });
+
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (!done.get()) {
+ brokerA1 = createBroker("tcp", "61611", null);
+ brokerA1.setBrokerName("Pair");
+ // so they can coexist in local jmx we set the object
name b/c the brokername identifies the shared store
+ brokerA1.setBrokerObjectName(new
ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" +
"BrokerName="
+ + JMXSupport.encodeObjectNamePart("A1") + "," +
"Type=Broker"));
+
((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+ brokerA1.start();
+ brokerA1.waitUntilStopped();
+
+ // restart after peer taken over
+ brokerA.waitUntilStarted();
+ }
+ } catch (Exception ignored) {
+ LOG.info("A1 create/start, unexpected: " + ignored,
ignored);
+ }
+ }
+ });
+
+ for (int i=0; i<10; i++) {
+ BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1);
+ LOG.info("iteration: " + i + ", using: " +
currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+ currentMaster.waitUntilStarted();
+
+ doTestNetworkSendReceive(brokerB, currentMaster);
+
+ LOG.info("Stopping " +
currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+ currentMaster.stop();
+ currentMaster.waitUntilStopped();
+ }
+
+ done.set(false);
+ LOG.info("all done");
+ executorService.shutdownNow();
+ }
+
private void doTestNetworkSendReceive() throws Exception, JMSException {
doTestNetworkSendReceive(brokerB, brokerA);
}
- private void doTestNetworkSendReceive(BrokerService to, BrokerService
from) throws Exception, JMSException {
+ private void doTestNetworkSendReceive(final BrokerService to, final
BrokerService from) throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ..." + from);
@@ -332,7 +420,9 @@ public class FailoverStaticNetworkTest {
boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
- return consumer.receive(1000) != null;
+ Message message = consumer.receive(5000);
+ LOG.info("from: " +
from.getBrokerObjectName().getKeyProperty("BrokerName") + ", received: " +
message);
+ return message != null;
}
});
try {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
Thu Oct 13 20:07:48 2011
@@ -54,7 +54,7 @@ public class DiscoveryUriTest extends Em
public void testFailedConnect() throws Exception {
try {
- ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&maxReconnectAttempts=3&useExponentialBackOff=false");
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&startupMaxReconnectAttempts=3&useExponentialBackOff=false");
Connection conn = factory.createConnection();
conn.start();
} catch (Exception e) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
Thu Oct 13 20:07:48 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.fa
import java.io.IOException;
import java.net.*;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.net.ServerSocketFactory;
@@ -30,17 +31,20 @@ import org.apache.activemq.util.Wait;
public class SlowConnectionTest extends TestCase {
+ private CountDownLatch socketReadyLatch = new CountDownLatch(1);
+
public void testSlowConnection() throws Exception {
+ MockBroker broker = new MockBroker();
+ broker.start();
+
+ socketReadyLatch.await();
int timeout = 1000;
- URI tcpUri = new URI("tcp://localhost:61616?soTimeout=" + timeout +
"&trace=true&connectionTimeout=" + timeout +
"&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
+ URI tcpUri = new URI("tcp://localhost:" + broker.ss.getLocalPort() +
"?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout +
"&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + tcpUri + ")");
final Connection connection = cf.createConnection();
- MockBroker broker = new MockBroker();
- broker.start();
-
new Thread(new Runnable() {
public void run() {
try { connection.start(); } catch (Throwable ignored) {}
@@ -62,19 +66,25 @@ public class SlowConnectionTest extends
}
class MockBroker extends Thread {
+ ServerSocket ss = null;
+ public MockBroker() {
+ super("MockBroker");
+ }
public void run() {
List<Socket> inProgress = new ArrayList<Socket>();
ServerSocketFactory factory = ServerSocketFactory.getDefault();
- ServerSocket ss = null;
try {
- ss = factory.createServerSocket(61616);
+ ss = factory.createServerSocket(0);
+ ss.setSoTimeout(5000);
+ socketReadyLatch.countDown();
while (!interrupted()) {
inProgress.add(ss.accept()); // eat socket
}
+ } catch (java.net.SocketTimeoutException expected) {
} catch (Exception e) {
e.printStackTrace();
} finally {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
Thu Oct 13 20:07:48 2011
@@ -320,7 +320,7 @@ public class DurableSubProcessTest exten
"jms.alwaysSyncSend=true&jms.dispatchAsync=true&" +
"jms.producerWindowSize=20971520&" +
"jms.copyMessageOnSend=false&" +
-
"initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" +
+ "initialReconnectDelay=100&maxReconnectDelay=30000&" +
"useExponentialBackOff=true";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
Thu Oct 13 20:07:48 2011
@@ -398,7 +398,7 @@ public class DurableSubProcessWithRestar
+ "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+ "jms.producerWindowSize=20971520&"
+ "jms.copyMessageOnSend=false&"
- +
"initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&"
+ + "initialReconnectDelay=100&maxReconnectDelay=30000&"
+ "useExponentialBackOff=true";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);