QPIDJMS-404 Add variants of ProviderFuture for perf tuning Adds three variations on ProviderFuture that allow for tuning on platforms that don't benefit from the spin / wait pattern used in the current implementation and default to using a variant that does not park on windows to avoid unpredictably long parks that result in performance decreases due to missing the event completion.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/264a9a9b Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/264a9a9b Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/264a9a9b Branch: refs/heads/master Commit: 264a9a9b6c5d8d8c11a995b7b02289b2938d77ba Parents: 8b20067 Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Jul 18 12:11:53 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Jul 18 12:37:35 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 34 ++-- .../qpid/jms/JmsLocalTransactionContext.java | 2 +- .../org/apache/qpid/jms/JmsMessageConsumer.java | 4 +- .../org/apache/qpid/jms/JmsMessageProducer.java | 2 +- .../java/org/apache/qpid/jms/JmsSession.java | 2 +- .../jms/provider/BalancedProviderFuture.java | 149 ++++++++++++++++ .../provider/ConservativeProviderFuture.java | 128 ++++++++++++++ .../jms/provider/ProgressiveProviderFuture.java | 167 ++++++++++++++++++ .../org/apache/qpid/jms/provider/Provider.java | 22 +++ .../qpid/jms/provider/ProviderFactory.java | 34 +++- .../qpid/jms/provider/ProviderFuture.java | 165 ++--------------- .../jms/provider/ProviderFutureFactory.java | 176 +++++++++++++++++++ .../qpid/jms/provider/ProviderWrapper.java | 10 ++ .../qpid/jms/provider/amqp/AmqpProvider.java | 30 ++-- .../jms/provider/amqp/AmqpProviderFactory.java | 32 +++- .../jms/provider/failover/FailoverProvider.java | 28 ++- .../failover/FailoverProviderFactory.java | 24 ++- .../jms/provider/ProviderFutureFactoryTest.java | 105 +++++++++++ .../qpid/jms/provider/ProviderFutureTest.java | 44 +++-- .../jms/provider/WrappedAsyncResultTest.java | 9 +- .../jms/provider/amqp/AmqpProviderTest.java | 6 +- .../failover/FailoverProviderClosedTest.java | 28 +-- .../provider/failover/FailoverProviderTest.java | 31 ++-- .../qpid/jms/provider/mock/MockProvider.java | 18 +- .../jms/provider/mock/MockProviderFactory.java | 28 ++- .../discovery/DiscoveryProviderFactory.java | 25 ++- 26 files changed, 1061 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 7d09a9c..bb456fe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -214,7 +214,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } if (isConnected() && !isFailed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); requests.put(request, request); try { provider.destroy(connectionInfo, request); @@ -684,7 +684,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.create(resource, request); @@ -705,7 +705,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.start(resource, request); @@ -726,7 +726,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.stop(resource, request); @@ -747,7 +747,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.destroy(resource, request); @@ -764,7 +764,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.send(envelope, request); @@ -785,7 +785,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); provider.acknowledge(envelope, ackType, request); request.sync(); } catch (Exception ioe) { @@ -801,7 +801,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); provider.acknowledge(sessionId, ackType, request); request.sync(); } catch (Exception ioe) { @@ -817,7 +817,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.unsubscribe(name, request); @@ -838,7 +838,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.commit(transactionInfo, nextTransactionId, request); @@ -859,7 +859,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.rollback(transactionInfo, nextTransactionId, request); @@ -880,7 +880,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.recover(sessionId, request); @@ -901,7 +901,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection checkClosedOrFailed(); try { - ProviderFuture request = new ProviderFuture(synchronization); + ProviderFuture request = provider.newProviderFuture(synchronization); requests.put(request, request); try { provider.pull(consumerId, timeout, request); @@ -1256,12 +1256,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection public void onConnectionRecovery(Provider provider) throws Exception { LOG.debug("Connection {} is starting recovery.", connectionInfo.getId()); - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); provider.create(connectionInfo, request); request.sync(); for (JmsTemporaryDestination tempDestination : tempDestinations.values()) { - request = new ProviderFuture(); + request = provider.newProviderFuture(); provider.create(tempDestination, request); request.sync(); } @@ -1269,7 +1269,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); if (!consumerInfo.isClosed()) { - request = new ProviderFuture(); + request = provider.newProviderFuture(); provider.create(consumerInfo, request); request.sync(); } @@ -1290,7 +1290,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) { JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); if (!consumerInfo.isClosed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); provider.start(consumerInfo, request); request.sync(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index c60c5e6..45bd10a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -350,7 +350,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { // current transaction we must mark it as in-doubt so that a commit attempt // will then roll it back. transactionInfo = getNextTransactionInfo(); - ProviderFuture request = new ProviderFuture(new ProviderSynchronization() { + ProviderFuture request = provider.newProviderFuture(new ProviderSynchronization() { @Override public void onPendingSuccess() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index e922679..d444e77 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -651,7 +651,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe protected void onConnectionRecovery(Provider provider) throws Exception { if (!consumerInfo.isClosed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); try { provider.create(consumerInfo, request); request.sync(); @@ -667,7 +667,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe protected void onConnectionRecovered(Provider provider) throws Exception { if (!consumerInfo.isClosed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); provider.start(consumerInfo, request); request.sync(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index 39f1888..cab1490 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -357,7 +357,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { protected void onConnectionRecovery(Provider provider) throws Exception { if (!producerInfo.isClosed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); try { provider.create(producerInfo, request); request.sync(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 9711728..e3dd65f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -1337,7 +1337,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe protected void onConnectionRecovery(Provider provider) throws Exception { if (!sessionInfo.isClosed()) { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); provider.create(sessionInfo, request); request.sync(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java new file mode 100644 index 0000000..846a796 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.jms.util.IOExceptionSupport; + +/** + * A more balanced implementation of a ProviderFuture that works better on some + * platforms such as windows where the thread park and atomic operations used by + * a more aggressive implementation could result in poor performance. + */ +public class BalancedProviderFuture extends ProviderFuture { + + // Using a progressive wait strategy helps to avoid wait happening before + // completion and avoid using expensive thread signalling + private static final int SPIN_COUNT = 10; + private static final int YIELD_COUNT = 100; + + public BalancedProviderFuture() { + this(null); + } + + public BalancedProviderFuture(ProviderSynchronization synchronization) { + super(synchronization); + } + + @Override + public boolean sync(long amount, TimeUnit unit) throws IOException { + try { + if (isComplete() || amount == 0) { + failOnError(); + return true; + } + + final long timeout = unit.toNanos(amount); + long maxParkNanos = timeout / 8; + maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout; + final long startTime = System.nanoTime(); + int idleCount = 0; + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + final long elapsed = System.nanoTime() - startTime; + final long diff = elapsed - timeout; + + if (diff >= 0) { + failOnError(); + return isComplete(); + } + + if (isComplete()) { + failOnError(); + return true; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return true; + } + + waiting++; + try { + wait(-diff / 1000000, (int) (-diff % 1000000)); + } finally { + waiting--; + } + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } + + @Override + public void sync() throws IOException { + try { + if (isComplete()) { + failOnError(); + return; + } + + int idleCount = 0; + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + if (isComplete()) { + failOnError(); + return; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return; + } + + waiting++; + try { + wait(); + } finally { + waiting--; + } + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java new file mode 100644 index 0000000..e055130 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.jms.util.IOExceptionSupport; + +/** + * A more conservative implementation of a ProviderFuture that is better on some + * platforms or resource constrained hardware where high CPU usage can be more + * counter productive than other variants that might spin or otherwise avoid + * entry into states requiring thread signalling. + */ +public class ConservativeProviderFuture extends ProviderFuture { + + public ConservativeProviderFuture() { + this(null); + } + + public ConservativeProviderFuture(ProviderSynchronization synchronization) { + super(synchronization); + } + + @Override + public boolean sync(long amount, TimeUnit unit) throws IOException { + try { + if (isComplete() || amount == 0) { + failOnError(); + return true; + } + + final long timeout = unit.toNanos(amount); + long maxParkNanos = timeout / 8; + maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout; + final long startTime = System.nanoTime(); + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + final long elapsed = System.nanoTime() - startTime; + final long diff = elapsed - timeout; + + if (diff >= 0) { + failOnError(); + return isComplete(); + } + + if (isComplete()) { + failOnError(); + return true; + } + + synchronized (this) { + if (isComplete()) { + failOnError(); + return true; + } + + waiting++; + try { + wait(-diff / 1000000, (int) (-diff % 1000000)); + } finally { + waiting--; + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } + + @Override + public void sync() throws IOException { + try { + if (isComplete()) { + failOnError(); + return; + } + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + if (isComplete()) { + failOnError(); + return; + } + + synchronized (this) { + if (isComplete()) { + failOnError(); + return; + } + + waiting++; + try { + wait(); + } finally { + waiting--; + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java new file mode 100644 index 0000000..bd4da7f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import org.apache.qpid.jms.util.IOExceptionSupport; + +/** + * An optimized version of a ProviderFuture that makes use of spin waits and other + * methods of reacting to asynchronous completion in a more timely manner. + */ +public class ProgressiveProviderFuture extends ProviderFuture { + + // Using a progressive wait strategy helps to avoid wait happening before + // completion and avoid using expensive thread signaling + private static final int SPIN_COUNT = 10; + private static final int YIELD_COUNT = 100; + private static final int TINY_PARK_COUNT = 1000; + private static final int TINY_PARK_NANOS = 1; + private static final int SMALL_PARK_COUNT = 101_000; + private static final int SMALL_PARK_NANOS = 10_000; + + public ProgressiveProviderFuture() { + this(null); + } + + public ProgressiveProviderFuture(ProviderSynchronization synchronization) { + super(synchronization); + } + + @Override + public boolean sync(long amount, TimeUnit unit) throws IOException { + try { + if (isComplete() || amount == 0) { + failOnError(); + return true; + } + + final long timeout = unit.toNanos(amount); + long maxParkNanos = timeout / 8; + maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout; + final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS); + final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS); + final long startTime = System.nanoTime(); + int idleCount = 0; + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + final long elapsed = System.nanoTime() - startTime; + final long diff = elapsed - timeout; + + if (diff >= 0) { + failOnError(); + return isComplete(); + } + + if (isComplete()) { + failOnError(); + return true; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else if (idleCount < TINY_PARK_COUNT) { + LockSupport.parkNanos(tinyParkNanos); + idleCount++; + } else if (idleCount < SMALL_PARK_COUNT) { + LockSupport.parkNanos(smallParkNanos); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return true; + } + + waiting++; + try { + wait(-diff / 1000000, (int) (-diff % 1000000)); + } finally { + waiting--; + } + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } + + @Override + public void sync() throws IOException { + try { + if (isComplete()) { + failOnError(); + return; + } + + int idleCount = 0; + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + while (true) { + if (isComplete()) { + failOnError(); + return; + } + + if (idleCount < SPIN_COUNT) { + idleCount++; + } else if (idleCount < YIELD_COUNT) { + Thread.yield(); + idleCount++; + } else if (idleCount < TINY_PARK_COUNT) { + LockSupport.parkNanos(TINY_PARK_NANOS); + idleCount++; + } else if (idleCount < SMALL_PARK_COUNT) { + LockSupport.parkNanos(SMALL_PARK_NANOS); + idleCount++; + } else { + synchronized (this) { + if (isComplete()) { + failOnError(); + return; + } + + waiting++; + try { + wait(); + } finally { + waiting--; + } + } + } + } + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java index a61cb34..85a626d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java @@ -340,6 +340,27 @@ public interface Provider { JmsMessageFactory getMessageFactory(); /** + * Gets a ProviderFuture instance from the Provider for use in performing Provider calls + * that require an asynchronous completion to know when the call to the provider has succeeded + * or failed. + * + * @return a ProviderFuture for use in calling Provider methods that require a completion object. + */ + ProviderFuture newProviderFuture(); + + /** + * Gets a ProviderFuture instance from the Provider for use in performing Provider calls + * that require an asynchronous completion to know when the call to the provider has succeeded + * or failed. + * + * @param synchronization + * A {@link ProviderSynchronization} to assign to the resulting {@link ProviderFuture}. + * + * @return a ProviderFuture for use in calling Provider methods that require a completion object. + */ + ProviderFuture newProviderFuture(ProviderSynchronization synchronization); + + /** * Sets the listener of events from this Provider instance. * * @param listener @@ -353,4 +374,5 @@ public interface Provider { * @return the currently set ProviderListener instance. */ ProviderListener getProviderListener(); + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java index 19a41db..6341e75 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java @@ -48,6 +48,21 @@ public abstract class ProviderFactory { public abstract Provider createProvider(URI remoteURI) throws Exception; /** + * Creates an instance of the given AsyncProvider and configures it using the + * properties set on the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Broker. + * @param futureFactory + * The {@link ProviderFutureFactory} to use when creating the new {@link Provider}. + * + * @return a new AsyncProvider instance. + * + * @throws Exception if an error occurs while creating the Provider instance. + */ + public abstract Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception; + + /** * @return the name of this Provider. */ public abstract String getName(); @@ -64,11 +79,28 @@ public abstract class ProviderFactory { * @throws Exception if an error occurs while creating the AsyncProvider instance. */ public static Provider create(URI remoteURI) throws Exception { + return create(remoteURI, null); + } + + /** + * Static create method that performs the ProviderFactory search and handles the + * configuration and setup. + * + * @param remoteURI + * the URI of the remote peer. + * @param futureFactory + * the {@link ProviderFutureFactory} to use when building the new {@link Provider}. + * + * @return a new AsyncProvider instance that is ready for use. + * + * @throws Exception if an error occurs while creating the AsyncProvider instance. + */ + public static Provider create(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception { Provider result = null; try { ProviderFactory factory = findProviderFactory(remoteURI); - result = factory.createProvider(remoteURI); + result = factory.createProvider(remoteURI, futureFactory); } catch (Exception ex) { LOG.error("Failed to create Provider instance for {}, due to: {}", remoteURI.getScheme(), ex); LOG.trace("Error: ", ex); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java index e62db63..75805a6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java @@ -19,39 +19,28 @@ package org.apache.qpid.jms.provider; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.LockSupport; import org.apache.qpid.jms.util.IOExceptionSupport; /** * Asynchronous Provider Future class. */ -public class ProviderFuture implements AsyncResult { +public abstract class ProviderFuture implements AsyncResult { - // Using a progressive wait strategy helps to avoid await happening before countDown - // and avoids expensive thread signaling - private static final int SPIN_COUNT = 10; - private static final int YIELD_COUNT = 100; - private static final int TINY_PARK_COUNT = 1000; - private static final int TINY_PARK_NANOS = 1; - private static final int SMALL_PARK_COUNT = 101_000; - private static final int SMALL_PARK_NANOS = 10_000; + protected final ProviderSynchronization synchronization; // States used to track progress of this future - private static final int INCOMPLETE = 0; - private static final int COMPLETING = 1; - private static final int SUCCESS = 2; - private static final int FAILURE = 3; + protected static final int INCOMPLETE = 0; + protected static final int COMPLETING = 1; + protected static final int SUCCESS = 2; + protected static final int FAILURE = 3; - private static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state"); + protected static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state"); private volatile int state = INCOMPLETE; - private Throwable error; - - private int waiting; - - private final ProviderSynchronization synchronization; + protected Throwable error; + protected int waiting; public ProviderFuture() { this(null); @@ -102,6 +91,13 @@ public class ProviderFuture implements AsyncResult { } /** + * Waits for a response to some Provider requested operation. + * + * @throws IOException if an error occurs while waiting for the response. + */ + public abstract void sync() throws IOException; + + /** * Timed wait for a response to a Provider operation. * * @param amount @@ -114,132 +110,9 @@ public class ProviderFuture implements AsyncResult { * * @throws IOException if an error occurs while waiting for the response. */ - public boolean sync(long amount, TimeUnit unit) throws IOException { - try { - if (isComplete() || amount == 0) { - failOnError(); - return true; - } - - final Thread currentThread = Thread.currentThread(); - final long timeout = unit.toNanos(amount); - long maxParkNanos = timeout / 8; - maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout; - final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS); - final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS); - final long startTime = System.nanoTime(); - int idleCount = 0; - - while (true) { - if (currentThread.isInterrupted()) { - throw new InterruptedException(); - } - - final long elapsed = System.nanoTime() - startTime; - final long diff = elapsed - timeout; - - if (diff >= 0) { - failOnError(); - return isComplete(); - } - - if (isComplete()) { - failOnError(); - return true; - } - - if (idleCount < SPIN_COUNT) { - idleCount++; - } else if (idleCount < YIELD_COUNT) { - Thread.yield(); - idleCount++; - } else if (idleCount < TINY_PARK_COUNT) { - LockSupport.parkNanos(tinyParkNanos); - idleCount++; - } else if (idleCount < SMALL_PARK_COUNT) { - LockSupport.parkNanos(smallParkNanos); - idleCount++; - } else { - synchronized (this) { - if (isComplete()) { - failOnError(); - return true; - } - - waiting++; - try { - wait(-diff / 1000000, (int) (-diff % 1000000)); - } finally { - waiting--; - } - } - } - } - } catch (InterruptedException e) { - Thread.interrupted(); - throw IOExceptionSupport.create(e); - } - } - - /** - * Waits for a response to some Provider requested operation. - * - * @throws IOException if an error occurs while waiting for the response. - */ - public void sync() throws IOException { - try { - if (isComplete()) { - failOnError(); - return; - } - - final Thread currentThread = Thread.currentThread(); - int idleCount = 0; - - while (true) { - if (currentThread.isInterrupted()) { - throw new InterruptedException(); - } - - if (isComplete()) { - failOnError(); - return; - } - - if (idleCount < SPIN_COUNT) { - idleCount++; - } else if (idleCount < YIELD_COUNT) { - Thread.yield(); - idleCount++; - } else if (idleCount < TINY_PARK_COUNT) { - LockSupport.parkNanos(TINY_PARK_NANOS); - idleCount++; - } else if (idleCount < SMALL_PARK_COUNT) { - LockSupport.parkNanos(SMALL_PARK_NANOS); - idleCount++; - } else { - synchronized (this) { - if (isComplete()) { - failOnError(); - return; - } - - waiting++; - try { - wait(); - } finally { - waiting--; - } - } - } - } - } catch (InterruptedException e) { - Thread.interrupted(); - throw IOExceptionSupport.create(e); - } - } + public abstract boolean sync(long amount, TimeUnit unit) throws IOException; - private void failOnError() throws IOException { + protected void failOnError() throws IOException { Throwable cause = error; if (cause != null) { throw IOExceptionSupport.create(cause); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java new file mode 100644 index 0000000..95868c0 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.util.Map; + +/** + * Factory for provider future instances that will create specific versions based on + * configuration. + */ +public abstract class ProviderFutureFactory { + + public static final String PROVIDER_FUTURE_TYPE_KEY = "futureType"; + + private static final String OS_NAME = System.getProperty("os.name"); + private static final String WINDOWS_OS_PREFIX = "Windows"; + private static final boolean IS_WINDOWS = isOsNameMatch(OS_NAME, WINDOWS_OS_PREFIX); + + private static final String CONSERVATIVE = "conservative"; + private static final String BALANCED = "balanced"; + private static final String PROGRESSIVE = "progressive"; + + /** + * Create a new Provider + * + * @param providerOptions + * Configuration options to be consumed by this factory create method + * + * @return a new ProviderFutureFactory that will be used to create the desired future types. + */ + public static ProviderFutureFactory create(Map<String, String> providerOptions) { + String futureTypeKey = providerOptions.remove(PROVIDER_FUTURE_TYPE_KEY); + + if (futureTypeKey == null || futureTypeKey.isEmpty()) { + if (Runtime.getRuntime().availableProcessors() < 4) { + return new ConservativeProviderFutureFactory(); + } else if (isWindows()) { + return new BalancedProviderFutureFactory(); + } else { + return new ProgressiveProviderFutureFactory(); + } + } + + switch (futureTypeKey.toLowerCase()) { + case CONSERVATIVE: + return new ConservativeProviderFutureFactory(); + case BALANCED: + return new BalancedProviderFutureFactory(); + case PROGRESSIVE: + return new ProgressiveProviderFutureFactory(); + default: + throw new IllegalArgumentException( + "No ProviderFuture implementation with name " + futureTypeKey + " found"); + } + } + + /** + * @return a new ProviderFuture instance. + */ + public abstract ProviderFuture createFuture(); + + /** + * @param synchronization + * The {@link ProviderSynchronization} to assign to the returned {@link ProviderFuture}. + * + * @return a new ProviderFuture instance. + */ + public abstract ProviderFuture createFuture(ProviderSynchronization synchronization); + + /** + * @return a ProviderFuture that treats failures as success calls that simply complete the operation. + */ + public abstract ProviderFuture createUnfailableFuture(); + + //----- Internal support methods -----------------------------------------// + + private static boolean isWindows() { + return IS_WINDOWS; + } + + private static boolean isOsNameMatch(final String currentOSName, final String osNamePrefix) { + if (currentOSName == null || currentOSName.isEmpty()) { + return false; + } + + return currentOSName.startsWith(osNamePrefix); + } + + //----- ProviderFutureFactory implementation -----------------------------// + + private static class ConservativeProviderFutureFactory extends ProviderFutureFactory { + + @Override + public ProviderFuture createFuture() { + return new ConservativeProviderFuture(); + } + + @Override + public ProviderFuture createFuture(ProviderSynchronization synchronization) { + return new ConservativeProviderFuture(synchronization); + } + + @Override + public ProviderFuture createUnfailableFuture() { + return new ConservativeProviderFuture() { + + @Override + public void onFailure(Throwable t) { + this.onSuccess(); + } + }; + } + } + + private static class BalancedProviderFutureFactory extends ProviderFutureFactory { + + @Override + public ProviderFuture createFuture() { + return new BalancedProviderFuture(); + } + + @Override + public ProviderFuture createFuture(ProviderSynchronization synchronization) { + return new BalancedProviderFuture(synchronization); + } + + @Override + public ProviderFuture createUnfailableFuture() { + return new BalancedProviderFuture() { + + @Override + public void onFailure(Throwable t) { + this.onSuccess(); + } + }; + } + } + + private static class ProgressiveProviderFutureFactory extends ProviderFutureFactory { + + @Override + public ProviderFuture createFuture() { + return new ProgressiveProviderFuture(); + } + + @Override + public ProviderFuture createFuture(ProviderSynchronization synchronization) { + return new ProgressiveProviderFuture(synchronization); + } + + @Override + public ProviderFuture createUnfailableFuture() { + return new ProgressiveProviderFuture() { + + @Override + public void onFailure(Throwable t) { + this.onSuccess(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java index 6dfa4c6..2a4eb09 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -146,6 +146,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi } @Override + public ProviderFuture newProviderFuture() { + return next.newProviderFuture(); + } + + @Override + public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) { + return next.newProviderFuture(synchronization); + } + + @Override public void setProviderListener(ProviderListener listener) { this.listener = listener; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index a4dbc39..1c97dfe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -60,7 +60,9 @@ import org.apache.qpid.jms.provider.ProviderClosedException; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFailedException; import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderFutureFactory; import org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.ProviderSynchronization; import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder; import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder; import org.apache.qpid.jms.sasl.Mechanism; @@ -145,6 +147,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private final Collector protonCollector = new CollectorImpl(); private final Connection protonConnection = Connection.Factory.create(); + private final ProviderFutureFactory futureFactory; private AsyncResult connectionRequest; private ScheduledFuture<?> nextIdleTimeoutCheck; @@ -155,10 +158,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP * The URI of the AMQP broker this Provider instance will connect to. * @param transport * The underlying Transport that will be used for wire level communications. + * @param futureFactory + * The ProviderFutureFactory to use when futures are requested. */ - public AmqpProvider(URI remoteURI, Transport transport) { + public AmqpProvider(URI remoteURI, Transport transport, ProviderFutureFactory futureFactory) { this.remoteURI = remoteURI; this.transport = transport; + this.futureFactory = futureFactory; serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory( "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" + @@ -172,7 +178,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP public void connect(final JmsConnectionInfo connectionInfo) throws IOException { checkClosedOrFailed(); - final ProviderFuture connectRequest = new ProviderFuture(); + final ProviderFuture connectRequest = futureFactory.createFuture(); serializer.execute(new Runnable() { @@ -302,15 +308,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP @Override public void close() { if (closed.compareAndSet(false, true)) { - final ProviderFuture request = new ProviderFuture() { - - @Override - public void onFailure(Throwable result) { - // During close it is fine if the close call fails - // this in unrecoverable so we just log the event. - onSuccess(); - } - }; + final ProviderFuture request = futureFactory.createUnfailableFuture(); serializer.execute(new Runnable() { @@ -1138,6 +1136,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return connection.getAmqpMessageFactory(); } + @Override + public ProviderFuture newProviderFuture() { + return futureFactory.createFuture(); + } + + @Override + public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) { + return futureFactory.createFuture(synchronization); + } + public void setTraceFrames(boolean trace) { this.traceFrames = trace; updateTracer(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java index c141e26..a0083cf 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Map; import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.provider.ProviderFutureFactory; import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportFactory; import org.apache.qpid.jms.util.PropertyUtil; @@ -37,19 +38,40 @@ public class AmqpProviderFactory extends ProviderFactory { @Override public AmqpProvider createProvider(URI remoteURI) throws Exception { + return createProvider(remoteURI, null); + } + @Override + public AmqpProvider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception { Map<String, String> map = PropertyUtil.parseQuery(remoteURI); - Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "amqp."); - // Clear off any amqp.X values from the transport before creation. + // Clear off any amqp.X and provider.X values from the transport before creation. + Map<String, String> amqpProviderOptions = PropertyUtil.filterProperties(map, "amqp."); + Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider."); + Transport transport = TransportFactory.create(getTransportScheme(), PropertyUtil.replaceQuery(remoteURI, map)); - AmqpProvider result = new AmqpProvider(remoteURI, transport); + // If we have been given a futures factory to use then we ignore any URI options indicating + // what to create and just go with what we are given. + if (futureFactory == null) { + // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider + futureFactory = ProviderFutureFactory.create(providerOptions); + if (!providerOptions.isEmpty()) { + String msg = "" + + " Not all Provider options could be applied during AMQP Provider creation." + + " Check the options are spelled correctly." + + " Unused parameters=[" + providerOptions + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + } + + AmqpProvider result = new AmqpProvider(remoteURI, transport, futureFactory); - Map<String, String> unused = PropertyUtil.setProperties(result, providerOptions); + Map<String, String> unused = PropertyUtil.setProperties(result, amqpProviderOptions); if (!unused.isEmpty()) { String msg = "" - + " Not all provider options could be set on the AMQP Provider." + + " Not all AMQP provider options could be set on the AMQP Provider." + " Check the options are spelled correctly." + " Unused parameters=[" + unused + "]." + " This provider instance cannot be started."; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index b48f4c9..654ec48 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -52,8 +52,10 @@ import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFactory; import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderFutureFactory; import org.apache.qpid.jms.provider.ProviderListener; import org.apache.qpid.jms.provider.ProviderRedirectedException; +import org.apache.qpid.jms.provider.ProviderSynchronization; import org.apache.qpid.jms.provider.WrappedAsyncResult; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.jms.util.QpidJMSThreadFactory; @@ -100,6 +102,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>(); private final DefaultProviderListener closedListener = new DefaultProviderListener(); private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>(); + private final ProviderFutureFactory futureFactory; // Current state of connection / reconnection private final ReconnectControls reconnectControl = new ReconnectControls(); @@ -124,16 +127,17 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private FailoverServerListAction amqpOpenServerListAction = FailoverServerListAction.REPLACE; - public FailoverProvider(Map<String, String> nestedOptions) { - this(null, nestedOptions); + public FailoverProvider(Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) { + this(null, nestedOptions, futureFactory); } - public FailoverProvider(List<URI> uris) { - this(uris, null); + public FailoverProvider(List<URI> uris, ProviderFutureFactory futureFactory) { + this(uris, null, futureFactory); } - public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions) { + public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) { this.uris = new FailoverUriPool(uris, nestedOptions); + this.futureFactory = futureFactory; serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: serialization thread", true)); serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -167,7 +171,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide @Override public void close() { if (closed.compareAndSet(false, true)) { - final ProviderFuture request = new ProviderFuture(); + final ProviderFuture request = futureFactory.createFuture(); serializer.execute(new Runnable() { @Override @@ -700,7 +704,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide try { LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target.getScheme() + "://" + target.getHost() + ":" + target.getPort()); - provider = ProviderFactory.create(target); + provider = ProviderFactory.create(target, futureFactory); provider.connect(connectionInfo); initializeNewConnection(provider); return; @@ -1045,6 +1049,16 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } @Override + public ProviderFuture newProviderFuture() { + return futureFactory.createFuture(); + } + + @Override + public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) { + return futureFactory.createFuture(synchronization); + } + + @Override public String toString() { return "FailoverProvider: " + (connectedURI == null ? "unconnected" : connectedURI.toString()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java index 3914784..5d33763 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.provider.ProviderFutureFactory; import org.apache.qpid.jms.util.PropertyUtil; import org.apache.qpid.jms.util.URISupport; import org.apache.qpid.jms.util.URISupport.CompositeData; @@ -42,13 +43,34 @@ public class FailoverProviderFactory extends ProviderFactory { @Override public Provider createProvider(URI remoteURI) throws Exception { + return createProvider(remoteURI, null); + } + + @Override + public Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception { CompositeData composite = URISupport.parseComposite(remoteURI); Map<String, String> options = composite.getParameters(); Map<String, String> filtered = PropertyUtil.filterProperties(options, FAILOVER_OPTION_PREFIX); Map<String, String> nested = PropertyUtil.filterProperties(filtered, FAILOVER_NESTED_OPTION_PREFIX_ADDON); - FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested); + Map<String, String> providerOptions = PropertyUtil.filterProperties(options, "provider."); + // If we have been given a futures factory to use then we ignore any URI options indicating + // what to create and just go with what we are given. + if (futureFactory == null) { + // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider + futureFactory = ProviderFutureFactory.create(providerOptions); + if (!providerOptions.isEmpty()) { + String msg = "" + + " Not all Provider options could be applied during Failover Provider creation." + + " Check the options are spelled correctly." + + " Unused parameters=[" + providerOptions + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + } + + FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested, futureFactory); Map<String, String> unused = PropertyUtil.setProperties(provider, filtered); if (!unused.isEmpty()) { String msg = "" http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java new file mode 100644 index 0000000..8cd03ce --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class ProviderFutureFactoryTest { + + @Test + public void testCreateFailsWithNullOptions() { + try { + ProviderFutureFactory.create(null); + fail("Should throw NullPointerException"); + } catch (NullPointerException npe) {} + } + + @Test + public void testCreateFailsWhenFutureTypeNotValid() { + Map<String, String> options = new HashMap<>(); + + options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "super-fast"); + + try { + ProviderFutureFactory.create(options); + fail("Should throw IllegalArgumentException"); + } catch (IllegalArgumentException iae) {} + } + + @Test + public void testCreateFactoryWithNoConfigurationOptionsGiven() { + ProviderFutureFactory factory = ProviderFutureFactory.create(Collections.emptyMap()); + + ProviderFuture future = factory.createFuture(); + assertNotNull(future); + assertFalse(future.isComplete()); + } + + @Test + public void testCreateConservativeFactoryFromConfiguration() { + Map<String, String> options = new HashMap<>(); + + options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "conservative"); + + ProviderFutureFactory factory = ProviderFutureFactory.create(options); + + ProviderFuture future = factory.createFuture(); + assertNotNull(future); + assertFalse(future.isComplete()); + + assertTrue(future instanceof ConservativeProviderFuture); + } + + @Test + public void testCreateBalancedFactoryFromConfiguration() { + Map<String, String> options = new HashMap<>(); + + options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "balanced"); + + ProviderFutureFactory factory = ProviderFutureFactory.create(options); + + ProviderFuture future = factory.createFuture(); + assertNotNull(future); + assertFalse(future.isComplete()); + + assertTrue(future instanceof BalancedProviderFuture); + } + + @Test + public void testCreateProgressiveFactoryFromConfiguration() { + Map<String, String> options = new HashMap<>(); + + options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "progressive"); + + ProviderFutureFactory factory = ProviderFutureFactory.create(options); + + ProviderFuture future = factory.createFuture(); + assertNotNull(future); + assertFalse(future.isComplete()); + + assertTrue(future instanceof ProgressiveProviderFuture); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java index 42258ed..601c879 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java @@ -22,17 +22,41 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class ProviderFutureTest { + private final ProviderFutureFactory futuresFactory; + + @Parameters(name = "{index}: futureType={0}") + public static Collection<Object> data() { + return Arrays.asList(new Object[] { + "conservative", "balanced", "progressive" } + ); + } + + public ProviderFutureTest(String futureTypeName) { + Map<String, String> options = new HashMap<>(); + options.put("futureType", futureTypeName); + + futuresFactory = ProviderFutureFactory.create(options); + } + @Test public void testIsComplete() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); assertFalse(future.isComplete()); future.onSuccess(); @@ -41,7 +65,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testOnSuccess() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); future.onSuccess(); try { @@ -53,7 +77,7 @@ public class ProviderFutureTest { @Test(timeout = 90000) public void testTimedSync() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); try { assertFalse(future.sync(1, TimeUnit.SECONDS)); @@ -64,7 +88,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testOnFailure() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); IOException ex = new IOException(); future.onFailure(ex); @@ -79,7 +103,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testOnSuccessCallsSynchronization() { final AtomicBoolean syncCalled = new AtomicBoolean(false); - ProviderFuture future = new ProviderFuture(new ProviderSynchronization() { + ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() { @Override public void onPendingSuccess() { @@ -104,7 +128,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testOnFailureCallsSynchronization() { final AtomicBoolean syncCalled = new AtomicBoolean(false); - ProviderFuture future = new ProviderFuture(new ProviderSynchronization() { + ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() { @Override public void onPendingSuccess() { @@ -131,7 +155,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testSuccessfulStateIsFixed() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); IOException ex = new IOException(); future.onSuccess(); @@ -145,7 +169,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testFailedStateIsFixed() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); IOException ex = new IOException(); future.onFailure(ex); @@ -160,7 +184,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testSyncHandlesInterruption() throws InterruptedException { - final ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); final CountDownLatch syncing = new CountDownLatch(1); final CountDownLatch done = new CountDownLatch(1); @@ -193,7 +217,7 @@ public class ProviderFutureTest { @Test(timeout = 10000) public void testTimedSyncHandlesInterruption() throws InterruptedException { - final ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); final CountDownLatch syncing = new CountDownLatch(1); final CountDownLatch done = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java index 52e888b..6fea518 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java @@ -21,11 +21,14 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collections; import org.junit.Test; public class WrappedAsyncResultTest { + private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap()); + @Test public void testCreateWithNull() { try { @@ -35,7 +38,7 @@ public class WrappedAsyncResultTest { @Test public void testGetWrapped() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {}; assertSame(wrapped.getWrappedRequest(), future); @@ -43,7 +46,7 @@ public class WrappedAsyncResultTest { @Test public void testOnSuccessPassthrough() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {}; assertFalse(future.isComplete()); @@ -55,7 +58,7 @@ public class WrappedAsyncResultTest { @Test public void testOnFailurePassthrough() { - ProviderFuture future = new ProviderFuture(); + ProviderFuture future = futuresFactory.createFuture(); WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {}; assertFalse(future.isComplete()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java index b26cb5f..53f3c6a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java @@ -323,7 +323,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { fail("Should have thrown an IOException when closed."); } catch (IOException ex) {} - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); try { provider.unsubscribe("subscription-name", request); fail("Should have thrown an IOException when closed."); @@ -360,7 +360,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { connectionInfo.setSendTimeout(SEND_TIMEOUT); connectionInfo.setRequestTimeout(REQUEST_TIMEOUT); - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); provider.create(connectionInfo, request); request.sync(); @@ -415,7 +415,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { }; assertFalse("Error should not yet be thrown", errorThrown.get()); - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = provider.newProviderFuture(); switch(operation) { case CREATE: http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java index 7cf686c..c5d4197 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover; import java.io.IOException; import java.net.URI; +import java.util.Collections; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; @@ -28,6 +29,7 @@ import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.meta.JmsTransactionInfo; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderFutureFactory; import org.junit.Before; import org.junit.Test; @@ -36,6 +38,8 @@ import org.junit.Test; */ public class FailoverProviderClosedTest extends FailoverProviderTestSupport { + private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap()); + private FailoverProvider provider; private JmsConnectionInfo connection; private JmsSessionInfo session; @@ -71,49 +75,49 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport { @Test(timeout=30000, expected=IOException.class) public void testCreateResource() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.create(connection, request); } @Test(timeout=30000, expected=IOException.class) public void testStartResource() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.start(session, request); } @Test(timeout=30000, expected=IOException.class) public void testStopResource() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.stop(session, request); } @Test(timeout=30000, expected=IOException.class) public void testDestroyResource() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.destroy(session, request); } @Test(timeout=30000, expected=IOException.class) public void testSend() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.send(new JmsOutboundMessageDispatch(), request); } @Test(timeout=30000, expected=IOException.class) public void testSessionAcknowledge() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.acknowledge(session.getId(), ACK_TYPE.ACCEPTED, request); } @Test(timeout=30000, expected=IOException.class) public void testAcknowledgeMessage() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.acknowledge(new JmsInboundMessageDispatch(1), ACK_TYPE.ACCEPTED, request); } @Test(timeout=30000, expected=IOException.class) public void testCommit() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1); JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId); provider.commit(txInfo, null, request); @@ -121,7 +125,7 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport { @Test(timeout=30000, expected=IOException.class) public void testRollback() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1); JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId); provider.rollback(txInfo, null, request); @@ -129,19 +133,19 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport { @Test(timeout=30000, expected=IOException.class) public void testRecover() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.recover(session.getId(), request); } @Test(timeout=30000, expected=IOException.class) public void testUnsubscribe() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.unsubscribe("subscription-name", request); } @Test(timeout=30000, expected=IOException.class) public void testMessagePull() throws Exception { - ProviderFuture request = new ProviderFuture(); + ProviderFuture request = futuresFactory.createFuture(); provider.pull(consumer.getId(), 1, request); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org