Repository: camel
Updated Branches:
  refs/heads/master 76a10b773 -> c3b236dbe


CAMEL-9974: Add completionPredicate to camel-sjms batch component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3b236db
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3b236db
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3b236db

Branch: refs/heads/master
Commit: c3b236dbe0376285a2af6d691f3a1f9f5ba116fe
Parents: 76a10b7
Author: Claus Ibsen <[email protected]>
Authored: Sat May 21 15:35:02 2016 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Sat May 21 15:35:02 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  2 +-
 .../camel-sjms/src/main/docs/sjms-batch.adoc    |  6 +++-
 .../component/sjms/batch/SjmsBatchConsumer.java | 25 +++++++++++++
 .../component/sjms/batch/SjmsBatchEndpoint.java | 38 ++++++++++++++++++++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 27 ++++++++++++++
 5 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index b0e8bd7..e14118e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -661,7 +661,7 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
 
     /**
      * Use eager completion checking which means that the 
{{completionPredicate}} will use the incoming Exchange.
-     * At opposed to without eager completion checking the 
{{completionPredicate}} will use the aggregated Exchange.
+     * As opposed to without eager completion checking the 
{{completionPredicate}} will use the aggregated Exchange.
      *
      * @return builder
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/docs/sjms-batch.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch.adoc 
b/components/camel-sjms/src/main/docs/sjms-batch.adoc
index 3bfaccd..715ae59 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch.adoc
@@ -134,8 +134,9 @@ The Simple JMS Batch component supports 1 options which are 
listed below.
 
 
 
+
 // endpoint options: START
-The Simple JMS Batch component supports 19 endpoint options which are listed 
below:
+The Simple JMS Batch component supports 21 endpoint options which are listed 
below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -146,9 +147,11 @@ The Simple JMS Batch component supports 19 endpoint 
options which are listed bel
 | allowNullBody | consumer | true | boolean | Whether to allow sending 
messages with no body. If this option is false and the message body is null 
then an JMSException is thrown.
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN/ERROR level and ignored.
 | completionInterval | consumer | 1000 | int | The completion interval in 
millis which causes batches to be completed in a scheduled fixed rate every 
interval. The batch may be empty if the timeout triggered and there was no 
messages in the batch. Notice you cannot use both completion timeout and 
completion interval at the same time only one can be configured.
+| completionPredicate | consumer |  | String | The completion predicate which 
causes batches to be completed when the predicate evaluates as true. The 
predicate can be configured using the simple language using the string syntax.
 | completionSize | consumer | 200 | int | The number of messages consumed at 
which the batch will be completed
 | completionTimeout | consumer | 500 | int | The timeout in millis from 
receipt of the first message when the batch will be completed. The batch 
may be empty if the timeout triggered and there was no messages in the batch. 
Notice you cannot use both completion timeout and completion interval at the 
same time only one can be configured.
 | consumerCount | consumer | 1 | int | The number of JMS sessions to consume 
from
+| eagerCheckCompletion | consumer | false | boolean | Use eager completion 
checking which means that the completionPredicate will use the incoming 
Exchange. As opposed to without eager completion checking the 
completionPredicate will use the aggregated Exchange.
 | includeAllJMSXProperties | consumer | false | boolean | Whether to include 
all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to 
true will include properties such as JMSXAppID and JMSXUserID etc. Note: If you 
are using a custom headerFilterStrategy then this option does not apply.
 | mapJmsMessage | consumer | true | boolean | Specifies whether Camel should 
auto map the received JMS message to a suited payload type such as 
javax.jms.TextMessage to a String etc. See section about how mapping works 
below for more details.
 | pollDuration | consumer | 1000 | int | The duration in milliseconds of each 
poll for messages. completionTimeOut will be used if it is shorter and a batch 
has started.
@@ -166,6 +169,7 @@ The Simple JMS Batch component supports 19 endpoint options 
which are listed bel
 
 
 
+
 The `completionSize` endpoint attribute is used in conjunction with
 `completionTimeout`, where the first condition to be met will cause the
 aggregated `Exchange` to be emitted down the route.

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 1fe5617..243c2b8 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -36,6 +36,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -63,6 +64,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     private final int completionSize;
     private final int completionInterval;
     private final int completionTimeout;
+    private final Predicate completionPredicate;
+    private final boolean eagerCheckCompletion;
     private final int consumerCount;
     private final int pollDuration;
     private final ConnectionFactory connectionFactory;
@@ -90,6 +93,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         if (sjmsBatchEndpoint.isSendEmptyMessageWhenIdle() && 
completionTimeout <= 0 && completionInterval <= 0) {
             throw new IllegalArgumentException("SendEmptyMessageWhenIdle can 
only be enabled if either completionInterval or completionTimeout is also set");
         }
+        completionPredicate = sjmsBatchEndpoint.getCompletionPredicate();
+        eagerCheckCompletion = sjmsBatchEndpoint.isEagerCheckCompletion();
 
         pollDuration = sjmsBatchEndpoint.getPollDuration();
         if (pollDuration < 0) {
@@ -328,6 +333,26 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             final Exchange exchange = 
getEndpoint().createExchange(message, session);
                             aggregatedExchange = 
aggregationStrategy.aggregate(aggregatedExchange, exchange);
                             
aggregatedExchange.setProperty(Exchange.BATCH_SIZE, messageCount);
+
+                            // is the batch complete by predicate?
+                            if (completionPredicate != null) {
+                                try {
+                                    boolean complete;
+                                    if (eagerCheckCompletion) {
+                                        complete = 
completionPredicate.matches(exchange);
+                                    } else {
+                                        complete = 
completionPredicate.matches(aggregatedExchange);
+                                    }
+                                    if (complete) {
+                                        // trigger completion predicate
+                                        LOG.trace("Completion batch due 
predicate");
+                                        completionBatch(session);
+                                        reset();
+                                    }
+                                } catch (Exception e) {
+                                    LOG.warn("Error during evaluation of 
completion predicate " + e.getMessage() + ". This exception is ignored.", e);
+                                }
+                            }
                         }
 
                         if (usingTimeout && startTime > 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 9286000..4e06b87 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -23,6 +23,7 @@ import javax.jms.Session;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.sjms.SjmsHeaderFilterStrategy;
@@ -33,6 +34,7 @@ import org.apache.camel.component.sjms.jms.JmsBinding;
 import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -65,6 +67,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint 
implements HeaderFilterSt
     private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
     @UriParam(defaultValue = "1000")
     private int completionInterval;
+    @UriParam(javaType = "java.lang.String")
+    private Predicate completionPredicate;
+    @UriParam
+    private boolean eagerCheckCompletion;
     @UriParam
     private boolean sendEmptyMessageWhenIdle;
     @UriParam(defaultValue = "1000")
@@ -212,6 +218,38 @@ public class SjmsBatchEndpoint extends DefaultEndpoint 
implements HeaderFilterSt
         this.completionInterval = completionInterval;
     }
 
+    public Predicate getCompletionPredicate() {
+        return completionPredicate;
+    }
+
+    /**
+     * The completion predicate, which causes batches to be completed when the 
predicate evaluates as true.
+     * <p/>
+     * The predicate can also be configured using the simple language using 
the string syntax.
+     * You may want to set the option eagerCheckCompletion to true to let the 
predicate match the incoming message,
+     * as otherwise it matches the aggregated message.
+     */
+    public void setCompletionPredicate(Predicate completionPredicate) {
+        this.completionPredicate = completionPredicate;
+    }
+
+    public void setCompletionPredicate(String predicate) {
+        // uses simple language
+        this.completionPredicate = SimpleLanguage.predicate(predicate);
+    }
+
+    public boolean isEagerCheckCompletion() {
+        return eagerCheckCompletion;
+    }
+
+    /**
+     * Use eager completion checking which means that the completionPredicate 
will use the incoming Exchange.
+     * As opposed to without eager completion checking the completionPredicate 
will use the aggregated Exchange.
+     */
+    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
+        this.eagerCheckCompletion = eagerCheckCompletion;
+    }
+
     public boolean isSendEmptyMessageWhenIdle() {
         return sendEmptyMessageWhenIdle;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c3b236db/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index e9e5cc8..e378457 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -152,6 +152,33 @@ public class SjmsBatchConsumerTest extends 
CamelTestSupport {
     }
 
     @Test
+    public void testConsumptionCompletionPredicate() throws Exception {
+        final String completionPredicate = "${body} contains 'done'";
+        final int completionTimeout = -1; // predicate-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                
fromF("sjms-batch:%s?completionTimeout=%s&completionPredicate=%s&aggregationStrategy=#testStrategy&eagerCheckCompletion=true",
+                        queueName, completionTimeout, 
completionPredicate).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(2);
+
+        template.sendBody("direct:in", generateStrings(50));
+        template.sendBody("direct:in", "Message done");
+        template.sendBody("direct:in", generateStrings(50));
+        template.sendBody("direct:in", "Message done");
+        mockBatches.assertIsSatisfied();
+    }
+
+    @Test
     public void testConsumptionCompletionTimeout() throws Exception {
         final int completionTimeout = 2000;
         final int completionSize = -1; // timeout-based only

Reply via email to