Hi Valdis,

and how is your MQ throttling? By serializing messages to disk? Or is it
keeping them in RAM, just on a different machine?

It seems like a bit of overkill :)


I just found ThrottlingInflightRoutePolicy; reading its description, it
seems to be what I'm looking for. I will double-check tomorrow.
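
For the archives, here is how I understand it would be wired in (a minimal
sketch in the Java DSL; the endpoint names are placeholders and I have not
verified this against our route yet):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;

public class BoundedInflightRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Suspend the route's consumer once more than 128 exchanges are
        // inflight; resume when the count drops below 70% of the maximum.
        ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
        policy.setMaxInflightExchanges(128);
        policy.setResumePercentOfMax(70);

        from("jms:queue:in")       // the policy works by suspending/resuming
            .routePolicy(policy)   // this consumer, so it must be suspendable
            .to("log:done");
    }
}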

Thanks for all the tips so far.

Valdis Andersons <valdis.anders...@vhi.ie> writes:

> Hi Peter,
>
> Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.) 
> downstream from the aggregator that is causing you concern?
>
> Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo
>
> With the consumer counts it's quite easy to restrict throughput to the max 
> your hardware can handle, because consumers don't ask for a new message from 
> a queue until they've finished processing the previous one.
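>
> A rough sketch of what I mean (Java DSL, with illustrative endpoint names
> rather than our actual config):
>
> // upstream route: hand work off to the broker as fast as it arrives
> from("direct:aggregated")
>     .to("activemq:queue:work");
>
> // downstream route: concurrentConsumers caps the parallelism, and each
> // consumer only takes the next message after finishing the previous one
> from("activemq:queue:work?concurrentConsumers=4")
>     .to("direct:slowProcessing");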
>
> We had to do something similar for some of our RESTful endpoints, when some 
> nightly batches had the potential to fire off a huge number of requests that 
> the downstream processors couldn't handle.
> It makes the route a bit more complex, but we haven't had an out-of-memory 
> error since (and it's still working fast enough for us).
>
>
> Regards,
> Valdis
>
> -----Original Message-----
> From: Peter Nagy (Jr) [mailto:pn...@gratex.com]
> Sent: 17 October 2018 15:09
> To: users@camel.apache.org
> Subject: Re: throttling/backpressure on a route
>
> Hi Onder,
>
> Onder SEZGIN <ondersez...@gmail.com> writes:
>
>> Hi,
>> I would suggest
>> 1- Take a look at the throttle EIP. (Make sure you understand the split,
>> aggregate and mongodb3 producer docs around MongoDB aggregation. Try it
>> against your case, or create a unit test to pinpoint your issue, and also
>> check the mongodb3 component's test cases for better examples.)
>
> I looked at the throttle EIP before; unfortunately it doesn't seem to solve 
> the issue at hand. What if the producer is producing more than downstream can 
> manage? Memory will be blasted. The throttler cannot know how downstream is 
> doing. Downstream needs to tell upstream when it's OK to send more.
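>
> For completeness, this is all the throttle EIP gives you: a fixed rate cap
> (a sketch with made-up endpoint names):
>
> from("direct:in")
>     .throttle(100).timePeriodMillis(1000)  // at most 100 exchanges/second
>     .to("direct:downstream");
>
> The 100/second is a static guess. If downstream slows below that rate,
> exchanges still pile up in memory ahead of it.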
>
>> 2- All in memory. Depending on your route design, they are either in the
>> MemoryAggregationRepository or flowing through downstream endpoints / routes.
>
> Understood, thanks.
>
>> 3- For your example, take a look at the split, aggregate and mongodb3
>> aggregate functionality, and possibly the throttle or delay EIPs.
>
> As explained above, none of these seem to solve the issue. There needs to be 
> communication between the stages of the route. Given
>
> A -> B -> C -> D -> E -> F
>
> E needs to tell B "Wait", and when it catches up, tell B "OK, please send 
> more". Otherwise exchanges will keep flowing downstream, filling up memory.
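>
> The closest thing I can picture is a bounded in-memory queue between the
> stages whose producer blocks when it is full, e.g. a seda endpoint with
> blockWhenFull (a sketch with made-up endpoint names; I haven't tried this
> on our route):
>
> from("direct:stageB")
>     .to("seda:buffer?size=128&blockWhenFull=true");  // B blocks when E lags
>
> from("seda:buffer?size=128&blockWhenFull=true")
>     .to("direct:stageE");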
>
> Surely constraining memory usage is a solved problem? I'm just too new to 
> understand what pattern(s) solve it.
>
>>
>>
>> On Tue, Oct 16, 2018 at 10:18 AM Peter Nagy (Jr) <pn...@gratex.com> wrote:
>>
>>> Newbie question incoming.
>>>
>>> I have a route that looks like
>>>
>>> ...
>>> .setHeader("CamelMongoDbBatchSize", 128)
>>> .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterabl
>>> e")
>>> .split(constant(true))
>>> .streaming()
>>> ...
>>> .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>>> .completionSize(128)
>>> .to("mongodb3:myothermongo?...&operation=bulkWrite")
>>>
>>> Pardon any typos; I wrote this by hand, since I'm using
>>> Camel from Clojure and the Clojure code looks a bit different.
>>>
>>> This works as expected: the aggregation is retrieved in batches, sent
>>> downstream one by one, processed with various Processors,
>>> and finally written out in batches.
>>>
>>> I had a bug before where I had
>>>
>>> .aggregate().body().completionSize(128).completionTimeout(30000)
>>>
>>> which resulted in the aggregation waiting for the timeout (correlating
>>> on the body means each exchange forms its own group, so completionSize
>>> was never reached). However, in the meantime the Mongo aggregation query
>>> had finished processing everything.
>>>
>>> This raised some questions for me.
>>>
>>> 1. My process needs to be memory-friendly. Even after fixing the bug
>>> I need to be sure the aggregation query won't fill the RAM when
>>> downstream can't keep up. How can I apply some backpressure? How can
>>> I tell the route "don't process more until someone downstream says you 
>>> can"? E.g.
>>> saying if there's more than N exchanges pending, wait.
>>>
>>> 2. Who was buffering the exchanges? I had ~2k entries flowing
>>> through; where did they end up queued?
>>>
>>> 3. Did I miss a doc page where these questions are explained? I spent
>>> a considerable amount of time searching for an answer thinking "This
>>> must be a common requirement, surely there's an example somewhere".
>>>
>>> --
>>> To reach a goal one has to enjoy the journey.
>>>


--
To reach a goal one has to enjoy the journey.
