Hi Artur, I was thinking that the order of the messages would be important as you need to process them sequentially.
So I think you could use the dynamic message routing[1] with aggregator[2], something like: from("aws-sqs:...") .process("#preProcess") .toD("direct:${header.nextRoute}"); from("direct:parallel")...; from("direct:sequential").aggregate(simple("${header.group}")).completion..; So from yout SQS queue you would use a processor to pre-process message whose responsibility would be to set the (custom) `nextRoute` and (custom) `group` headers. `nextRoute` would be `parallel` or `sequential`, and if `sequential` the messages would be aggregated using the `group` header. You would want to define your own custom aggregation strategy or use the completion* options that are available to you. There also might be need to use seda[3] to fine tune any parallel processing. You might throw in there a data format unmarshaller[4] instead of the `preProcess` processor and use something like `${body.xyz} == foo` in the `toD` expression. And I would guess that you need to examine transactions or persistence at some point also in case your aggregation step runs for a long time or if your use case is sensitive to message loss if interrupted -- which would undoubtedly lead you back to using queues to separate those two ways of processing, HTH, zoran [1] https://camel.apache.org/message-endpoint.html [2] https://camel.apache.org/aggregator2.html [3] https://camel.apache.org/seda.html [4] https://camel.apache.org/data-format.html On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote: > Hey Zoran. > > I read again the patterns you mentioned. In my use case the order of > processing within a group doesn't matter as long as two messages from the > same group are never processed in parallel. So i guess resenquencer is out > of the picture unless I didn't get the intention. > > So what we are left with is the content based router. Sure. The message > comes, i can see what group it belongs two... And what next? Perhaps it's > the very first message from that group so I would need to trigger creating > route/processor for that group somehow, perhaps messages from this group > were processed before in which case the processor for the group should > already exist... > > > > > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote: > >> Hi Artur, >> have a look at Camel EIP page[1], what you describe sounds to me like >> Resequencer and Content based router patterns, >> >> zoran >> >> [1] https://camel.apache.org/eip.html >> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski >> <ajablon...@ravenpack.com> wrote: >> > Hello. >> > >> > I wonder if someone could push me in the right direction trying to >> express >> > quite curious case in Camel route. >> > >> > Imagine there's a stream of messages some of which can be processed in >> > parallel and some have to be processed serially. You can group the >> messages >> > that require serial processing together by looking at the message body. >> You >> > don't know upfront how many groups can occur in the stream. >> > >> > The way I thought about doing this is having a route for each message >> > group. Since I don't know upfront how many and what groups there will be >> > then I would need to create routes dynamically. If a message comes >> > belonging to a group that doesn't have it's handling route, then i could >> > create it (is that even possible??) Then if there's no messages coming >> for >> > a given group in some time I could remove the route for the group to >> > cleanup (is that possible?) >> > >> > New to Camel >> > >> > Thx! >> > Artur >> >> >> >> -- >> Zoran Regvart >> -- Zoran Regvart