Hello,
I have a simple use case where I'd like to do the following:
* split a message and process each part
* aggregate parts into groups of size N for group-processing (w/timeout to 
ensure no parts are lost)
* continue route ONLY after all aggregated parts have completed

The simplified route and output are as follows:
---

public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
    @Test
    public void test() throws Exception {
        template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
        Thread.sleep(3000);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    .split(body().tokenize(","), new 
UseLatestAggregationStrategy())
                    .streaming()
                        //process each msg part
                        .log("processing the msg: ${body}")
                        //group by size
                        .aggregate(constant(true), new 
GroupedMessageAggregationStrategy())
                        .completionSize(2).completionTimeout(2000)
                            //save grouped messages in batches
                            .log("saving msg group ${body}")
                            .to("mock:result")
                        //end the aggregate processing
                        .end()
                    //end the split processing
                    .end()
                    .log("*** COMPLETED split+aggregate processing");

                    //...do some more stuff here ONLY after all parts are 
complete...
            }
        };
    }
}

---
//output
15:28:52.072 INFO  route1 - processing the msg: AAA
15:28:52.074 INFO  route1 - processing the msg: BBB
15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
15:28:52.076 INFO  route1 - processing the msg: CCC
15:28:52.077 INFO  route1 - processing the msg: DDD
15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
15:28:52.078 INFO  route1 - processing the msg: EEE
15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
---

Ideally, the "COMPLETED" line should print last (after the final aggregated 
group from timeout).
Seems simple enough though I haven't found a way to get this working. Neither 
of the completionTimeout examples I've found in source nor CIA focus on the 
route timing after the split.

(I'm actually trying to process a large file with streaming and 
parallelProcessing so completionFromBatchConsumer() wouldn't work, though I 
think this part is irrelevant)

Using Camel 2.17.3

Thanks



American Express made the following annotations
******************************************************************************
"This message and any attachments are solely for the intended recipient and may 
contain confidential or privileged information. If you are not the intended 
recipient, any disclosure, copying, use, or distribution of the information 
included in this message and any attachments is prohibited. If you have 
received this communication in error, please notify us by reply e-mail and 
immediately and permanently delete this message and any attachments. Thank you."

American Express a ajouté le commentaire suivant le Ce courrier et toute pièce 
jointe qu'il contient sont réservés au seul destinataire indiqué et peuvent 
renfermer des 
renseignements confidentiels et privilégiés. Si vous n'êtes pas le destinataire 
prévu, toute divulgation, duplication, utilisation ou distribution du courrier 
ou de toute pièce jointe est interdite. Si vous avez reçu cette communication 
par erreur, veuillez nous en aviser par courrier et détruire immédiatement le 
courrier et les pièces jointes. Merci.

******************************************************************************

Reply via email to