Thanks Brad.
Just now taking a look at the resequencer. In my case the final message and 
ordering 
aren’t important, only the timing of the split/group/completion. 

The resequncer may work, likewise I suppose I could get away with using delay() 
and 
some sufficiently large value after the split, there just seems to be a better 
way that 
I’m missing.

On 11/4/16, 8:54 AM, "Brad Johnson" <brad.john...@mediadriver.com> wrote:

    @Craig,
    
    This may or may not be appropriate to your situation depending on your
    situation, but I recently ran into a situation where I was parallel
    processing and making external REST calls to another company so one
    couldn't count on uniform transaction speed and an exception absolutely
    bogged a thread down. One example of the speed differential was the footer
    that came in the input file had to be written out last and it required no
    processing so it zipped through.
    
    What I ended up doing is right after my splitter I added a header with an
    incremented value so I'd know what order they came in. Then I'd drop it
    into a SEDA queue with multiple threads for consumers. Right before I would
    write it to the output file, I put in a resequencer with a time out.  That
    time out value was of sufficient length to ensure that values on the
    resequence queue would get shuffled to correct order  before getting output.
    
    The only item I wished were different about the resequencer is a reset on
    the timeout of an item if another item came in with a lower number.  If the
    footer was number 20 and 4 seconds later number 18 came in, I'd prefer
    number 20's timeout to be reset to help ensure that differences in order be
    naturally taken care of.
    
    Anyway, your aggregation strategy could assign an order number and before
    writing it out you drop it on the resequencer queue so if another one with
    a lower number comes in after it they get output in their natural order.
    Someday if I get some time I'd like to modify the resequencer to have that
    behavior, if not by default, then at least settable via a flag.  I can't
    really think of a case where I wouldn't want the timer reset when a
    resequence on order of messages occurred.  After all, the purpose is to
    ensure the ordering is correct independent of the timing.
    
    Like I said, that may or may not help in your situation. I don't know if
    I've used the composed message EIP that Claus mentions so can't really
    comment.
    
    On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
    
    > See the composed message processor EIP with the splitter only
    > http://camel.apache.org/composed-message-processor.html
    >
    >
    > On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
    > <craig.washing...@aexp.com.invalid> wrote:
    > > Hello,
    > > I have a simple use case where I'd like to do the following:
    > > * split a message and process each part
    > > * aggregate parts into groups of size N for group-processing (w/timeout
    > to ensure no parts are lost)
    > > * continue route ONLY after all aggregated parts have completed
    > >
    > > The simplified route and output are as follows:
    > > ---
    > >
    > > public class CamelSplitAggregateWaitForCompletion extends
    > CamelTestSupport {
    > >     @Test
    > >     public void test() throws Exception {
    > >         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
    > >         Thread.sleep(3000);
    > >     }
    > >
    > >     @Override
    > >     protected RoutesBuilder createRouteBuilder() throws Exception {
    > >         return new RouteBuilder() {
    > >             @Override
    > >             public void configure() throws Exception {
    > >                 from("direct:start")
    > >                     .split(body().tokenize(","), new
    > UseLatestAggregationStrategy())
    > >                     .streaming()
    > >                         //process each msg part
    > >                         .log("processing the msg: ${body}")
    > >                         //group by size
    > >                         .aggregate(constant(true), new
    > GroupedMessageAggregationStrategy())
    > >                         .completionSize(2).completionTimeout(2000)
    > >                             //save grouped messages in batches
    > >                             .log("saving msg group ${body}")
    > >                             .to("mock:result")
    > >                         //end the aggregate processing
    > >                         .end()
    > >                     //end the split processing
    > >                     .end()
    > >                     .log("*** COMPLETED split+aggregate processing");
    > >
    > >                     //...do some more stuff here ONLY after all parts
    > are complete...
    > >             }
    > >         };
    > >     }
    > > }
    > >
    > > ---
    > > //output
    > > 15:28:52.072 INFO  route1 - processing the msg: AAA
    > > 15:28:52.074 INFO  route1 - processing the msg: BBB
    > > 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
    > > 15:28:52.076 INFO  route1 - processing the msg: CCC
    > > 15:28:52.077 INFO  route1 - processing the msg: DDD
    > > 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
    > > 15:28:52.078 INFO  route1 - processing the msg: EEE
    > > 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
    > > 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
    > > ---
    > >
    > > Ideally, the "COMPLETED" line should print last (after the final
    > aggregated group from timeout).
    > > Seems simple enough though I haven't found a way to get this working.
    > Neither of the completionTimeout examples I've found in source nor CIA
    > focus on the route timing after the split.
    > >
    > > (I'm actually trying to process a large file with streaming and
    > parallelProcessing so completionFromBatchConsumer() wouldn't work, though 
I
    > think this part is irrelevant)
    > >
    > > Using Camel 2.17.3
    > >
    > > Thanks
    > >
    > >
    > >
    > > American Express made the following annotations
    > > ************************************************************
    > ******************
    > > "This message and any attachments are solely for the intended recipient
    > and may contain confidential or privileged information. If you are not the
    > intended recipient, any disclosure, copying, use, or distribution of the
    > information included in this message and any attachments is prohibited. If
    > you have received this communication in error, please notify us by reply
    > e-mail and immediately and permanently delete this message and any
    > attachments. Thank you."
    > >
    > > American Express a ajouté le commentaire suivant le Ce courrier et toute
    > pièce jointe qu'il contient sont réservés au seul destinataire indiqué et
    > peuvent renfermer des
    > > renseignements confidentiels et privilégiés. Si vous n'êtes pas le
    > destinataire prévu, toute divulgation, duplication, utilisation ou
    > distribution du courrier ou de toute pièce jointe est interdite. Si vous
    > avez reçu cette communication par erreur, veuillez nous en aviser par
    > courrier et détruire immédiatement le courrier et les pièces jointes. 
Merci.
    > >
    > > ************************************************************
    > ******************
    >
    >
    >
    > --
    > Claus Ibsen
    > -----------------
    > http://davsclaus.com @davsclaus
    > Camel in Action 2: https://www.manning.com/ibsen2
    >
    



American Express made the following annotations
******************************************************************************
"This message and any attachments are solely for the intended recipient and may 
contain confidential or privileged information. If you are not the intended 
recipient, any disclosure, copying, use, or distribution of the information 
included in this message and any attachments is prohibited. If you have 
received this communication in error, please notify us by reply e-mail and 
immediately and permanently delete this message and any attachments. Thank you."

American Express a ajouté le commentaire suivant le Ce courrier et toute pièce 
jointe qu'il contient sont réservés au seul destinataire indiqué et peuvent 
renfermer des 
renseignements confidentiels et privilégiés. Si vous n'êtes pas le destinataire 
prévu, toute divulgation, duplication, utilisation ou distribution du courrier 
ou de toute pièce jointe est interdite. Si vous avez reçu cette communication 
par erreur, veuillez nous en aviser par courrier et détruire immédiatement le 
courrier et les pièces jointes. Merci.

******************************************************************************

Reply via email to