Repository: camel
Updated Branches:
  refs/heads/master e58534afb -> 9bfad3cb8


CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9bfad3cb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9bfad3cb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9bfad3cb

Branch: refs/heads/master
Commit: 9bfad3cb84679a4bebb5861833a8bc752c3139e8
Parents: cf288b6
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Feb 21 10:57:33 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 21 11:02:47 2016 +0100

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 90 +++++++++++++++-----
 .../sjms/batch/SjmsBatchConsumerTest.java       | 55 ++++++++++++
 2 files changed, 123 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9bfad3cb/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 5316664..b5d72a2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -88,6 +88,10 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         if (completionInterval > 0 && completionTimeout != SjmsBatchEndpoint.DEFAULT_COMPLETION_TIMEOUT) {
             throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
         }
+        if (sjmsBatchEndpoint.isSendEmptyMessageWhenIdle() && completionTimeout <= 0 && completionInterval <= 0) {
+            throw new IllegalArgumentException("SendEmptyMessageWhenIdle can only be enabled if either completionInterval or completionTimeout is also set");
+        }
+
         pollDuration = sjmsBatchEndpoint.getPollDuration();
         if (pollDuration < 0) {
             throw new IllegalArgumentException("pollDuration must be 0 or greater");
@@ -205,6 +209,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     }
 
     private class BatchConsumptionLoop implements Runnable {
+
+        private final BatchConsumptionTask task = new 
BatchConsumptionTask(completionTimeoutTrigger);
+
         @Override
         public void run() {
             try {
@@ -217,7 +224,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     MessageConsumer consumer = session.createConsumer(queue);
 
                     try {
-                        consumeBatchesOnLoop(session, consumer, 
completionTimeoutTrigger);
+                        task.consumeBatchesOnLoop(session, consumer);
                     } finally {
                         try {
                             consumer.close();
@@ -250,22 +257,44 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             }
         }
 
-        private void consumeBatchesOnLoop(final Session session, final 
MessageConsumer consumer, final AtomicBoolean timeoutInterval) throws 
JMSException {
-            final boolean usingTimeout = completionTimeout > 0;
+        private final class BatchConsumptionTask {
+
+            // state
+            private final AtomicBoolean timeoutInterval;
+            private final AtomicBoolean timeout = new AtomicBoolean();
+            private int messageCount;
+            private long timeElapsed;
+            private long startTime;
+            private Exchange aggregatedExchange;
+
+            public BatchConsumptionTask(AtomicBoolean timeoutInterval) {
+                this.timeoutInterval = timeoutInterval;
+            }
+
+            private void consumeBatchesOnLoop(final Session session, final 
MessageConsumer consumer) throws JMSException {
+                final boolean usingTimeout = completionTimeout > 0;
+
+                LOG.trace("BatchConsumptionTask +++ start +++");
 
-        batchConsumption:
-            while (running.get()) {
-                // reset the state
-                boolean timeout = false;
-                int messageCount = 0;
-                long timeElapsed = 0;
-                long startTime = 0;
-                Exchange aggregatedExchange = null;
+                while (running.get()) {
 
-            batch:
-                // loop while no timeout or interval triggered and while we 
have room still for messages in the batch
-                while (!timeout && !timeoutInterval.compareAndSet(true, false)
-                        && (usingTimeout || (completionSize > 0 && 
messageCount < completionSize))) {
+                    LOG.trace("BatchConsumptionTask running");
+
+                    if (timeout.compareAndSet(true, false) || 
timeoutInterval.compareAndSet(true, false)) {
+                        // trigger timeout
+                        LOG.trace("Completion batch due timeout");
+                        completionBatch(session);
+                        reset();
+                        continue;
+                    }
+
+                    if (completionSize > 0 && messageCount >= completionSize) {
+                        // trigger completion size
+                        LOG.trace("Completion batch due size");
+                        completionBatch(session);
+                        reset();
+                        continue;
+                    }
 
                     // check periodically to see whether we should be shutting 
down
                     long waitTime = (usingTimeout && (timeElapsed > 0))
@@ -273,16 +302,20 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             : pollDuration;
                     Message message = consumer.receive(waitTime);
 
-                    if (running.get()) { // no interruptions received
+                    if (running.get()) {
+                        // no interruptions received
                         if (message == null) {
                             // timed out, no message received
                             LOG.trace("No message received");
                         } else {
-                            if (usingTimeout && messageCount == 0) { // this 
is the first message
-                                startTime = new Date().getTime(); // start 
counting down the period for this batch
-                            }
                             messageCount++;
                             LOG.debug("#{} messages received", messageCount);
+
+                            if (usingTimeout && startTime == 0) {
+                                // this is the first message start counting 
down the period for this batch
+                                startTime = new Date().getTime();
+                            }
+
                             // TODO: why only object or text messages?
                             if (message instanceof ObjectMessage || message 
instanceof TextMessage) {
                                 final Exchange exchange = 
getEndpoint().createExchange(message, session);
@@ -300,17 +333,29 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
                             if (timeElapsed > completionTimeout) {
                                 // batch finished by timeout
-                                timeout = true;
+                                timeout.set(true);
+                            } else {
+                                LOG.trace("This batch has more time until the 
timeout, elapsed: {} timeout: {}", timeElapsed, completionTimeout);
                             }
                         }
 
                     } else {
-                        LOG.info("Shutdown signal received - rolling batch 
back");
+                        LOG.info("Shutdown signal received - rolling back 
batch");
                         session.rollback();
-                        break batchConsumption;
                     }
                 }
 
+                LOG.trace("BatchConsumptionTask +++ end +++");
+            }
+
+            private void reset() {
+                messageCount = 0;
+                timeElapsed = 0;
+                startTime = 0;
+                aggregatedExchange = null;
+            }
+
+            private void completionBatch(final Session session) {
                 // batch
                 if (aggregatedExchange == null && 
getEndpoint().isSendEmptyMessageWhenIdle()) {
                     processEmptyMessage();
@@ -318,6 +363,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     processBatch(aggregatedExchange, session);
                 }
             }
+
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/camel/blob/9bfad3cb/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 76c739b..80c1970 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -177,6 +177,61 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         assertFirstMessageBodyOfLength(mockBatches, messageCount);
     }
 
+    @Test
+    public void testConsumptionCompletionInterval() throws Exception {
+        final int completionInterval = 2000;
+        final int completionSize = -1; // timeout-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjms-batch:%s?completionInterval=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionInterval, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 50;
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMinimumMessageCount(1);  // everything ought to be batched together but the interval may trigger in between and we get 2 etc
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+
+        mockBatches.assertIsSatisfied();
+    }
+
+    @Test
+    public void testConsumptionSendEmptyMessageWhenIdle() throws Exception {
+        final int completionInterval = 2000;
+        final int completionSize = -1; // timeout-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjms-batch:%s?completionInterval=%s&completionSize=%s&sendEmptyMessageWhenIdle=true&aggregationStrategy=#testStrategy",
+                        queueName, completionInterval, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 50;
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        // trigger a couple of empty messages
+        mockBatches.expectedMinimumMessageCount(3);
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+
+        mockBatches.assertIsSatisfied();
+    }
+
     /**
      * Checks whether multiple consumer endpoints can operate in parallel.
      */

Reply via email to