Fix how delays are handled in the Failover Provider, initial reconnect
delay and fixed reconnect delay can be configured and if back off is on
the max will kick in

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d2a8da99
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d2a8da99
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d2a8da99

Branch: refs/heads/master
Commit: d2a8da9907cc5fce93381788b10653e417010769
Parents: fd090ed
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Feb 3 19:23:13 2015 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Feb 3 19:23:13 2015 -0500

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java | 22 ++++++-
 .../qpid/jms/failover/JmsFailoverTest.java      | 60 ++++++++++++--------
 .../jms/failover/JmsOfflineBehaviorTests.java   | 14 ++---
 .../jms/failover/JmsTxConsumerFailoverTest.java | 13 +++--
 .../jms/failover/JmsTxProducerFailoverTest.java | 13 +++--
 5 files changed, 76 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 28f6838..c4ae820 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -62,6 +62,8 @@ public class FailoverProvider extends DefaultProviderListener 
implements Provide
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FailoverProvider.class);
 
+    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
+    private static final int DEFAULT_RECONNECT_DELAY = 10;
     private static final int UNLIMITED = -1;
 
     private ProviderListener listener;
@@ -80,7 +82,6 @@ public class FailoverProvider extends DefaultProviderListener 
implements Provide
     // Current state of connection / reconnection
     private boolean firstConnection = true;
     private long reconnectAttempts;
-    private long reconnectDelay = TimeUnit.SECONDS.toMillis(5);
     private IOException failureCause;
     private URI connectedURI;
 
@@ -91,7 +92,8 @@ public class FailoverProvider extends DefaultProviderListener 
implements Provide
     private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
 
     // Configuration values.
-    private long initialReconnectDelay = 0L;
+    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+    private long reconnectDelay = DEFAULT_RECONNECT_DELAY;
     private long maxReconnectDelay = TimeUnit.SECONDS.toMillis(30);
     private boolean useExponentialBackOff = true;
     private double backOffMultiplier = 2d;
@@ -534,6 +536,11 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                     return;
                 }
 
+                if (initialReconnectDelay > 0 && reconnectAttempts == 0) {
+                    LOG.trace("Delayed initial reconnect attempt will be in {} 
milliseconds", initialReconnectDelay);
+                    connectionHub.schedule(this, initialReconnectDelay, 
TimeUnit.MILLISECONDS);
+                }
+
                 reconnectAttempts++;
                 Throwable failure = null;
                 URI target = uris.getNext();
@@ -693,6 +700,14 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
         this.initialReconnectDelay = initialReconnectDealy;
     }
 
+    public long getReconnectDelay() {
+        return initialReconnectDelay;
+    }
+
+    public void setReconnectDelay(long reconnectDealy) {
+        this.reconnectDelay = reconnectDealy;
+    }
+
     public long getMaxReconnectDelay() {
         return maxReconnectDelay;
     }
@@ -875,6 +890,9 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
      * that if the connection is successfully established that the connection 
established event
      * is triggered once before moving on to sending only connection 
interrupted and restored
      * events.
