Hello, I have a simple use case where I'd like to do the following: * split a message and process each part * aggregate parts into groups of size N for group-processing (w/timeout to ensure no parts are lost) * continue route ONLY after all aggregated parts have completed
The simplified route and output are as follows: --- public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport { @Test public void test() throws Exception { template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE"); Thread.sleep(3000); } @Override protected RoutesBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .split(body().tokenize(","), new UseLatestAggregationStrategy()) .streaming() //process each msg part .log("processing the msg: ${body}") //group by size .aggregate(constant(true), new GroupedMessageAggregationStrategy()) .completionSize(2).completionTimeout(2000) //save grouped messages in batches .log("saving msg group ${body}") .to("mock:result") //end the aggregate processing .end() //end the split processing .end() .log("*** COMPLETED split+aggregate processing"); //...do some more stuff here ONLY after all parts are complete... } }; } } --- //output 15:28:52.072 INFO route1 - processing the msg: AAA 15:28:52.074 INFO route1 - processing the msg: BBB 15:28:52.075 INFO route1 - saving msg group List<Exchange>(2 elements) 15:28:52.076 INFO route1 - processing the msg: CCC 15:28:52.077 INFO route1 - processing the msg: DDD 15:28:52.077 INFO route1 - saving msg group List<Exchange>(2 elements) 15:28:52.078 INFO route1 - processing the msg: EEE 15:28:52.079 INFO route1 - *** COMPLETED split+aggregate processing 15:28:55.064 INFO route1 - saving msg group List<Exchange>(1 elements) --- Ideally, the "COMPLETED" line should print last (after the final aggregated group from timeout). Seems simple enough though I haven't found a way to get this working. Neither of the completionTimeout examples I've found in source nor CIA focus on the route timing after the split. (I'm actually trying to process a large file with streaming and parallelProcessing so completionFromBatchConsumer() wouldn't work, though I think this part is irrelevant) Using Camel 2.17.3 Thanks American Express made the following annotations ****************************************************************************** "This message and any attachments are solely for the intended recipient and may contain confidential or privileged information. If you are not the intended recipient, any disclosure, copying, use, or distribution of the information included in this message and any attachments is prohibited. If you have received this communication in error, please notify us by reply e-mail and immediately and permanently delete this message and any attachments. Thank you." American Express a ajouté le commentaire suivant le Ce courrier et toute pièce jointe qu'il contient sont réservés au seul destinataire indiqué et peuvent renfermer des renseignements confidentiels et privilégiés. Si vous n'êtes pas le destinataire prévu, toute divulgation, duplication, utilisation ou distribution du courrier ou de toute pièce jointe est interdite. Si vous avez reçu cette communication par erreur, veuillez nous en aviser par courrier et détruire immédiatement le courrier et les pièces jointes. Merci. ******************************************************************************