Repository: cxf
Updated Branches: refs/heads/master 811e10d4d -> 2c6862050
CXF-5680 Avoid rebuilding suspendedContinuations after reconnect Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2c686205 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2c686205 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2c686205 Branch: refs/heads/master Commit: 2c6862050159b00d6866214a4d37743d62c3c362 Parents: 811e10d Author: Christian Schneider <[email protected]> Authored: Wed Apr 9 13:25:58 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Wed Apr 9 13:25:58 2014 +0200 ---------------------------------------------------------------------- .../org/apache/cxf/transport/jms/JMSDestination.java | 10 ++++++---- .../apache/cxf/transport/jms/ThrottlingCounter.java | 14 +++++++++----- .../cxf/transport/jms/ThrottlingCounterTest.java | 3 ++- 3 files changed, 17 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/2c686205/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 7e5ac35..e9dc970 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -73,6 +73,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess this.jmsConfig = jmsConfig; info.setProperty(OneWayProcessorInterceptor.USE_ORIGINAL_THREAD, Boolean.TRUE); loader = bus.getExtension(ClassLoader.class); + int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100; + + this.suspendedContinuations = new ThrottlingCounter(restartLimit, 
+ jmsConfig.getMaxSuspendedContinuations()); } /** @@ -90,9 +94,6 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess getLogger().log(Level.FINE, "JMSDestination activate().... "); jmsConfig.ensureProperlyConfigured(); jmsListener = createTargetDestinationListener(); - int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100; - this.suspendedContinuations = new ThrottlingCounter(this.jmsListener, restartLimit, - jmsConfig.getMaxSuspendedContinuations()); } @@ -115,6 +116,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess Executor executor = JMSFactory.createExecutor(bus, "jms-destination"); container.setExecutor(executor); container.start(); + suspendedContinuations.setListenerContainer(container); return container; } catch (JMSException e) { throw JMSUtil.convertJmsException(e); @@ -153,7 +155,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess if (jmsListener != null) { jmsListener.shutdown(); } - this.suspendedContinuations = null; + suspendedContinuations.setListenerContainer(null); connection = null; } http://git-wip-us.apache.org/repos/asf/cxf/blob/2c686205/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java index afe8552..7ddc552 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java @@ -34,18 +34,22 @@ public class ThrottlingCounter implements Counter { private AtomicInteger counter; private final int lowWatermark; private final int highWatermark; - private final JMSListenerContainer 
listenerContainer; + private JMSListenerContainer listenerContainer; - public ThrottlingCounter(JMSListenerContainer listenerContainer, int lowWatermark, int highWatermark) { + public ThrottlingCounter(int lowWatermark, int highWatermark) { this.counter = new AtomicInteger(); this.lowWatermark = lowWatermark; this.highWatermark = highWatermark; - this.listenerContainer = listenerContainer; } + public void setListenerContainer(JMSListenerContainer listenerContainer) { + this.listenerContainer = listenerContainer; + } + public final int incrementAndGet() { int curCounter = counter.incrementAndGet(); - if (highWatermark >= 0 && curCounter >= highWatermark && listenerContainer.isRunning()) { + if (listenerContainer != null && highWatermark >= 0 + && curCounter >= highWatermark && listenerContainer.isRunning()) { listenerContainer.stop(); } return curCounter; @@ -53,7 +57,7 @@ public class ThrottlingCounter implements Counter { public final int decrementAndGet() { int curCounter = counter.decrementAndGet(); - if (curCounter <= lowWatermark && !listenerContainer.isRunning()) { + if (listenerContainer != null && curCounter <= lowWatermark && !listenerContainer.isRunning()) { listenerContainer.start(); } return curCounter; http://git-wip-us.apache.org/repos/asf/cxf/blob/2c686205/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java index c222818..3813e7d 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java @@ -33,7 +33,8 @@ public class ThrottlingCounterTest { public void testThrottleWithJmsStartAndStop() { JMSListenerContainer 
listenerContainer = new DummyJMSListener(); - ThrottlingCounter counter = new ThrottlingCounter(listenerContainer, 0, 1); + ThrottlingCounter counter = new ThrottlingCounter(0, 1); + counter.setListenerContainer(listenerContainer); assertTrue(listenerContainer.isRunning()); counter.incrementAndGet();