+     *
+     * The connection state events must all be triggered from the 
FailoverProvider's serialization
+     * thread, this class ensures that the connection established event 
follows that pattern.
      */
     protected abstract class CreateConnectionRequest extends FailoverRequest {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
index 777eac5..3885106 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.failover;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -62,7 +63,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
     @Test(timeout=60000)
     public void testFailoverConnectsWithMultipleURIs() throws Exception {
         URI brokerURI = new 
URI("failover://(amqp://127.0.0.1:61616,amqp://localhost:5777," +
-                                getBrokerAmqpConnectionURI() + 
")?failover.maxReconnectDelay=500");
+                                getBrokerAmqpConnectionURI() + ")");
         Connection connection = createAmqpConnection(brokerURI);
         connection.start();
         connection.close();
@@ -70,8 +71,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
 
     @Test(timeout=60000)
     public void testStartupReconnectAttempts() throws Exception {
-        URI brokerURI = new URI("failover://(amqp://localhost:61616)" +
-                                
"?failover.maxReconnectDelay=50&failover.startupMaxReconnectAttempts=5");
+        URI brokerURI = new 
URI("failover://(amqp://localhost:61616)?failover.startupMaxReconnectAttempts=5");
         JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
         Connection connection = factory.createConnection();
         try {
@@ -88,7 +88,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
     @Test(timeout=60000)
     public void testStartupReconnectAttemptsMultipleHosts() throws Exception {
         URI brokerURI = new 
URI("failover://(amqp://localhost:61616,amqp://localhost:61617)" +
-                                
"?failover.maxReconnectDelay=100&failover.startupMaxReconnectAttempts=5");
+                                "?failover.startupMaxReconnectAttempts=6");
         JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
         Connection connection = factory.createConnection();
         try {
@@ -105,7 +105,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
     @Test(timeout=60000)
     public void testStartFailureWithAsyncExceptionListener() throws Exception {
         URI brokerURI = new URI(getAmqpFailoverURI() +
-            "?failover.maxReconnectDelay=100&failover.maxReconnectAttempts=5");
+            "?failover.reconnectDelay=20&failover.maxReconnectAttempts=5");
 
         final CountDownLatch failed = new CountDownLatch(1);
         JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
@@ -125,18 +125,17 @@ public class JmsFailoverTest extends AmqpTestSupport {
         assertTrue("No async exception", failed.await(15, TimeUnit.SECONDS));
     }
 
-    @SuppressWarnings("unused")
     @Test(timeout=60000)
     public void testBasicStateRestoration() throws Exception {
-        URI brokerURI = new URI(getAmqpFailoverURI() + 
"?failover.maxReconnectDelay=1000");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createConsumer(queue);
+        session.createProducer(queue);
+        session.createConsumer(queue);
 
         assertEquals(1, 
brokerService.getAdminView().getQueueSubscribers().length);
         assertEquals(1, 
brokerService.getAdminView().getQueueProducers().length);
@@ -149,16 +148,28 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
-        assertEquals(1, 
brokerService.getAdminView().getQueueSubscribers().length);
-        assertEquals(1, 
brokerService.getAdminView().getQueueProducers().length);
+        assertTrue("Should one new Queue Subscription.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
brokerService.getAdminView().getQueueSubscribers().length == 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        assertTrue("Should one new Queue Producer.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getQueueProducers().length 
== 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
     }
 
-    @SuppressWarnings("unused")
     @Test(timeout=60000)
     public void testDurableSubscriberRestores() throws Exception {
-        URI brokerURI = new URI(getAmqpFailoverURI() + 
"?failover.maxReconnectDelay=200");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.setClientID(name.getMethodName());
@@ -167,6 +178,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic(name.getMethodName());
         MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName());
+        assertNotNull(consumer);
 
         assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
 
@@ -178,7 +190,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertTrue("Should have no inactive subscribers.", Wait.waitFor(new 
Wait.Condition() {
 
@@ -186,7 +198,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertTrue("Should have one durable sub.", Wait.waitFor(new 
Wait.Condition() {
 
@@ -194,13 +206,13 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getDurableTopicSubscribers().length == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
     }
 
     @Test(timeout=90000)
     public void testBadFirstURIConnectsAndProducerWorks() throws Exception {
         URI brokerURI = new URI("failover://(amqp://localhost:61616," +
-            getBrokerAmqpConnectionURI() + ")?failover.maxReconnectDelay=100");
+            getBrokerAmqpConnectionURI() + ")?failover.reconnectDelay=50");
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -226,14 +238,14 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return proxy.getQueueSize() == MSG_COUNT;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertFalse(failed.getCount() == 0);
     }
 
     @Test(timeout=90000)
     public void testNonTxProducerRecoversAfterFailover() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=1000");
+        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.reconnectDelay=50");
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -269,7 +281,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
         // Wait until a couple messages get sent on first broker run.
         assertTrue(sentSome.await(3, TimeUnit.SECONDS));
         stopPrimaryBroker();
-        TimeUnit.SECONDS.sleep(3);  // Gives FailoverProvider some CPU time
+        TimeUnit.SECONDS.sleep(2);  // Gives FailoverProvider some CPU time
         restartPrimaryBroker();
 
         assertTrue("Should have a new connection.", Wait.waitFor(new 
Wait.Condition() {
@@ -278,7 +290,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertTrue("Should have a recovered producer.", Wait.waitFor(new 
Wait.Condition() {
 
@@ -286,7 +298,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return brokerService.getAdminView().getQueueProducers().length 
== 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
 
         final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
 
@@ -296,7 +308,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return proxy.getQueueSize() == MSG_COUNT;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
 
         assertFalse(failed.getCount() == 0);
         connection.close();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
index 96e9c5a..1250540 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java
@@ -111,7 +111,6 @@ public class JmsOfflineBehaviorTests extends 
AmqpTestSupport {
         connection.close();
     }
 
-    @SuppressWarnings("unused")
     @Test(timeout=60000)
     public void testSessionCloseWithOpenResourcesDoesNotBlock() throws 
Exception {
         URI brokerURI = new URI(getAmqpFailoverURI());
@@ -120,8 +119,8 @@ public class JmsOfflineBehaviorTests extends 
AmqpTestSupport {
 
         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
+        session.createConsumer(queue);
+        session.createProducer(queue);
 
         stopPrimaryBroker();
         session.close();
@@ -157,12 +156,11 @@ public class JmsOfflineBehaviorTests extends 
AmqpTestSupport {
 
                 return false;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         connection.close();
     }
 
-    @SuppressWarnings("unused")
     @Test(timeout=60000)
     public void testClosedReourcesAreNotRestored() throws Exception {
         URI brokerURI = new URI(getAmqpFailoverURI() + 
"?failover.maxReconnectDelay=500");
@@ -171,8 +169,8 @@ public class JmsOfflineBehaviorTests extends 
AmqpTestSupport {
 
         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
+        session.createConsumer(queue);
+        session.createProducer(queue);
 
         assertEquals(1, 
brokerService.getAdminView().getQueueSubscribers().length);
         assertEquals(1, 
brokerService.getAdminView().getQueueProducers().length);
@@ -188,7 +186,7 @@ public class JmsOfflineBehaviorTests extends 
AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertEquals(0, 
brokerService.getAdminView().getQueueSubscribers().length);
         assertEquals(0, 
brokerService.getAdminView().getQueueProducers().length);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
index 027ca92..c698adf 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -51,7 +52,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
      */
     @Test(timeout=60000)
     public void testTxConsumerReceiveAfterFailoverCommits() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -74,7 +75,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertTrue("Should have a recovered consumer.", Wait.waitFor(new 
Wait.Condition() {
 
@@ -82,7 +83,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getQueueSubscribers().length == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
 
         for (int i = 0; i < MSG_COUNT; ++i) {
             Message received = consumer.receive(1000);
@@ -101,7 +102,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
 
     @Test(timeout=60000)
     public void testTxConsumerReceiveThenFailoverCommitFails() throws 
Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -138,7 +139,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
 
     @Test(timeout=60000)
     public void testTxConsumerRollbackAfterFailoverGetsNoErrors() throws 
Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -182,7 +183,7 @@ public class JmsTxConsumerFailoverTest extends 
AmqpTestSupport {
      */
     @Test(timeout=60000)
     public void testTxConsumerReceiveWorksAfterFailoverButCommitFails() throws 
Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
index 89fed7b..32f3c54 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -50,7 +51,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
      */
     @Test(timeout=60000)
     public void testTxProducerSendAfterFailoverCommits() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -73,7 +74,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return 
brokerService.getAdminView().getCurrentConnectionsCount() == 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.MILLISECONDS.toMillis(100)));
 
         assertTrue("Should have a recovered producer.", Wait.waitFor(new 
Wait.Condition() {
 
@@ -81,7 +82,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
             public boolean isSatisified() throws Exception {
                 return brokerService.getAdminView().getQueueProducers().length 
== 1;
             }
-        }));
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
 
         for (int i = 0; i < MSG_COUNT; ++i) {
             LOG.debug("Producer sening message #{}", i + 1);
@@ -107,7 +108,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
      */
     @Test(timeout=60000)
     public void testTxProducerSendsThenFailoverCommitFails() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -146,7 +147,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
 
     @Test(timeout=60000)
     public void testTxProducerRollbackAfterFailoverGetsNoErrors() throws 
Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
@@ -189,7 +190,7 @@ public class JmsTxProducerFailoverTest extends 
AmqpTestSupport {
      */
     @Test(timeout=60000)
     public void testTxProducerSendWorksButCommitFails() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() 
+")?failover.maxReconnectDelay=100");
+        URI brokerURI = new URI(getAmqpFailoverURI());
 
         connection = createAmqpConnection(brokerURI);
         connection.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to