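See the Composed Message Processor EIP, specifically the example that uses only the splitter (the splitter takes the aggregation strategy itself, so the route continues only after every part has been processed):
http://camel.apache.org/composed-message-processor.html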
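
To illustrate the ordering you get from that approach, here is a minimal plain-Java model of the flow. It is a sketch, not Camel code: the class and method names are made up for illustration, and it only assumes that all work happens inside the split, so the last partial group is flushed when the split finishes rather than by a timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "split, group by size N, continue only when done".
// This is plain Java for illustration and does not use any Camel API.
public class SplitThenContinueSketch {

    // Returns the log lines in the order they would be emitted.
    static List<String> run(String body, int groupSize) {
        List<String> log = new ArrayList<>();
        List<String> group = new ArrayList<>();

        // Split the message and process each part in turn.
        for (String part : body.split(",")) {
            log.add("processing the msg: " + part);
            group.add(part);
            // Save a group as soon as it reaches the requested size.
            if (group.size() == groupSize) {
                log.add("saving msg group " + group);
                group = new ArrayList<>();
            }
        }

        // The split knows when it has seen the last part, so the
        // leftover partial group is saved here, without a timeout.
        if (!group.isEmpty()) {
            log.add("saving msg group " + group);
        }

        // Only now does the "rest of the route" run.
        log.add("*** COMPLETED split+aggregate processing");
        return log;
    }

    public static void main(String[] args) {
        run("AAA,BBB,CCC,DDD,EEE", 2).forEach(System.out::println);
    }
}
```

With the body from your test and a group size of 2, the group containing only EEE is saved before the COMPLETED line, which is the ordering you asked for.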

On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington <craig.washing...@aexp.com.invalid> wrote:
> Hello,
> I have a simple use case where I'd like to do the following:
> * split a message and process each part
> * aggregate parts into groups of size N for group-processing (w/timeout to
>   ensure no parts are lost)
> * continue route ONLY after all aggregated parts have completed
>
> The simplified route and output are as follows:
> ---
>
> public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
>     @Test
>     public void test() throws Exception {
>         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
>         Thread.sleep(3000);
>     }
>
>     @Override
>     protected RoutesBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                     .split(body().tokenize(","), new UseLatestAggregationStrategy())
>                     .streaming()
>                         //process each msg part
>                         .log("processing the msg: ${body}")
>                         //group by size
>                         .aggregate(constant(true), new GroupedMessageAggregationStrategy())
>                         .completionSize(2).completionTimeout(2000)
>                             //save grouped messages in batches
>                             .log("saving msg group ${body}")
>                             .to("mock:result")
>                         //end the aggregate processing
>                         .end()
>                     //end the split processing
>                     .end()
>                     .log("*** COMPLETED split+aggregate processing");
>
>                 //...do some more stuff here ONLY after all parts are complete...
>             }
>         };
>     }
> }
>
> ---
> //output
> 15:28:52.072 INFO  route1 - processing the msg: AAA
> 15:28:52.074 INFO  route1 - processing the msg: BBB
> 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.076 INFO  route1 - processing the msg: CCC
> 15:28:52.077 INFO  route1 - processing the msg: DDD
> 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.078 INFO  route1 - processing the msg: EEE
> 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
> 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
> ---
>
> Ideally, the "COMPLETED" line should print last (after the final aggregated
> group from timeout).
> Seems simple enough though I haven't found a way to get this working. Neither
> of the completionTimeout examples I've found in source nor CIA focus on the
> route timing after the split.
>
> (I'm actually trying to process a large file with streaming and
> parallelProcessing so completionFromBatchConsumer() wouldn't work, though I
> think this part is irrelevant)
>
> Using Camel 2.17.3
>
> Thanks

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2