Author: dejanb
Date: Mon Jan 12 06:05:57 2009
New Revision: 733761
URL: http://svn.apache.org/viewvc?rev=733761&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2061
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=733761&r1=733760&r2=733761&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Mon Jan 12 06:05:57 2009
@@ -82,6 +82,7 @@
private long initialReconnectDelay = 10;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
+ private long timeout = -1;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
private boolean initialized;
@@ -318,7 +319,15 @@
this.maxReconnectAttempts = maxReconnectAttempts;
}
- /**
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
* @return Returns the randomize.
*/
public boolean isRandomize() {
@@ -380,7 +389,7 @@
try {
synchronized (reconnectMutex) {
-
+
if (isShutdownCommand(command) && connectedTransport.get() ==
null) {
if(command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not
connected.
@@ -393,19 +402,27 @@
myTransportListener.onCommand(response);
return;
}
- }
+ }
// Keep trying until the message is sent.
for (int i = 0; !disposed; i++) {
try {
// Wait for transport to be connected.
Transport transport = connectedTransport.get();
+ long start = System.currentTimeMillis();
+ boolean timedout = false;
while (transport == null && !disposed
&& connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
LOG.trace("Waiting for transport to reconnect.");
+ long end = System.currentTimeMillis();
+ if (timeout > 0 && (end - start > timeout)) {
+ timedout = true;
+ LOG.info("Failover timed out after " + (end -
start) + "ms");
+ break;
+ }
try {
- reconnectMutex.wait(1000);
+ reconnectMutex.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Interupted: " + e, e);
@@ -420,7 +437,9 @@
error = new IOException("Transport disposed.");
} else if (connectionFailure != null) {
error = connectionFailure;
- } else {
+ } else if (timedout == true) {
+ error = new IOException("Failover timeout of "
+ timeout + " ms reached.");
+ }else {
error = new IOException("Unexpected failure.");
}
break;
@@ -632,7 +651,6 @@
}
final boolean doReconnect() {
-
Exception failure = null;
synchronized (reconnectMutex) {
@@ -724,7 +742,7 @@
}
}
}
-
+
if (maxReconnectAttempts > 0 && ++connectFailures >=
maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " +
connectFailures + " attempt(s)");
connectionFailure = failure;
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java?rev=733761&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Mon Jan 12 06:05:57 2009
@@ -0,0 +1,58 @@
+package org.apache.activemq.transport.failover;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+
+public class FailoverTimeoutTest extends TestCase {
+
+ private static final String QUEUE_NAME = "test.failovertimeout";
+
+ public void testTimeout() throws Exception {
+
+ long timeout = 1000;
+ URI tcpUri = new URI("tcp://localhost:61616");
+ BrokerService bs = new BrokerService();
+ bs.setUseJmx(false);
+ bs.addConnector(tcpUri);
+ bs.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout);
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session
+ .createQueue(QUEUE_NAME));
+ TextMessage message = session.createTextMessage("Test message");
+ producer.send(message);
+
+ bs.stop();
+
+ try {
+ producer.send(message);
+ } catch (JMSException jmse) {
+ jmse.printStackTrace();
+ assertEquals("Failover timeout of " + timeout + " ms
reached.", jmse.getMessage());
+ }
+
+ bs = new BrokerService();
+
+ bs.setUseJmx(false);
+ bs.addConnector(tcpUri);
+ bs.start();
+
+ producer.send(message);
+
+ bs.stop();
+ }
+
+}