Repository: camel Updated Branches: refs/heads/master e58534afb -> 9bfad3cb8
CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9bfad3cb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9bfad3cb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9bfad3cb Branch: refs/heads/master Commit: 9bfad3cb84679a4bebb5861833a8bc752c3139e8 Parents: cf288b6 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 21 10:57:33 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 21 11:02:47 2016 +0100 ---------------------------------------------------------------------- .../component/sjms/batch/SjmsBatchConsumer.java | 90 +++++++++++++++----- .../sjms/batch/SjmsBatchConsumerTest.java | 55 ++++++++++++ 2 files changed, 123 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9bfad3cb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 5316664..b5d72a2 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -88,6 +88,10 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (completionInterval > 0 && completionTimeout != SjmsBatchEndpoint.DEFAULT_COMPLETION_TIMEOUT) { throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); } + if (sjmsBatchEndpoint.isSendEmptyMessageWhenIdle() && completionTimeout <= 0 && completionInterval <= 0) { + throw new IllegalArgumentException("SendEmptyMessageWhenIdle can only be enabled if either completionInterval or completionTimeout is also set"); + } + pollDuration = sjmsBatchEndpoint.getPollDuration(); if (pollDuration < 0) { throw new IllegalArgumentException("pollDuration must be 0 or greater"); @@ -205,6 +209,9 @@ public class SjmsBatchConsumer extends DefaultConsumer { } private class BatchConsumptionLoop implements Runnable { + + private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger); + @Override public void run() { try { @@ -217,7 +224,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { MessageConsumer consumer = session.createConsumer(queue); try { - consumeBatchesOnLoop(session, consumer, completionTimeoutTrigger); + task.consumeBatchesOnLoop(session, consumer); } finally { try { consumer.close(); @@ -250,22 +257,44 @@ public class SjmsBatchConsumer extends DefaultConsumer { } } - private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer, final AtomicBoolean timeoutInterval) throws JMSException { - final boolean usingTimeout = completionTimeout > 0; + private final class BatchConsumptionTask { + + // state + private final AtomicBoolean timeoutInterval; + private final AtomicBoolean timeout = new AtomicBoolean(); + private int messageCount; + private long timeElapsed; + private long startTime; + private Exchange aggregatedExchange; + + public BatchConsumptionTask(AtomicBoolean timeoutInterval) { + this.timeoutInterval = timeoutInterval; + } + + private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer) throws JMSException { + final boolean usingTimeout = completionTimeout > 0; + + LOG.trace("BatchConsumptionTask +++ start +++"); - batchConsumption: - while (running.get()) { - // reset the state - boolean timeout = false; - int messageCount = 0; - long timeElapsed = 0; - long startTime = 0; - Exchange aggregatedExchange = null; + while (running.get()) { - batch: - // loop while no timeout or interval triggered and while we have room still for messages in the batch - while (!timeout && !timeoutInterval.compareAndSet(true, false) - && (usingTimeout || (completionSize > 0 && messageCount < completionSize))) { + LOG.trace("BatchConsumptionTask running"); + + if (timeout.compareAndSet(true, false) || timeoutInterval.compareAndSet(true, false)) { + // trigger timeout + LOG.trace("Completion batch due timeout"); + completionBatch(session); + reset(); + continue; + } + + if (completionSize > 0 && messageCount >= completionSize) { + // trigger completion size + LOG.trace("Completion batch due size"); + completionBatch(session); + reset(); + continue; + } // check periodically to see whether we should be shutting down long waitTime = (usingTimeout && (timeElapsed > 0)) @@ -273,16 +302,20 @@ public class SjmsBatchConsumer extends DefaultConsumer { : pollDuration; Message message = consumer.receive(waitTime); - if (running.get()) { // no interruptions received + if (running.get()) { + // no interruptions received if (message == null) { // timed out, no message received LOG.trace("No message received"); } else { - if (usingTimeout && messageCount == 0) { // this is the first message - startTime = new Date().getTime(); // start counting down the period for this batch - } messageCount++; LOG.debug("#{} messages received", messageCount); + + if (usingTimeout && startTime == 0) { + // this is the first message start counting down the period for this batch + startTime = new Date().getTime(); + } + // TODO: why only object or text messages? if (message instanceof ObjectMessage || message instanceof TextMessage) { final Exchange exchange = getEndpoint().createExchange(message, session); @@ -300,17 +333,29 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (timeElapsed > completionTimeout) { // batch finished by timeout - timeout = true; + timeout.set(true); + } else { + LOG.trace("This batch has more time until the timeout, elapsed: {} timeout: {}", timeElapsed, completionTimeout); } } } else { - LOG.info("Shutdown signal received - rolling batch back"); + LOG.info("Shutdown signal received - rolling back batch"); session.rollback(); - break batchConsumption; } } + LOG.trace("BatchConsumptionTask +++ end +++"); + } + + private void reset() { + messageCount = 0; + timeElapsed = 0; + startTime = 0; + aggregatedExchange = null; + } + + private void completionBatch(final Session session) { // batch if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) { processEmptyMessage(); @@ -318,6 +363,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { processBatch(aggregatedExchange, session); } } + } /** http://git-wip-us.apache.org/repos/asf/camel/blob/9bfad3cb/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java index 76c739b..80c1970 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java @@ -177,6 +177,61 @@ public class SjmsBatchConsumerTest extends CamelTestSupport { assertFirstMessageBodyOfLength(mockBatches, messageCount); } + @Test + public void testConsumptionCompletionInterval() throws Exception { + final int completionInterval = 2000; + final int completionSize = -1; // timeout-based only + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + fromF("sjms-batch:%s?completionInterval=%s&completionSize=%s&aggregationStrategy=#testStrategy", + queueName, completionInterval, completionSize).routeId("batchConsumer").startupOrder(10) + .to("mock:batches"); + } + }); + context.start(); + + int messageCount = 50; + assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE); + + MockEndpoint mockBatches = getMockEndpoint("mock:batches"); + mockBatches.expectedMinimumMessageCount(1); // everything ought to be batched together but the interval may trigger in between and we get 2 etc + + template.sendBody("direct:in", generateStrings(messageCount)); + + mockBatches.assertIsSatisfied(); + } + + @Test + public void testConsumptionSendEmptyMessageWhenIdle() throws Exception { + final int completionInterval = 2000; + final int completionSize = -1; // timeout-based only + + final String queueName = getQueueName(); + context.addRoutes(new TransactedSendHarness(queueName)); + context.addRoutes(new RouteBuilder() { + public void configure() throws Exception { + fromF("sjms-batch:%s?completionInterval=%s&completionSize=%s&sendEmptyMessageWhenIdle=true&aggregationStrategy=#testStrategy", + queueName, completionInterval, completionSize).routeId("batchConsumer").startupOrder(10) + .to("mock:batches"); + } + }); + context.start(); + + int messageCount = 50; + assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE); + + MockEndpoint mockBatches = getMockEndpoint("mock:batches"); + // trigger a couple of empty messages + mockBatches.expectedMinimumMessageCount(3); + + template.sendBody("direct:in", generateStrings(messageCount)); + + mockBatches.assertIsSatisfied(); + } + /** * Checks whether multiple consumer endpoints can operate in parallel. */