See the composed message processor EIP with the splitter only
http://camel.apache.org/composed-message-processor.html


On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
<craig.washing...@aexp.com.invalid> wrote:
> Hello,
> I have a simple use case where I'd like to do the following:
> * split a message and process each part
> * aggregate parts into groups of size N for group-processing (w/timeout to 
> ensure no parts are lost)
> * continue route ONLY after all aggregated parts have completed
>
> The simplified route and output are as follows:
> ---
>
> public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
>     @Test
>     public void test() throws Exception {
>         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
>         Thread.sleep(3000);
>     }
>
>     @Override
>     protected RoutesBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                     .split(body().tokenize(","), new 
> UseLatestAggregationStrategy())
>                     .streaming()
>                         //process each msg part
>                         .log("processing the msg: ${body}")
>                         //group by size
>                         .aggregate(constant(true), new 
> GroupedMessageAggregationStrategy())
>                         .completionSize(2).completionTimeout(2000)
>                             //save grouped messages in batches
>                             .log("saving msg group ${body}")
>                             .to("mock:result")
>                         //end the aggregate processing
>                         .end()
>                     //end the split processing
>                     .end()
>                     .log("*** COMPLETED split+aggregate processing");
>
>                     //...do some more stuff here ONLY after all parts are 
> complete...
>             }
>         };
>     }
> }
>
> ---
> //output
> 15:28:52.072 INFO  route1 - processing the msg: AAA
> 15:28:52.074 INFO  route1 - processing the msg: BBB
> 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.076 INFO  route1 - processing the msg: CCC
> 15:28:52.077 INFO  route1 - processing the msg: DDD
> 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.078 INFO  route1 - processing the msg: EEE
> 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
> 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
> ---
>
> Ideally, the "COMPLETED" line should print last (after the final aggregated 
> group from timeout).
> Seems simple enough though I haven't found a way to get this working. Neither 
> of the completionTimeout examples I've found in source nor CIA focus on the 
> route timing after the split.
>
> (I'm actually trying to process a large file with streaming and 
> parallelProcessing so completionFromBatchConsumer() wouldn't work, though I 
> think this part is irrelevant)
>
> Using Camel 2.17.3
>
> Thanks
>
>
>
> American Express made the following annotations
> ******************************************************************************
> "This message and any attachments are solely for the intended recipient and 
> may contain confidential or privileged information. If you are not the 
> intended recipient, any disclosure, copying, use, or distribution of the 
> information included in this message and any attachments is prohibited. If 
> you have received this communication in error, please notify us by reply 
> e-mail and immediately and permanently delete this message and any 
> attachments. Thank you."
>
> American Express a ajouté le commentaire suivant le Ce courrier et toute 
> pièce jointe qu'il contient sont réservés au seul destinataire indiqué et 
> peuvent renfermer des
> renseignements confidentiels et privilégiés. Si vous n'êtes pas le 
> destinataire prévu, toute divulgation, duplication, utilisation ou 
> distribution du courrier ou de toute pièce jointe est interdite. Si vous avez 
> reçu cette communication par erreur, veuillez nous en aviser par courrier et 
> détruire immédiatement le courrier et les pièces jointes. Merci.
>
> ******************************************************************************



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Reply via email to