Hi Artur,

As your group set is unbounded, you may need to look at creating
(in-memory) queues and routes at runtime and attaching them to a Camel
context (or spawning their own context). You can use some Camel trickery
with DefaultCamelContext + RouteDefinition. It's ugly, but it should work.
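
To illustrate the in-memory queue idea without any Camel wiring, here is a
minimal plain-Java sketch. It assumes one single-threaded executor per group,
created the first time a group shows up. The names GroupDispatcher, dispatch
and shutdownAndWait are hypothetical and are not Camel API; the factory
argument stands in for the getProcessor(group) call mentioned in this thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper (not a Camel class): one single-threaded worker per group.
class GroupDispatcher {
    // Each executor owns an unbounded in-memory queue and exactly one thread,
    // so two messages of the same group can never run at the same time.
    private final Map<String, ExecutorService> workers = new ConcurrentHashMap<>();
    // Assumed stand-in for getProcessor(group): maps a group to its handler.
    private final Function<String, Consumer<String>> processorFactory;

    GroupDispatcher(Function<String, Consumer<String>> processorFactory) {
        this.processorFactory = processorFactory;
    }

    // Lazily creates the worker for a group on its first message, then queues the message.
    void dispatch(String group, String message) {
        ExecutorService worker =
                workers.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor());
        Consumer<String> processor = processorFactory.apply(group);
        worker.submit(() -> processor.accept(message));
    }

    // Number of groups that currently have a worker.
    int groupCount() {
        return workers.size();
    }

    // Lets queued messages drain, then stops all workers.
    // Returns false if a worker did not finish in time or the wait was interrupted.
    boolean shutdownAndWait(long timeoutSeconds) {
        workers.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService worker : workers.values()) {
                if (!worker.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        GroupDispatcher dispatcher = new GroupDispatcher(
                group -> message -> System.out.println(group + " handled " + message));
        // Same arrival order as in the example stream from this thread.
        for (String m : new String[] {"A1", "A2", "B1", "C1", "B2", "A3", "C2", "B3"}) {
            dispatcher.dispatch(m.substring(0, 1), m); // group key = first character
        }
        dispatcher.shutdownAndWait(5);
    }
}
```

This sketch never removes idle workers and keeps everything in memory, so
queued messages are lost if the JVM stops. Inside Camel, the same dispatcher
could presumably sit behind a single processor on the SQS route.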

Regards,
Henrique



Henrique Viecili

On 12 April 2017 at 01:06, Claus Ibsen <claus.ib...@gmail.com> wrote:

> If you use ActiveMQ you can use its message grouping support
> http://activemq.apache.org/message-groups.html
>
> And Apache Kafka has topics where you can use partition ids to group
> by A, B, C and then have parallel processing of each group.
>
> Not sure SQS has such advanced features.
>
> It's a bit brittle to have to write Java code or split / aggregate EIP
> patterns in Camel to do this, particularly if you want to have some level
> of transaction guarantees. All EIP patterns in Camel are transient and
> don't persist their state.
>
> In other words, try to use a better messaging system.
>
>
> On Tue, Apr 11, 2017 at 3:31 PM, Artur Jablonski
> <ajablon...@ravenpack.com> wrote:
> > I guess one thing that comes to mind is to hide all this parallel stuff
> > inside a processor that would just spit out, on the other end, the result
> > of processing all those messages. It would handle the grouping, the
> > serializing and so on. I guess that would reduce the complexity of the
> > route at the cost of complexity in the processor. I have no better ideas
> > anyway, so I will give it a go.
> >
> > On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski
> > <ajablon...@ravenpack.com> wrote:
> >
> >> Hello,
> >>
> >> I don't think this route definition is fitting my use case, though I
> >> learnt a thing or two about the interesting patterns linked. Thanks!
> >>
> >> Ok, so let me try to clarify the use case.
> >>
> >>
> >> 1. The stream is infinite, it's not a batch job. The messages keep on
> >> coming from SQS 'all the time'
> >>
> >> 2. More important thing is about parallel processing.
> >>
> >> Let A1 denote a message 1 from group A, B2 message 2 from group B, etc.
> >>
> >> Let's say this is the order in which the messages happen to appear
> >>  in the route from SQS
> >>
> >> A1, A2, B1, C1, B2, A3, C2, B3
> >>
> >> Now what I am trying to achieve is grouping the messages that have to be
> >> processed sequentially (order doesn't matter as long as no two messages
> >> from the same group are processed at the same time).
> >> So I am trying to somehow get these streams
> >>
> >> A1, A2, A3
> >>
> >> B1, B2, B3
> >>
> >> C1, C2
> >>
> >>
> >> So, A1, B1 and C1 can be processed in parallel because they are from
> >> different groups, but the messages within a group need to be processed
> >> one by one.
> >>
> >> In my example there are 3 groups, but there can be many and I don't
> >> know what they are in advance. The processing logic for each group is
> >> similar and is a function of the group, so I can get a processor for
> >> group A from a method call getProcessor(A), for B from getProcessor(B),
> >> etc.
> >>
> >> I am stuck at how to do that in Camel, because since I don't know the
> >> groups in advance, I would need to create processing routes dynamically.
> >>
> >> Say the system starts and A1 arrives. There can't be any processor for
> >> group A yet, since it's the first message from the group, so I need to
> >> somehow dynamically add processing capability for group A to the route.
> >> Then perhaps, if messages from group A stop arriving for some time,
> >> that processor could be removed.
> >>
> >> How to add the parallel part between the group messages is also blurry
> >> to me. One way I was thinking of was to multicast to all the
> >> dynamically created processing routes for the groups and stick a filter
> >> in front of each so that only messages from its particular group can go
> >> through. From the multicast page:
> >>
> >> from("direct:a").multicast().parallelProcessing().to("direct:x",
> >> "direct:y", "direct:z");
> >>
> >> But here the x,y,z endpoints are hardcoded. I could write up some custom
> >> multicast I suppose to search the routes in CamelContext...... not sure.
> >>
> >> Thanks
> >> Artur
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski
> >> <ajablon...@ravenpack.com> wrote:
> >>
> >>> Hi Zoran,
> >>>
> >>> Thank you for such a detailed response. This looks very promising. I
> >>> will need to get my head around the aggregator pattern.
> >>> This week I will be busy with other tasks, but I will get back to it
> >>> as soon as I can to see if I can get Camel to work for the use case.
> >>>
> >>> Cheerio
> >>> Artur
> >>>
> >>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com>
> >>> wrote:
> >>>
> >>>> Hi Artur,
> >>>> I was thinking that the order of the messages would be important as
> >>>> you need to process them sequentially.
> >>>>
> >>>> So I think you could use the dynamic message routing[1] with
> >>>> aggregator[2], something like:
> >>>>
> >>>>     from("aws-sqs:...")
> >>>>         .process("#preProcess")
> >>>>         .toD("direct:${header.nextRoute}");
> >>>>
> >>>>     from("direct:parallel")...;
> >>>>     from("direct:sequential").aggregate(simple("${header.group}")).completion..;
> >>>>
> >>>> So from your SQS queue you would use a processor to pre-process the
> >>>> message; its responsibility would be to set the (custom) `nextRoute`
> >>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
> >>>> `sequential`, and if `sequential`, the messages would be aggregated
> >>>> using the `group` header.
> >>>>
> >>>> You would want to define your own custom aggregation strategy or use
> >>>> the completion* options that are available to you. There might also
> >>>> be a need to use seda[3] to fine-tune any parallel processing. You
> >>>> might throw in a data format unmarshaller[4] instead of the
> >>>> `preProcess` processor and use something like `${body.xyz} == foo` in
> >>>> the `toD` expression.
> >>>>
> >>>> And I would guess that you need to examine transactions or persistence
> >>>> at some point also in case your aggregation step runs for a long time
> >>>> or if your use case is sensitive to message loss if interrupted --
> >>>> which would undoubtedly lead you back to using queues to separate
> >>>> those two ways of processing,
> >>>>
> >>>> HTH,
> >>>>
> >>>> zoran
> >>>>
> >>>> [1] https://camel.apache.org/message-endpoint.html
> >>>> [2] https://camel.apache.org/aggregator2.html
> >>>> [3] https://camel.apache.org/seda.html
> >>>> [4] https://camel.apache.org/data-format.html
> >>>>
> >>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
> >>>> <ajablon...@ravenpack.com> wrote:
> >>>> > Hey Zoran.
> >>>> >
> >>>> > I read again the patterns you mentioned. In my use case the order of
> >>>> > processing within a group doesn't matter as long as two messages from
> >>>> > the same group are never processed in parallel. So I guess the
> >>>> > resequencer is out of the picture, unless I didn't get the intention.
> >>>> >
> >>>> > So what we are left with is the content based router. Sure. The
> >>>> > message comes, I can see what group it belongs to... And what next?
> >>>> > Perhaps it's the very first message from that group, so I would need
> >>>> > to trigger creating a route/processor for that group somehow; perhaps
> >>>> > messages from this group were processed before, in which case the
> >>>> > processor for the group should already exist...
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com>
> >>>> > wrote:
> >>>> >
> >>>> >> Hi Artur,
> >>>> >> have a look at the Camel EIP page[1]; what you describe sounds to me
> >>>> >> like the Resequencer and Content Based Router patterns,
> >>>> >>
> >>>> >> zoran
> >>>> >>
> >>>> >> [1] https://camel.apache.org/eip.html
> >>>> >>
> >>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
> >>>> >> <ajablon...@ravenpack.com> wrote:
> >>>> >> > Hello.
> >>>> >> >
> >>>> >> > I wonder if someone could push me in the right direction in trying
> >>>> >> > to express quite a curious case in a Camel route.
> >>>> >> >
> >>>> >> > Imagine there's a stream of messages, some of which can be processed
> >>>> >> > in parallel and some of which have to be processed serially. You can
> >>>> >> > group the messages that require serial processing together by
> >>>> >> > looking at the message body. You don't know upfront how many groups
> >>>> >> > can occur in the stream.
> >>>> >> >
> >>>> >> > The way I thought about doing this is having a route for each
> >>>> >> > message group. Since I don't know upfront how many and what groups
> >>>> >> > there will be, I would need to create routes dynamically. If a
> >>>> >> > message comes in belonging to a group that doesn't have its handling
> >>>> >> > route, then I could create it (is that even possible??). Then if no
> >>>> >> > messages come for a given group for some time, I could remove the
> >>>> >> > route for the group to clean up (is that possible?).
> >>>> >> >
> >>>> >> > New to Camel
> >>>> >> >
> >>>> >> > Thx!
> >>>> >> > Artur
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Zoran Regvart
> >>>> >>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Zoran Regvart
> >>>>
> >>>
> >>>
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>
