Hi Peter,

Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.) downstream 
from the aggregator that is causing you concern?

Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo

With the consumer counts it's quite easy to restrict throughput to the max your 
hardware can handle because consumers don't ask for a new message from a queue 
unless they've done processing the previous one.

We had to do something similar to some of our RESTful endpoints when some 
nightly batches had the potential to fire in a huge amount of requests but the 
downstream processors couldn't handle it. 
It makes the route a bit more complex but we haven't had an out of memory error 
since (still working fast enough for us).


Regards,
Valdis

-----Original Message-----
From: Peter Nagy (Jr) [mailto:pn...@gratex.com] 
Sent: 17 October 2018 15:09
To: users@camel.apache.org
Subject: Re: throttling/backpressure on a route

Hi Onder,

Onder SEZGIN <ondersez...@gmail.com> writes:

> Hi,
> I would suggest
> 1- Take a look at throttle-eip. (Make sure you understand split, 
> aggregate and mongodb3 producer adocs around mongodb aggregation. Try 
> and see your case unless you can explain or create a unit test to 
> pinpoint your issue and also check mongodb3 components test case for 
> better examples.)

I looked at the throttle EIP before, unfortunately this doesn't seem to solve 
the issue at hand. What if the producer is producing more than downstream can 
manage? Memory will be blasted. The throttler cannot know how is downstream 
doing. Downstream needs to tell upstream if it's OK to send more.

> 2- All in memory. Depending on your route design, they are either in 
> memoryaggregationrepository or flowing through downstream enpoints / routes.

Understood, thanks.

> 3- For your example, take a look at split, aggreagte, mongodb3 
> aggregate functionality and possibly, throttle or delay eips.

As explained above none of these seem to solve the issue. There needs to be 
communication between the producers. Having

A -> B -> C -> D -> E -> F

E needs to tell B "Wait" and when it catches up tell B "OK please send more". 
Otherwise things will flow downstream, filling up memory.

Surely constraining memory usage is a solved problem? I'm just too new to 
understand what pattern(s) solve it.

>
>
> On Tue, Oct 16, 2018 at 10:18 AM Peter Nagy (Jr) <pn...@gratex.com> wrote:
>
>> Newbie question incoming.
>>
>> I have a route that looks like
>>
>> ...
>> .setHeader("CamelMongoDbBatchSize", 128)
>> .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterabl
>> e")
>> .split(constant(true))
>> .streaming()
>> ...
>> .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>> .completionSize(128)
>> .to("mongodb3:myothermongo?...&operation=bulkWrite")
>>
>> Pardon if there's some typos, I wrote this by hand since I'm using 
>> Camel from clojure and the clojure code looks a bit different.
>>
>> This works as expected, the aggregation is retrieved in batches and 
>> sent downstream one-by-one, being processed with various Processors 
>> and finally written in batches.
>>
>> I had a bug before where I had
>>
>> .aggregate().body().completionSize(128).completionTimeout(30000)
>>
>> which resulted in the aggregation waiting for the timeout. However in 
>> the meantime the mongo aggregation query finished processing everything.
>>
>> This raised some questions for me.
>>
>> 1. My process needs to be memory-friendly. Even after fixing the bug 
>> I need to be sure the aggregation query won't fill the RAM when 
>> downstream can't keep up. How can I apply some backpressure? How can 
>> I tell the route "don't process more until someone downstream says you can"? 
>> E.g.
>> saying if there's more than N exchanges pending, wait.
>>
>> 2. Who was buffering the exchanges? I had ~2k entries flowing 
>> through, where did they end up queued?
>>
>> 3. Did I miss a doc page where these questions are explained? I spent 
>> a considerable amount of time searching for an answer thinking "This 
>> must be a common requirement, surely there's an example somewhere".
>>
>> --
>> To reach a goal one has to enjoy the journey.
>>


--
To reach a goal one has to enjoy the journey.
Vhi Group DAC (Vhi) is a holding company for insurance and healthcare services, 
which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC 
and Vhi Investments DAC. Vhi Healthcare DAC trading as Vhi Healthcare and Vhi 
Insurance DAC trading as Vhi Insurance are regulated by the Central Bank of 
Ireland. Vhi Healthcare is tied to Vhi Insurance DAC for health insurance in 
Ireland which is underwritten by Vhi Insurance DAC. Vhi Healthcare is tied to 
Zurich Life Assurance plc for Vhi Life Term Insurance and Vhi Mortgage 
Protection which is underwritten by Zurich Life Assurance plc. Vhi Healthcare 
is tied to Collinson Insurance Services Limited for MultiTrip Travel Insurance, 
Backpacker Travel Insurance and Vhi Dental Insurance which are underwritten by 
Great Lakes Insurance SE, UK branch and for Vhi Canada Cover and Vhi 
International Health Insurance which are underwritten by Astrenska Insurance 
Limited. For more information about the Vhi Group please go to: 
https://www.vhi.ie/about-vhi.


Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh seirbhísí 
árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi Healthcare DAC, Vhi 
Insurance DAC, Vhi Health Services DAC agus Vhi Investments DAC. Déanann Banc 
Ceannais na hÉireann rialáil ar Vhi Healthcare DAC, ag trádáil dó mar Vhi 
Healthcare, agus ar Vhi Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi 
Healthcare ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in 
Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare 
ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de chuid Vhi 
agus Árachas Cosanta Morgáiste de chuid Vhi atá frithgheallta ag Zurich Life 
Assurance plc. Tá Vhi Healthcare ceangailte le Collinson Insurance Services 
Limited le haghaidh Árachas Taistil Ilturais agus Turasóirí Mála Droma agus 
Árachas Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes Insurance 
SE, UK branch agus le haghaidh Clúdach Cheanada de chuid Vhi agus Árachas 
Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta ag Astrenska Insurance 
Limited. Chun tuilleadh faisnéise a fháil faoi Ghrúpa Vhi, tabhair cuairt ar: 
https://www.vhi.ie/about-vhi.

This e-mail and any files transmitted with it contain information which may be 
confidential and which may also be privileged and is intended solely for the 
use of the individual or entity to whom it is addressed. Unless you are the 
intended recipient you may not copy or use it, or disclose it to anyone else. 
Any opinions expressed are that of the individual and not necessarily that of 
the Vhi Group. If you have received this e-mail in error please notify the 
sender by return.







Reply via email to