Author: wtam
Date: Mon Dec 15 20:10:01 2008
New Revision: 726941
URL: http://svn.apache.org/viewvc?rev=726941&view=rev
Log:
Merged revisions 726932 via svnmerge from
https://svn.apache.org/repos/asf/activemq/camel/trunk
........
r726932 | wtam | 2008-12-15 22:06:46 -0500 (Mon, 15 Dec 2008) | 1 line
[CAMEL-1159] Check the logic in Aggregator.isBatchCompleted()
........
Modified:
activemq/camel/branches/camel-1.x/ (props changed)
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 15 20:10:01 2008
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,726640-726645
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,726640-726645,726932
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=726941&r1=726940&r2=726941&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Dec 15 20:10:01 2008
@@ -64,15 +64,4 @@
return "Aggregator[to: " + getProcessor() + "]";
}
- @Override
- protected boolean isBatchCompleted(int index) {
- if (aggregationCompletedPredicate != null) {
- // TODO: (davsclaus) CAMEL-1159 What is the point with this code? I think its wrong
- if (getCollection().size() > 0) {
- return true;
- }
- }
-
- return super.isBatchCompleted(index);
- }
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=726941&r1=726940&r2=726941&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Dec 15 20:10:01 2008
@@ -114,15 +114,23 @@
}
/**
- * A strategy method to decide if the batch is completed the resulting exchanges should be sent
+ * A strategy method to decide if the "in" batch is completed. That is, whether the resulting
+ * exchanges in the in queue should be drained to the "out" collection.
*/
- protected boolean isBatchCompleted(int num) {
- // out batch size is optional and we should only check it if its enabled (= >0)
- if (outBatchSize > 0 && collection.size() >= outBatchSize) {
+ protected boolean isInBatchCompleted(int num) {
+ return num >= batchSize;
+ }
+
+ /**
+ * A strategy method to decide if the "out" batch is completed. That is, whether the resulting
+ * exchange in the out collection should be sent.
+ */
+ protected boolean isOutBatchCompleted() {
+ if (outBatchSize == 0) {
+ // out batch is disabled, so go ahead and send.
return true;
}
- // fallback to regular batch size check
- return num >= batchSize;
+ return collection.size() > 0 && collection.size() >= outBatchSize;
}
/**
@@ -175,10 +183,19 @@
while (true) {
try {
Thread.sleep(batchTimeout);
+ queue.drainTo(collection, batchSize);
} catch (InterruptedException e) {
if (cancelRequested) {
return;
}
+
+ while (isInBatchCompleted(queue.size())) {
+ queue.drainTo(collection, batchSize);
+ }
+
+ if (!isOutBatchCompleted()) {
+ continue;
+ }
}
try {
sendExchanges();
@@ -192,20 +209,13 @@
cancelRequested = true;
interrupt();
}
-
- public void sendBatch() {
- interrupt();
- }
public void enqueueExchange(Exchange exchange) {
queue.add(exchange);
- if (isBatchCompleted(queue.size())) {
- sendBatch();
- }
+ interrupt();
}
private void sendExchanges() throws Exception {
- queue.drainTo(collection, batchSize);
Iterator<Exchange> iter = collection.iterator();
while (iter.hasNext()) {
Exchange exchange = iter.next();
@@ -213,7 +223,6 @@
processExchange(exchange);
}
}
-
}
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=726941&r1=726940&r2=726941&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Mon Dec 15 20:10:01 2008
@@ -41,18 +41,17 @@
}
public void testPredicate() throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-
- resultEndpoint.expectedMessageCount(messageCount / 5);
- // lets send a large batch of messages
- for (int i = 1; i <= messageCount; i++) {
- String body = "message:" + i;
- template.sendBodyAndHeader("direct:predicate", body, "cheese", 123);
- }
-
- resultEndpoint.assertIsSatisfied();
+ testSendALargeBatch("direct:predicate");
+ }
+
+ public void testOutBatchPredicate() throws Exception {
+ testSendALargeBatch("direct:outBatchPredicate");
}
+ public void testOutBatchWithNoInBatching() throws Exception {
+ testSendALargeBatch("direct:outBatchNoInBatching");
+ }
+
public void testOneMessage() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -87,15 +86,38 @@
// in this route we aggregate all from direct:state based on the header id cheese
from("direct:start").aggregator(header("cheese")).to("mock:result");
- // CAMEL-393 now fixed
from("seda:header").setHeader("visited",
constant(true)).aggregator(header("cheese")).to("mock:result");
// in this sample we aggreagte using our own startegy with a completion predicate
// stating that the aggregated header is equal to 5.
from("direct:predicate").aggregator(header("cheese"), new MyAggregationStrategy()).
completedPredicate(header("aggregated").isEqualTo(5)).to("mock:result");
+
+ // this sample is similar to the one above but it also illustrates the use of outBatchSize
+ // to send exchanges to mock:endpoint in batches of 10.
+ from("direct:outBatchPredicate").aggregator(header("cheese"), new MyAggregationStrategy()).
+ completedPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");
// END SNIPPET: ex
+
+ // turning off in batching (batchSize = 1) is a good way to test "out" batching. Don't include
+ // in wiki snippet as it may not be a good example to follow.
+ from("direct:outBatchNoInBatching").aggregator(header("cheese"), new MyAggregationStrategy()).
+ completedPredicate(header("aggregated").isEqualTo(5)).batchSize(1).outBatchSize(10).to("mock:result");
}
};
}
+
+ private void testSendALargeBatch(String endpointUri) throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+
+ resultEndpoint.expectedMessageCount(messageCount / 5);
+ // lets send a large batch of messages
+ for (int i = 1; i <= messageCount; i++) {
+ String body = "message:" + i;
+ template.sendBodyAndHeader(endpointUri, body, "cheese", 123);
+ }
+
+ resultEndpoint.assertIsSatisfied();
+
+ }
}
Modified:
activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml?rev=726941&r1=726940&r2=726941&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
(original)
+++
activemq/camel/branches/camel-1.x/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
Mon Dec 15 20:10:01 2008
@@ -56,8 +56,34 @@
</completedPredicate>
</aggregator>
</route>
+
+ <route>
+ <from uri="direct:outBatchPredicate"/>
+ <aggregator strategyRef="myAggregatorStrategy" outBatchSize="10">
+ <simple>header.cheese</simple>
+ <to uri="mock:result"/>
+ <completedPredicate>
+ <methodCall bean="myAggregatorStrategy" method="isCompleted"/>
+ </completedPredicate>
+ </aggregator>
+ </route>
+
+ <!-- This route turns off in batching by setting batchSize to 1 to run unit test for out batching.
+ Normal use cases may not want to disable in batching
+ -->
+ <route>
+ <from uri="direct:outBatchNoInBatching"/>
+ <aggregator strategyRef="myAggregatorStrategy" batchSize="1" outBatchSize="10">
+ <simple>header.cheese</simple>
+ <to uri="mock:result"/>
+ <completedPredicate>
+ <methodCall bean="myAggregatorStrategy" method="isCompleted"/>
+ </completedPredicate>
+ </aggregator>
+ </route>
</camelContext>
<!-- END SNIPPET: example -->
+
<bean id="myAggregatorStrategy"
class="org.apache.camel.processor.MyAggregationStrategy"/>
<bean id="setHeaderProcessor"
class="org.apache.camel.spring.processor.SetHeaderProcessor">