Thanks Brad. Just now taking a look at the resequencer. In my case the final message and ordering aren’t important, only the timing of the split/group/completion.
The resequncer may work, likewise I suppose I could get away with using delay() and some sufficiently large value after the split, there just seems to be a better way that I’m missing. On 11/4/16, 8:54 AM, "Brad Johnson" <brad.john...@mediadriver.com> wrote: @Craig, This may or may not be appropriate to your situation depending on your situation, but I recently ran into a situation where I was parallel processing and making external REST calls to another company so one couldn't count on uniform transaction speed and an exception absolutely bogged a thread down. One example of the speed differential was the footer that came in the input file had to be written out last and it required no processing so it zipped through. What I ended up doing is right after my splitter I added a header with an incremented value so I'd know what order they came in. Then I'd drop it into a SEDA queue with multiple threads for consumers. Right before I would write it to the output file, I put in a resequencer with a time out. That time out value was of sufficient length to ensure that values on the resequence queue would get shuffled to correct order before getting output. The only item I wished were different about the resequencer is a reset on the timeout of an item if another item came in with a lower number. If the footer was number 20 and 4 seconds later number 18 came in, I'd prefer number 20's timeout to be reset to help ensure that differences in order be naturally taken care of. Anyway, your aggregation strategy could assign an order number and before writing it out you drop it on the resequencer queue so if another one with a lower number comes in after it they get output in their natural order. Someday if I get some time I'd like to modify the resequencer to have that behavior, if not by default, then at least settable via a flag. I can't really think of a case where I wouldn't want the timer reset when a resequence on order of messages occurred. After all, the purpose is to ensure the ordering is correct independent of the timing. Like I said, that may or may not help in your situation. I don't know if I've used the composed message EIP that Claus mentions so can't really comment. On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <claus.ib...@gmail.com> wrote: > See the composed message processor EIP with the splitter only > http://camel.apache.org/composed-message-processor.html > > > On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington > <craig.washing...@aexp.com.invalid> wrote: > > Hello, > > I have a simple use case where I'd like to do the following: > > * split a message and process each part > > * aggregate parts into groups of size N for group-processing (w/timeout > to ensure no parts are lost) > > * continue route ONLY after all aggregated parts have completed > > > > The simplified route and output are as follows: > > --- > > > > public class CamelSplitAggregateWaitForCompletion extends > CamelTestSupport { > > @Test > > public void test() throws Exception { > > template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE"); > > Thread.sleep(3000); > > } > > > > @Override > > protected RoutesBuilder createRouteBuilder() throws Exception { > > return new RouteBuilder() { > > @Override > > public void configure() throws Exception { > > from("direct:start") > > .split(body().tokenize(","), new > UseLatestAggregationStrategy()) > > .streaming() > > //process each msg part > > .log("processing the msg: ${body}") > > //group by size > > .aggregate(constant(true), new > GroupedMessageAggregationStrategy()) > > .completionSize(2).completionTimeout(2000) > > //save grouped messages in batches > > .log("saving msg group ${body}") > > .to("mock:result") > > //end the aggregate processing > > .end() > > //end the split processing > > .end() > > .log("*** COMPLETED split+aggregate processing"); > > > > //...do some more stuff here ONLY after all parts > are complete... > > } > > }; > > } > > } > > > > --- > > //output > > 15:28:52.072 INFO route1 - processing the msg: AAA > > 15:28:52.074 INFO route1 - processing the msg: BBB > > 15:28:52.075 INFO route1 - saving msg group List<Exchange>(2 elements) > > 15:28:52.076 INFO route1 - processing the msg: CCC > > 15:28:52.077 INFO route1 - processing the msg: DDD > > 15:28:52.077 INFO route1 - saving msg group List<Exchange>(2 elements) > > 15:28:52.078 INFO route1 - processing the msg: EEE > > 15:28:52.079 INFO route1 - *** COMPLETED split+aggregate processing > > 15:28:55.064 INFO route1 - saving msg group List<Exchange>(1 elements) > > --- > > > > Ideally, the "COMPLETED" line should print last (after the final > aggregated group from timeout). > > Seems simple enough though I haven't found a way to get this working. > Neither of the completionTimeout examples I've found in source nor CIA > focus on the route timing after the split. > > > > (I'm actually trying to process a large file with streaming and > parallelProcessing so completionFromBatchConsumer() wouldn't work, though I > think this part is irrelevant) > > > > Using Camel 2.17.3 > > > > Thanks > > > > > > > > American Express made the following annotations > > ************************************************************ > ****************** > > "This message and any attachments are solely for the intended recipient > and may contain confidential or privileged information. If you are not the > intended recipient, any disclosure, copying, use, or distribution of the > information included in this message and any attachments is prohibited. If > you have received this communication in error, please notify us by reply > e-mail and immediately and permanently delete this message and any > attachments. Thank you." > > > > American Express a ajouté le commentaire suivant le Ce courrier et toute > pièce jointe qu'il contient sont réservés au seul destinataire indiqué et > peuvent renfermer des > > renseignements confidentiels et privilégiés. Si vous n'êtes pas le > destinataire prévu, toute divulgation, duplication, utilisation ou > distribution du courrier ou de toute pièce jointe est interdite. Si vous > avez reçu cette communication par erreur, veuillez nous en aviser par > courrier et détruire immédiatement le courrier et les pièces jointes. Merci. > > > > ************************************************************ > ****************** > > > > -- > Claus Ibsen > ----------------- > http://davsclaus.com @davsclaus > Camel in Action 2: https://www.manning.com/ibsen2 > American Express made the following annotations ****************************************************************************** "This message and any attachments are solely for the intended recipient and may contain confidential or privileged information. If you are not the intended recipient, any disclosure, copying, use, or distribution of the information included in this message and any attachments is prohibited. If you have received this communication in error, please notify us by reply e-mail and immediately and permanently delete this message and any attachments. Thank you." American Express a ajouté le commentaire suivant le Ce courrier et toute pièce jointe qu'il contient sont réservés au seul destinataire indiqué et peuvent renfermer des renseignements confidentiels et privilégiés. Si vous n'êtes pas le destinataire prévu, toute divulgation, duplication, utilisation ou distribution du courrier ou de toute pièce jointe est interdite. Si vous avez reçu cette communication par erreur, veuillez nous en aviser par courrier et détruire immédiatement le courrier et les pièces jointes. Merci. ******************************************************************************