Hi Artur,

As your group set is unbounded, you may need to look at creating (in-memory) queues and routes at runtime and attaching them to a Camel context (or spawning a context of their own). You can use some Camel trickery with DefaultCamelContext + RouteDefinition; it's ugly, but it should work.
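To make the "queue per group, created at runtime" idea concrete, here is a minimal plain-Java sketch. It uses java.util.concurrent instead of Camel so it runs standalone, and it assumes the group key has already been extracted from the message. `PerGroupLanes` is a hypothetical name. Each group lazily gets its own single-threaded executor, i.e. an in-memory queue plus one consumer, which is roughly what a dynamically added per-group route amounts to.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: one in-memory queue plus one worker thread per group,
// created the first time a message for that group shows up.
class PerGroupLanes {
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    // Messages of one group run one at a time, in submission order;
    // different groups run concurrently on their own threads.
    void submit(String group, String message, Consumer<String> handler) {
        lanes.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor())
             .execute(() -> handler.accept(message));
    }

    int laneCount() {
        return lanes.size();
    }

    // Stops accepting work and waits for every lane to drain its queue.
    void shutdown() {
        lanes.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes.values()) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The catch is the one the unbounded group set implies: this creates one thread per group ever seen, so idle lanes would also have to be shut down and removed.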
Regards,
Henrique

Henrique Viecili

On 12 April 2017 at 01:06, Claus Ibsen <claus.ib...@gmail.com> wrote:

> If you use ActiveMQ you can use its message grouping support:
> http://activemq.apache.org/message-groups.html
>
> And Apache Kafka has topics where you can use partition ids to group by A, B, C and then have parallel processing of each group.
>
> I'm not sure SQS has such advanced features.
>
> It's a bit brittle to have to write Java code or use split/aggregate EIP patterns in Camel to do this, especially if you want some level of transactional guarantees. All EIP patterns in Camel are transient and don't persist their state etc.
>
> In other words, try to use a better messaging system.
>
>
> On Tue, Apr 11, 2017 at 3:31 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> > I guess one thing that comes to my mind is to hide all this parallel stuff inside a processor that would just spit out the result of processing all those messages on the other end. It would handle grouping, serializing and so on. I guess that would reduce the complexity of the route at the cost of a more complex processor. I have no better ideas anyway, so I will give it a go.
> >
> > On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> >
> >> Hello,
> >>
> >> I don't think this route definition fits my use case, though I learnt a thing or two from the interesting patterns you linked. Thanks!
> >>
> >> OK, so let me try to clarify the use case.
> >>
> >> 1. The stream is infinite; it's not a batch job. The messages keep on coming from SQS 'all the time'.
> >>
> >> 2. The more important thing is parallel processing.
> >>
> >> Let A1 denote message 1 from group A, B2 message 2 from group B, etc.
> >>
> >> Let's say this is the order in which the messages happen to appear in the route from SQS:
> >>
> >> A1, A2, B1, C1, B2, A3, C2, B3
> >>
> >> What I am trying to achieve is to group the messages that have to be processed sequentially (order doesn't matter, as long as no two messages from the same group are processed at the same time). So I am trying to somehow get these streams:
> >>
> >> A1, A2, A3
> >>
> >> B1, B2, B3
> >>
> >> C1, C2
> >>
> >> So A1, B1 and C1 can be processed in parallel because they are from different groups, but the messages within a group need to be processed one by one.
> >>
> >> In my example there are 3 groups, but there can be many, and I don't know what they are in advance. The processing logic is similar across groups and is a function of the group, so I can get a processor for group A from a method call getProcessor(A), for B from getProcessor(B), etc.
> >>
> >> I am stuck on how to do that in Camel: since I don't know the groups in advance, I would need to create processing routes dynamically.
> >>
> >> Say the system starts and A1 arrives. There can't be any processor for group A yet, since it's the first message from the group, so I need to somehow dynamically add processing capability for group A to the route. Then, if messages from group A stop arriving for some time, that processor could perhaps be removed.
> >>
> >> How to add the parallel part between the groups is also blurry to me. One way I was thinking of was to multicast to all the dynamically created processing routes for the groups and stick a filter in front of each, so that only messages from a particular group can go through. From the multicast page:
> >>
> >> from("direct:a").multicast().parallelProcessing().to("direct:x", "direct:y", "direct:z");
> >>
> >> But here the x, y, z endpoints are hardcoded.
> >> I suppose I could write some custom multicast that searches the routes in the CamelContext... not sure.
> >>
> >> Thanks
> >> Artur
> >>
> >> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> >>
> >>> Hi Zoran,
> >>>
> >>> Thank you for such a detailed response. This looks very promising. I will need to get my head around the aggregator pattern. This week I will be busy with other tasks, but I will get back to it as soon as I can to see if I can get Camel to work for the use case.
> >>>
> >>> Cheerio
> >>> Artur
> >>>
> >>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:
> >>>
> >>>> Hi Artur,
> >>>> I was thinking that the order of the messages would be important, as you need to process them sequentially.
> >>>>
> >>>> So I think you could use dynamic message routing[1] with the aggregator[2], something like:
> >>>>
> >>>> from("aws-sqs:...")
> >>>>     .process("#preProcess")
> >>>>     .toD("direct:${header.nextRoute}");
> >>>>
> >>>> from("direct:parallel")...;
> >>>> from("direct:sequential").aggregate(simple("${header.group}")).completion..;
> >>>>
> >>>> So from your SQS queue you would use a processor to pre-process each message; its responsibility would be to set the (custom) `nextRoute` and (custom) `group` headers. `nextRoute` would be `parallel` or `sequential`, and if `sequential`, the messages would be aggregated using the `group` header.
> >>>>
> >>>> You would want to define your own custom aggregation strategy or use the completion* options that are available to you. There might also be a need to use seda[3] to fine-tune any parallel processing. You might also throw in a data format unmarshaller[4] instead of the `preProcess` processor and use something like `${body.xyz} == foo` in the `toD` expression.
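As a rough plain-Java picture of what "aggregate using the `group` header" means, here is a single-threaded sketch. It assumes a completion size is the only completion condition; Camel's aggregator also offers options such as `completionTimeout`, which an infinite stream would likely need so a quiet group does not hold messages forever. `GroupAggregator` is hypothetical and is not Camel's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, single-threaded illustration of correlation-based aggregation:
// messages are bucketed by a correlation key (the `group` header in the
// example route) and a bucket is released once it reaches completionSize.
class GroupAggregator<M> {
    private final Function<M, String> correlationKey;
    private final int completionSize;
    private final Map<String, List<M>> pending = new HashMap<>();

    GroupAggregator(Function<M, String> correlationKey, int completionSize) {
        this.correlationKey = correlationKey;
        this.completionSize = completionSize;
    }

    // Returns the completed batch for this message's group, or null if the
    // group is still waiting for more messages.
    List<M> add(M message) {
        String key = correlationKey.apply(message);
        List<M> bucket = pending.computeIfAbsent(key, k -> new ArrayList<>());
        bucket.add(message);
        if (bucket.size() < completionSize) {
            return null;
        }
        pending.remove(key);
        return bucket;
    }

    // Messages still waiting for their group to complete.
    Map<String, List<M>> pending() {
        return pending;
    }
}
```

Batching alone does not decide how successive batches of the same group are scheduled downstream; that part still has to be kept serial.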
> >>>>
> >>>> And I would guess that you need to examine transactions or persistence at some point too, in case your aggregation step runs for a long time or your use case is sensitive to message loss if interrupted, which would undoubtedly lead you back to using queues to separate those two ways of processing.
> >>>>
> >>>> HTH,
> >>>>
> >>>> zoran
> >>>>
> >>>> [1] https://camel.apache.org/message-endpoint.html
> >>>> [2] https://camel.apache.org/aggregator2.html
> >>>> [3] https://camel.apache.org/seda.html
> >>>> [4] https://camel.apache.org/data-format.html
> >>>>
> >>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> >>>> > Hey Zoran,
> >>>> >
> >>>> > I read the patterns you mentioned again. In my use case the order of processing within a group doesn't matter, as long as two messages from the same group are never processed in parallel. So I guess the Resequencer is out of the picture, unless I didn't get its intention.
> >>>> >
> >>>> > So what we are left with is the Content-Based Router. Sure: a message comes in and I can see which group it belongs to... and what next? Perhaps it's the very first message from that group, so I would need to somehow trigger creating a route/processor for that group; or perhaps messages from this group were processed before, in which case the processor for the group should already exist...
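One way to sidestep creating and destroying per-group routes is to keep, per group, only a reference to that group's last queued task, and to chain each new message behind it on a shared, bounded thread pool. Here is a minimal sketch with `CompletableFuture`, swapping Camel routing for plain java.util.concurrent and assuming the group key is already known. `KeyedSerialExecutor` is a hypothetical name; the submitted task is where something like Artur's `getProcessor(group)` would be called.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: serial per group, parallel across groups, on a shared
// bounded pool. No per-group route or thread is created; the only per-group
// state is a reference to that group's most recently queued task.
class KeyedSerialExecutor {
    private final ExecutorService pool;
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    CompletableFuture<Void> submit(String group, Runnable task) {
        // Atomically chain the new task behind the group's current tail.
        // exceptionally(...) keeps one failed message from blocking the group.
        CompletableFuture<Void> next = tails.compute(group, (g, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null)
                              : tail.exceptionally(ex -> null))
                        .thenRunAsync(task, pool));
        // When this task finishes and nothing newer was queued, forget the group.
        // Registered outside compute(), which must not modify the map re-entrantly.
        next.whenComplete((r, ex) -> tails.remove(group, next));
        return next;
    }

    int activeGroups() {
        return tails.size();
    }

    void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, "first message of a group" is just a missing map entry, and a group that goes quiet is forgotten once its last task completes, so nothing has to be created or removed explicitly.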
> >>>> >
> >>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
> >>>> >
> >>>> >> Hi Artur,
> >>>> >> have a look at the Camel EIP page[1]; what you describe sounds to me like the Resequencer and Content-Based Router patterns,
> >>>> >>
> >>>> >> zoran
> >>>> >>
> >>>> >> [1] https://camel.apache.org/eip.html
> >>>> >>
> >>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> >>>> >> > Hello.
> >>>> >> >
> >>>> >> > I wonder if someone could push me in the right direction in expressing a rather curious case as a Camel route.
> >>>> >> >
> >>>> >> > Imagine there's a stream of messages, some of which can be processed in parallel while others have to be processed serially. You can group the messages that require serial processing together by looking at the message body. You don't know upfront how many groups can occur in the stream.
> >>>> >> >
> >>>> >> > The way I thought about doing this is to have a route for each message group. Since I don't know upfront how many groups there will be, or what they are, I would need to create routes dynamically. If a message arrives belonging to a group that doesn't have its handling route yet, then I could create it (is that even possible?). Then, if no messages arrive for a given group for some time, I could remove the route for that group to clean up (is that possible?).
> >>>> >> >
> >>>> >> > New to Camel.
> >>>> >> >
> >>>> >> > Thx!
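For the "remove the route once a group goes quiet" part, here is a minimal sketch of idle tracking. It assumes timestamps are passed in explicitly (which keeps it testable) and that some timer calls `evictIdle` periodically. `IdleGroupReaper` and its callback are hypothetical; the callback is where the per-group route would be torn down (in Camel 2.x, for example, via `CamelContext.stopRoute(routeId)` followed by `removeRoute(routeId)`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: remembers when each group last saw a message and
// tears down groups that have been quiet for longer than ttlMillis.
class IdleGroupReaper {
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final Consumer<String> onEvict; // e.g. stop and remove the group's route

    IdleGroupReaper(long ttlMillis, Consumer<String> onEvict) {
        this.ttlMillis = ttlMillis;
        this.onEvict = onEvict;
    }

    // Call for every incoming message. Returns true when the group is new,
    // i.e. its route/processor has to be created before handling the message.
    boolean touch(String group, long nowMillis) {
        return lastSeen.put(group, nowMillis) == null;
    }

    // Call periodically (e.g. from a timer). Evicts and returns the idle groups.
    // Note: this does not coordinate with messages still in flight.
    List<String> evictIdle(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            String group = entry.getKey();
            Long seenAt = entry.getValue();
            // remove(key, value) fails if touch() stored a newer timestamp meanwhile
            if (nowMillis - seenAt > ttlMillis && lastSeen.remove(group, seenAt)) {
                onEvict.accept(group);
                evicted.add(group);
            }
        }
        return evicted;
    }
}
```

A real implementation would also need to make sure a group is not torn down while one of its messages is still being processed.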
> >>>> >> > Artur
> >>>> >>
> >>>> >> --
> >>>> >> Zoran Regvart
> >>>>
> >>>> --
> >>>> Zoran Regvart
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
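Claus's Kafka suggestion bounds the problem differently: instead of one lane per group, there is a fixed number of partitions, and each group is always mapped to the same one. Here is a plain-Java sketch of that idea, assuming N single-threaded lanes and `String.hashCode` as the hash (Kafka's own partitioner uses a different hash). `PartitionedDispatcher` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical fixed-lane dispatcher: every group hashes to one of N
// single-threaded lanes, so a group is always handled by the same lane
// (serial), while different lanes run in parallel.
class PartitionedDispatcher {
    private final List<ExecutorService> lanes = new ArrayList<>();

    PartitionedDispatcher(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    static int laneFor(String group, int laneCount) {
        // floorMod keeps the index non-negative for negative hash codes
        return Math.floorMod(group.hashCode(), laneCount);
    }

    void dispatch(String group, Runnable task) {
        lanes.get(laneFor(group, lanes.size())).execute(task);
    }

    // Stops accepting work and waits for queued tasks to finish.
    void close() {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Groups that hash to the same lane wait for each other, so some parallelism is traded for a bounded thread count and no per-group setup or teardown. ActiveMQ message groups (the `JMSXGroupID` header) give a similar "one consumer per group" guarantee at the broker level.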