Hi Valdis, and how is your MQ doing the throttling? Serializing the backlog to disk? Or is it keeping it in RAM, just on a different machine?
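Just so we're talking about the same thing, below is roughly how I picture the setup you describe, as an untested sketch in the Java DSL (I actually drive Camel from Clojure). The "direct:start" trigger, the ActiveMQ endpoint, the queue name, the consumer count and the endpoint options are all made up for illustration:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;

public class BrokerBufferedRoutes extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Producer side: run the Mongo aggregation and park every document on a
        // queue. Whatever downstream has not consumed yet sits on the broker,
        // not in this route's JVM.
        from("direct:start")
            .setHeader("CamelMongoDbBatchSize", constant(128))
            .to("mongodb3:mymongo?database=mydb&collection=docs"
                + "&operation=aggregate&outputType=MongoIterable")
            .split(body()).streaming()
                .to("activemq:queue:work");

        // Consumer side: concurrentConsumers caps how many exchanges this side
        // works on at once, so the batched writes never get more than they can take.
        from("activemq:queue:work?concurrentConsumers=4")
            // ... various Processors ...
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(128).completionTimeout(30000)
                .to("mongodb3:myothermongo?database=mydb&collection=out"
                    + "&operation=bulkWrite");
    }
}

If that is the shape of it, then my question above is really about where the backlog between the two routes lives: on the broker's disk, or in its RAM on another box.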
It seems to be a bit of overkill :) I just found ThrottlingInflightRoutePolicy;
reading its description, it seems to be what I'm looking for. I will
double-check tomorrow (a rough, untested sketch of how I mean to wire it in is
at the bottom of this mail). Thanks for all the tips so far.

Valdis Andersons <valdis.anders...@vhi.ie> writes:

> Hi Peter,
>
> Could you potentially use a message broker (RabbitMQ, ActiveMQ etc.)
> downstream from the aggregator that is causing you concern?
>
> Mongo -> Aggregator -> Message Broker Queue -> Consumer -> ... -> Mongo
>
> With the consumer counts it's quite easy to restrict throughput to the max
> your hardware can handle, because consumers don't ask for a new message from
> a queue until they are done processing the previous one.
>
> We had to do something similar for some of our RESTful endpoints when some
> nightly batches had the potential to fire off a huge number of requests but
> the downstream processors couldn't handle it.
> It makes the route a bit more complex, but we haven't had an out-of-memory
> error since (and it's still fast enough for us).
>
>
> Regards,
> Valdis
>
> -----Original Message-----
> From: Peter Nagy (Jr) [mailto:pn...@gratex.com]
> Sent: 17 October 2018 15:09
> To: users@camel.apache.org
> Subject: Re: throttling/backpressure on a route
>
> Hi Onder,
>
> Onder SEZGIN <ondersez...@gmail.com> writes:
>
>> Hi,
>> I would suggest
>> 1- Take a look at throttle-eip. (Make sure you understand the split,
>> aggregate and mongodb3 producer adocs around MongoDB aggregation. Try it
>> for your case, and unless you can explain or create a unit test to
>> pinpoint your issue, also check the mongodb3 component's test cases for
>> better examples.)
>
> I looked at the Throttle EIP before; unfortunately it doesn't seem to solve
> the issue at hand. What if the producer is producing more than downstream
> can manage? Memory will be blasted. The throttler cannot know how downstream
> is doing. Downstream needs to tell upstream whether it's OK to send more.
>
>> 2- All in memory. Depending on your route design, they are either in the
>> MemoryAggregationRepository or flowing through downstream endpoints /
>> routes.
>
> Understood, thanks.
>
>> 3- For your example, take a look at split, aggregate, the mongodb3
>> aggregate functionality and possibly the Throttle or Delay EIPs.
>
> As explained above, none of these seem to solve the issue. There needs to be
> communication between the producers. Having
>
> A -> B -> C -> D -> E -> F
>
> E needs to tell B "Wait", and when it catches up tell B "OK, please send
> more". Otherwise things will flow downstream, filling up memory.
>
> Surely constraining memory usage is a solved problem? I'm just too new to
> understand what pattern(s) solve it.
>
>>
>>
>> On Tue, Oct 16, 2018 at 10:18 AM Peter Nagy (Jr) <pn...@gratex.com> wrote:
>>
>>> Newbie question incoming.
>>>
>>> I have a route that looks like
>>>
>>> ...
>>> .setHeader("CamelMongoDbBatchSize", 128)
>>> .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
>>> .split(constant(true))
>>> .streaming()
>>> ...
>>> .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>>> .completionSize(128)
>>> .to("mongodb3:myothermongo?...&operation=bulkWrite")
>>>
>>> Pardon the typos if there are any; I wrote this by hand since I'm using
>>> Camel from Clojure and the Clojure code looks a bit different.
>>>
>>> This works as expected: the aggregation is retrieved in batches and
>>> sent downstream one-by-one, processed with various Processors
>>> and finally written in batches.
>>>
>>> I had a bug before where I had
>>>
>>> .aggregate().body().completionSize(128).completionTimeout(30000)
>>>
>>> which resulted in the aggregation waiting for the timeout. However, in
>>> the meantime the Mongo aggregation query had finished processing
>>> everything.
>>>
>>> This raised some questions for me.
>>>
>>> 1. My process needs to be memory-friendly. Even after fixing the bug
>>> I need to be sure the aggregation query won't fill the RAM when
>>> downstream can't keep up. How can I apply some backpressure? How can
>>> I tell the route "don't process more until someone downstream says you
>>> can"? E.g. if there are more than N exchanges pending, wait.
>>>
>>> 2. Who was buffering the exchanges? I had ~2k entries flowing
>>> through; where did they end up queued?
>>>
>>> 3. Did I miss a doc page where these questions are explained? I spent
>>> a considerable amount of time searching for an answer, thinking "This
>>> must be a common requirement, surely there's an example somewhere".
>>>
>>> --
>>> To reach a goal one has to enjoy the journey.

--
To reach a goal one has to enjoy the journey.
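PS: for the archives, this is roughly how I mean to wire the policy in, based
only on its description, so untested (Java DSL here, though I actually use
Camel from Clojure; the Camel 2.x package, the 256/70 limits and the endpoint
options are my own guesses):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;

public class ThrottledMongoRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Suspend the route's consumer when too many exchanges are in flight,
        // resume once the count drops back below 70% of the maximum.
        ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
        policy.setMaxInflightExchanges(256);
        policy.setResumePercentOfMax(70);
        policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Route);

        from("direct:start")
            .routePolicy(policy)
            .setHeader("CamelMongoDbBatchSize", constant(128))
            .to("mongodb3:mymongo?database=mydb&collection=docs"
                + "&operation=aggregate&outputType=MongoIterable")
            .split(body()).streaming()
                // ... various Processors on each document ...
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                    .completionSize(128).completionTimeout(30000)
                    .to("mongodb3:myothermongo?database=mydb&collection=out"
                        + "&operation=bulkWrite");
    }
}

What I still have to verify tomorrow is whether the in-flight count actually
includes the exchanges created by the streaming splitter, i.e. whether this
really suspends the source when the splitter runs ahead of the bulk writes.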