Fix how delays are handled in the Failover Provider: the initial reconnect delay and the fixed reconnect delay can now be configured, and when back-off is enabled the maximum reconnect delay takes effect.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d2a8da99 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d2a8da99 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d2a8da99 Branch: refs/heads/master Commit: d2a8da9907cc5fce93381788b10653e417010769 Parents: fd090ed Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Feb 3 19:23:13 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Feb 3 19:23:13 2015 -0500 ---------------------------------------------------------------------- .../jms/provider/failover/FailoverProvider.java | 22 ++++++- .../qpid/jms/failover/JmsFailoverTest.java | 60 ++++++++++++-------- .../jms/failover/JmsOfflineBehaviorTests.java | 14 ++--- .../jms/failover/JmsTxConsumerFailoverTest.java | 13 +++-- .../jms/failover/JmsTxProducerFailoverTest.java | 13 +++-- 5 files changed, 76 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 28f6838..c4ae820 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -62,6 +62,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private static final Logger LOG = LoggerFactory.getLogger(FailoverProvider.class); + private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 0; + private static final int DEFAULT_RECONNECT_DELAY = 10; private static final int 
UNLIMITED = -1; private ProviderListener listener; @@ -80,7 +82,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide // Current state of connection / reconnection private boolean firstConnection = true; private long reconnectAttempts; - private long reconnectDelay = TimeUnit.SECONDS.toMillis(5); private IOException failureCause; private URI connectedURI; @@ -91,7 +92,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; // Configuration values. - private long initialReconnectDelay = 0L; + private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; + private long reconnectDelay = DEFAULT_RECONNECT_DELAY; private long maxReconnectDelay = TimeUnit.SECONDS.toMillis(30); private boolean useExponentialBackOff = true; private double backOffMultiplier = 2d; @@ -534,6 +536,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide return; } + if (initialReconnectDelay > 0 && reconnectAttempts == 0) { + LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay); + connectionHub.schedule(this, initialReconnectDelay, TimeUnit.MILLISECONDS); + } + reconnectAttempts++; Throwable failure = null; URI target = uris.getNext(); @@ -693,6 +700,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide this.initialReconnectDelay = initialReconnectDealy; } + public long getReconnectDelay() { + return initialReconnectDelay; + } + + public void setReconnectDelay(long reconnectDealy) { + this.reconnectDelay = reconnectDealy; + } + public long getMaxReconnectDelay() { return maxReconnectDelay; } @@ -875,6 +890,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide * that if the connection is successfully established that the connection established event * is triggered once before moving on to sending only connection 
interrupted and restored * events. + * + * The connection state events must all be triggered from the FailoverProvider's serialization + * thread, this class ensures that the connection established event follows that pattern. */ protected abstract class CreateConnectionRequest extends FailoverRequest { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java index 777eac5..3885106 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.failover; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -62,7 +63,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testFailoverConnectsWithMultipleURIs() throws Exception { URI brokerURI = new URI("failover://(amqp://127.0.0.1:61616,amqp://localhost:5777," + - getBrokerAmqpConnectionURI() + ")?failover.maxReconnectDelay=500"); + getBrokerAmqpConnectionURI() + ")"); Connection connection = createAmqpConnection(brokerURI); connection.start(); connection.close(); @@ -70,8 +71,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testStartupReconnectAttempts() throws Exception { - URI brokerURI = new URI("failover://(amqp://localhost:61616)" + 
- "?failover.maxReconnectDelay=50&failover.startupMaxReconnectAttempts=5"); + URI brokerURI = new URI("failover://(amqp://localhost:61616)?failover.startupMaxReconnectAttempts=5"); JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); Connection connection = factory.createConnection(); try { @@ -88,7 +88,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testStartupReconnectAttemptsMultipleHosts() throws Exception { URI brokerURI = new URI("failover://(amqp://localhost:61616,amqp://localhost:61617)" + - "?failover.maxReconnectDelay=100&failover.startupMaxReconnectAttempts=5"); + "?failover.startupMaxReconnectAttempts=6"); JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); Connection connection = factory.createConnection(); try { @@ -105,7 +105,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testStartFailureWithAsyncExceptionListener() throws Exception { URI brokerURI = new URI(getAmqpFailoverURI() + - "?failover.maxReconnectDelay=100&failover.maxReconnectAttempts=5"); + "?failover.reconnectDelay=20&failover.maxReconnectAttempts=5"); final CountDownLatch failed = new CountDownLatch(1); JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); @@ -125,18 +125,17 @@ public class JmsFailoverTest extends AmqpTestSupport { assertTrue("No async exception", failed.await(15, TimeUnit.SECONDS)); } - @SuppressWarnings("unused") @Test(timeout=60000) public void testBasicStateRestoration() throws Exception { - URI brokerURI = new URI(getAmqpFailoverURI() + "?failover.maxReconnectDelay=1000"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = 
session.createConsumer(queue); + session.createProducer(queue); + session.createConsumer(queue); assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); assertEquals(1, brokerService.getAdminView().getQueueProducers().length); @@ -149,16 +148,28 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); - assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); - assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + assertTrue("Should one new Queue Subscription.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getQueueSubscribers().length == 1; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); + + assertTrue("Should one new Queue Producer.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getQueueProducers().length == 1; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); } - @SuppressWarnings("unused") @Test(timeout=60000) public void testDurableSubscriberRestores() throws Exception { - URI brokerURI = new URI(getAmqpFailoverURI() + "?failover.maxReconnectDelay=200"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.setClientID(name.getMethodName()); @@ -167,6 +178,7 @@ public class JmsFailoverTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(name.getMethodName()); MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName()); + assertNotNull(consumer); assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length); @@ -178,7 +190,7 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertTrue("Should have no inactive subscribers.", Wait.waitFor(new Wait.Condition() { @@ -186,7 +198,7 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertTrue("Should have one durable sub.", Wait.waitFor(new Wait.Condition() { @@ -194,13 +206,13 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); } @Test(timeout=90000) public void testBadFirstURIConnectsAndProducerWorks() throws Exception { URI brokerURI = new URI("failover://(amqp://localhost:61616," + - getBrokerAmqpConnectionURI() + ")?failover.maxReconnectDelay=100"); + getBrokerAmqpConnectionURI() + ")?failover.reconnectDelay=50"); connection = createAmqpConnection(brokerURI); connection.start(); @@ -226,14 +238,14 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return proxy.getQueueSize() == MSG_COUNT; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertFalse(failed.getCount() == 0); } @Test(timeout=90000) public void testNonTxProducerRecoversAfterFailover() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=1000"); + URI brokerURI = new URI("failover://("+ 
getBrokerAmqpConnectionURI() +")?failover.reconnectDelay=50"); connection = createAmqpConnection(brokerURI); connection.start(); @@ -269,7 +281,7 @@ public class JmsFailoverTest extends AmqpTestSupport { // Wait until a couple messages get sent on first broker run. assertTrue(sentSome.await(3, TimeUnit.SECONDS)); stopPrimaryBroker(); - TimeUnit.SECONDS.sleep(3); // Gives FailoverProvider some CPU time + TimeUnit.SECONDS.sleep(2); // Gives FailoverProvider some CPU time restartPrimaryBroker(); assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { @@ -278,7 +290,7 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertTrue("Should have a recovered producer.", Wait.waitFor(new Wait.Condition() { @@ -286,7 +298,7 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getQueueProducers().length == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); @@ -296,7 +308,7 @@ public class JmsFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return proxy.getQueueSize() == MSG_COUNT; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); assertFalse(failed.getCount() == 0); connection.close(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java index 96e9c5a..1250540 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java @@ -111,7 +111,6 @@ public class JmsOfflineBehaviorTests extends AmqpTestSupport { connection.close(); } - @SuppressWarnings("unused") @Test(timeout=60000) public void testSessionCloseWithOpenResourcesDoesNotBlock() throws Exception { URI brokerURI = new URI(getAmqpFailoverURI()); @@ -120,8 +119,8 @@ public class JmsOfflineBehaviorTests extends AmqpTestSupport { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); - MessageConsumer consumer = session.createConsumer(queue); - MessageProducer producer = session.createProducer(queue); + session.createConsumer(queue); + session.createProducer(queue); stopPrimaryBroker(); session.close(); @@ -157,12 +156,11 @@ public class JmsOfflineBehaviorTests extends AmqpTestSupport { return false; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); connection.close(); } - @SuppressWarnings("unused") @Test(timeout=60000) public void testClosedReourcesAreNotRestored() throws Exception { URI brokerURI = new URI(getAmqpFailoverURI() + "?failover.maxReconnectDelay=500"); @@ -171,8 +169,8 @@ public class JmsOfflineBehaviorTests extends AmqpTestSupport { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); - MessageConsumer consumer = session.createConsumer(queue); - MessageProducer producer = session.createProducer(queue); + session.createConsumer(queue); + session.createProducer(queue); assertEquals(1, 
brokerService.getAdminView().getQueueSubscribers().length); assertEquals(1, brokerService.getAdminView().getQueueProducers().length); @@ -188,7 +186,7 @@ public class JmsOfflineBehaviorTests extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); assertEquals(0, brokerService.getAdminView().getQueueProducers().length); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java index 027ca92..c698adf 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxConsumerFailoverTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.net.URI; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.Message; @@ -51,7 +52,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { */ @Test(timeout=60000) public void testTxConsumerReceiveAfterFailoverCommits() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -74,7 
+75,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertTrue("Should have a recovered consumer.", Wait.waitFor(new Wait.Condition() { @@ -82,7 +83,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getQueueSubscribers().length == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); for (int i = 0; i < MSG_COUNT; ++i) { Message received = consumer.receive(1000); @@ -101,7 +102,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testTxConsumerReceiveThenFailoverCommitFails() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -138,7 +139,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testTxConsumerRollbackAfterFailoverGetsNoErrors() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -182,7 +183,7 @@ public class JmsTxConsumerFailoverTest extends AmqpTestSupport { */ @Test(timeout=60000) public void testTxConsumerReceiveWorksAfterFailoverButCommitFails() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d2a8da99/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java index 89fed7b..32f3c54 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.net.URI; +import java.util.concurrent.TimeUnit; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -50,7 +51,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { */ @Test(timeout=60000) public void testTxProducerSendAfterFailoverCommits() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -73,7 +74,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getCurrentConnectionsCount() == 1; } - })); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); assertTrue("Should have a recovered producer.", Wait.waitFor(new Wait.Condition() { @@ -81,7 +82,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { public boolean isSatisified() throws Exception { return brokerService.getAdminView().getQueueProducers().length == 1; } - })); 
+ }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50))); for (int i = 0; i < MSG_COUNT; ++i) { LOG.debug("Producer sening message #{}", i + 1); @@ -107,7 +108,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { */ @Test(timeout=60000) public void testTxProducerSendsThenFailoverCommitFails() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -146,7 +147,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { @Test(timeout=60000) public void testTxProducerRollbackAfterFailoverGetsNoErrors() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); @@ -189,7 +190,7 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport { */ @Test(timeout=60000) public void testTxProducerSendWorksButCommitFails() throws Exception { - URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?failover.maxReconnectDelay=100"); + URI brokerURI = new URI(getAmqpFailoverURI()); connection = createAmqpConnection(brokerURI); connection.start(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org