Author: dejanb
Date: Mon Jan 12 06:05:57 2009
New Revision: 733761

URL: http://svn.apache.org/viewvc?rev=733761&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2061

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=733761&r1=733760&r2=733761&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Jan 12 06:05:57 2009
@@ -82,6 +82,7 @@
     private long initialReconnectDelay = 10;
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
+    private long timeout = -1;
     private boolean useExponentialBackOff = true;
     private boolean randomize = true;
     private boolean initialized;
@@ -318,7 +319,15 @@
         this.maxReconnectAttempts = maxReconnectAttempts;
     }
 
-    /**
+    public long getTimeout() {
+               return timeout;
+       }
+
+       public void setTimeout(long timeout) {
+               this.timeout = timeout;
+       }
+
+       /**
      * @return Returns the randomize.
      */
     public boolean isRandomize() {
@@ -380,7 +389,7 @@
         try {
 
             synchronized (reconnectMutex) {
- 
+               
                 if (isShutdownCommand(command) && connectedTransport.get() == null) {
                     if(command.isShutdownInfo()) {
                         // Skipping send of ShutdownInfo command when not connected.
@@ -393,19 +402,27 @@
                         myTransportListener.onCommand(response);
                         return;
                     }
-                }                      
+                }
                 // Keep trying until the message is sent.
                 for (int i = 0; !disposed; i++) {
                     try {
 
                         // Wait for transport to be connected.
                         Transport transport = connectedTransport.get();
+                        long start = System.currentTimeMillis();
+                        boolean timedout = false;
                         while (transport == null && !disposed
                                 && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted()) {
                             LOG.trace("Waiting for transport to reconnect.");
+                            long end = System.currentTimeMillis();
+                            if (timeout > 0 && (end - start > timeout)) {
+                               timedout = true;
+                               LOG.info("Failover timed out after " + (end - start) + "ms");
+                               break;
+                            }
                             try {
-                                reconnectMutex.wait(1000);
+                                reconnectMutex.wait(100);
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
                                 LOG.debug("Interupted: " + e, e);
@@ -420,7 +437,9 @@
                                 error = new IOException("Transport disposed.");
                             } else if (connectionFailure != null) {
                                 error = connectionFailure;
-                            } else {
+                            } else if (timedout == true) {
+                               error = new IOException("Failover timeout of " + timeout + " ms reached.");
+                            }else {
                                 error = new IOException("Unexpected failure.");
                             }
                             break;
@@ -632,7 +651,6 @@
     }
     
    final boolean doReconnect() {
-
         Exception failure = null;
         synchronized (reconnectMutex) {
 
@@ -724,7 +742,7 @@
                     }
                 }
             }
-
+            
             if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java?rev=733761&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java Mon Jan 12 06:05:57 2009
@@ -0,0 +1,58 @@
+package org.apache.activemq.transport.failover;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+
+public class FailoverTimeoutTest extends TestCase {
+       
+       private static final String QUEUE_NAME = "test.failovertimeout";
+
+       public void testTimeout() throws Exception {
+               
+               long timeout = 1000;
+               URI tcpUri = new URI("tcp://localhost:61616");
+               BrokerService bs = new BrokerService();
+               bs.setUseJmx(false);
+               bs.addConnector(tcpUri);
+               bs.start();
+               
+               ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout);
+               Connection connection = cf.createConnection();
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = session.createProducer(session
+                               .createQueue(QUEUE_NAME));
+               TextMessage message = session.createTextMessage("Test message");
+               producer.send(message);
+               
+               bs.stop();
+               
+               try {
+                       producer.send(message);
+               } catch (JMSException jmse) {
+                       jmse.printStackTrace();
+                       assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
+               }
+               
+               bs = new BrokerService();
+               
+               bs.setUseJmx(false);
+               bs.addConnector(tcpUri);
+               bs.start();
+               
+               producer.send(message);
+               
+               bs.stop();
+       }
+       
+}


Reply via email to