Author: wtam
Date: Mon Dec 15 19:06:46 2008
New Revision: 726932

URL: http://svn.apache.org/viewvc?rev=726932&view=rev
Log:
[CAMEL-1159] Check the logic in Aggregator.isBatchCompleted()

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
    
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=726932&r1=726931&r2=726932&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
 Mon Dec 15 19:06:46 2008
@@ -64,15 +64,4 @@
         return "Aggregator[to: " + getProcessor() + "]";
     }
 
-    @Override
-    protected boolean isBatchCompleted(int index) {
-        if (aggregationCompletedPredicate != null) {
-            // TODO: (davsclaus) CAMEL-1159 What is the point with this code? 
I think its wrong
-            if (getCollection().size() > 0) {
-                return true;
-            }
-        }
-
-        return super.isBatchCompleted(index);
-    }
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=726932&r1=726931&r2=726932&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 Mon Dec 15 19:06:46 2008
@@ -114,15 +114,23 @@
     }
 
     /**
-     * A strategy method to decide if the batch is completed the resulting 
exchanges should be sent
+     * A strategy method to decide if the "in" batch is completed.  That is, 
whether the resulting 
+     * exchanges in the in queue should be drained to the "out" collection.
      */
-    protected boolean isBatchCompleted(int num) {
-        // out batch size is optional and we should only check it if its 
enabled (= >0)
-        if (outBatchSize > 0 && collection.size() >= outBatchSize) {
+    protected boolean isInBatchCompleted(int num) {
+        return num >= batchSize;
+    }
+    
+    /**
+     * A strategy method to decide if the "out" batch is completed. That is, 
whether the resulting 
+     * exchange in the out collection should be sent.
+     */
+    protected boolean isOutBatchCompleted() {
+        if (outBatchSize == 0) {
+            // out batch is disabled, so go ahead and send.
             return true;
         }
-        // fallback to regular batch size check
-        return num >= batchSize;
+        return collection.size() > 0 && collection.size() >= outBatchSize;
     }
 
     /**
@@ -175,10 +183,19 @@
             while (true) {
                 try {
                     Thread.sleep(batchTimeout);
+                    queue.drainTo(collection, batchSize);  
                 } catch (InterruptedException e) {
                     if (cancelRequested) {
                         return;
                     }
+                    
+                    while (isInBatchCompleted(queue.size())) {
+                        queue.drainTo(collection, batchSize);  
+                    }
+                    
+                    if (!isOutBatchCompleted()) {
+                        continue;
+                    }
                 }
                 try {
                     sendExchanges();
@@ -192,20 +209,13 @@
             cancelRequested = true;
             interrupt();
         }
-        
-        public void sendBatch() {
-            interrupt();
-        }
      
         public void enqueueExchange(Exchange exchange) {
             queue.add(exchange);
-            if (isBatchCompleted(queue.size())) {
-                sendBatch();
-            }
+            interrupt();
         }
         
         private void sendExchanges() throws Exception {
-            queue.drainTo(collection, batchSize);
             Iterator<Exchange> iter = collection.iterator();
             while (iter.hasNext()) {
                 Exchange exchange = iter.next();
@@ -213,7 +223,6 @@
                 processExchange(exchange);
             }
         }
-        
     }
     
 }

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=726932&r1=726931&r2=726932&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
 Mon Dec 15 19:06:46 2008
@@ -41,18 +41,17 @@
     }
 
     public void testPredicate() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
-
-        resultEndpoint.expectedMessageCount(messageCount / 5);
-        // lets send a large batch of messages
-        for (int i = 1; i <= messageCount; i++) {
-            String body = "message:" + i;
-            template.sendBodyAndHeader("direct:predicate", body, "cheese", 
123);
-        }
-
-        resultEndpoint.assertIsSatisfied();
+        testSendALargeBatch("direct:predicate");
+    }
+    
+    public void testOutBatchPredicate() throws Exception {
+        testSendALargeBatch("direct:outBatchPredicate");
     }
 
+    public void testOutBatchWithNoInBatching() throws Exception {
+        testSendALargeBatch("direct:outBatchNoInBatching");
+    }
+    
     public void testOneMessage() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
 
@@ -87,15 +86,38 @@
                 // in this route we aggregate all from direct:state based on 
the header id cheese
                 
from("direct:start").aggregate(header("cheese")).to("mock:result");
 
-                // CAMEL-393 now fixed
                 from("seda:header").setHeader("visited", 
constant(true)).aggregate(header("cheese")).to("mock:result");
 
                 // in this sample we aggregate using our own startegy with a 
completion predicate
                 // stating that the aggregated header is equal to 5.
                 from("direct:predicate").aggregate(header("cheese"), new 
MyAggregationStrategy()).
                     
completedPredicate(header("aggregated").isEqualTo(5)).to("mock:result");
+                
+                // this sample is similar to the one above but it also 
illustrates the use of outBatchSize 
+                // to send exchanges to mock:endpoint in batches of 10.  
+                from("direct:outBatchPredicate").aggregate(header("cheese"), 
new MyAggregationStrategy()).
+                    
completedPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");
                 // END SNIPPET: ex
+
+                // turning off in batching (batchSize = 1) is a good way to 
test "out" batching.  Don't include
+                // in wiki snippet as it may not be a good example to follow.
+                
from("direct:outBatchNoInBatching").aggregate(header("cheese"), new 
MyAggregationStrategy()).
+                
completedPredicate(header("aggregated").isEqualTo(5)).batchSize(1).outBatchSize(10).to("mock:result");
             }
         };
     }
+    
+    private void testSendALargeBatch(String endpointUri) throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
+
+        resultEndpoint.expectedMessageCount(messageCount / 5);
+        // lets send a large batch of messages
+        for (int i = 1; i <= messageCount; i++) {
+            String body = "message:" + i;
+            template.sendBodyAndHeader(endpointUri, body, "cheese", 123);
+        }
+
+        resultEndpoint.assertIsSatisfied();
+        
+    }
 }

Modified: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml?rev=726932&r1=726931&r2=726932&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
 (original)
+++ 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
 Mon Dec 15 19:06:46 2008
@@ -62,8 +62,38 @@
         </completedPredicate>
       </aggregate>
     </route>
+    
+    <route>
+      <from uri="direct:outBatchPredicate"/>
+      <aggregate strategyRef="myAggregatorStrategy" outBatchSize="10">
+        <correlationExpression>
+          <simple>header.cheese</simple>
+        </correlationExpression>
+        <to uri="mock:result"/>
+        <completedPredicate>
+          <method bean="myAggregatorStrategy" method="isCompleted"/>
+        </completedPredicate>
+      </aggregate>
+    </route>
+    
+    <!--  This route turns off in batching by setting batchSize to 1 to run 
unit test for out batching.
+          Normal use cases may not want to disable in batching
+    -->
+    <route>
+      <from uri="direct:outBatchNoInBatching"/>
+      <aggregate strategyRef="myAggregatorStrategy" batchSize="1" 
outBatchSize="10">
+        <correlationExpression>
+          <simple>header.cheese</simple>
+        </correlationExpression>
+        <to uri="mock:result"/>
+        <completedPredicate>
+          <method bean="myAggregatorStrategy" method="isCompleted"/>
+        </completedPredicate>
+      </aggregate>
+    </route>
   </camelContext>
   <!-- END SNIPPET: example -->
+  
 
   <bean id="myAggregatorStrategy" 
class="org.apache.camel.processor.MyAggregationStrategy"/>
   <bean id="setHeaderProcessor" 
class="org.apache.camel.spring.processor.SetHeaderProcessor">


Reply via email to