I created a PR with the implementation described above — https://github.com/apache/flink/pull/7679. Please provide feedback :)
Thanks
Lakshmi

On Thu, Feb 7, 2019 at 11:04 AM Lakshmi Gururaja Rao <l...@lyft.com> wrote:

> Apologies for the delay in responding here.
>
> The idea of making the RateLimiter config/creation logic generic across
> connectors makes sense to me.
>
> In the approach that we used and tested internally, we essentially created
> a Guava RateLimiter
> <https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html>
> within the *KafkaConsumerThread* with a desired rate and moved the
> *consumer.poll()* into a separate method that uses the bytes received from
> every call to *poll()* to control the rate (i.e. as a parameter to the
> *acquire()* call). We have abstracted the rate-limiting configuration and
> creation into a RateLimiterFactory to make it reusable for other
> connectors.
>
> I also added some of the results we got from testing this approach on the
> Flink JIRA: https://issues.apache.org/jira/browse/FLINK-11501. I'll share
> a PR with the approach shortly, and hopefully we can use that as a
> starting point to discuss this feature further.
>
> Thanks
> Lakshmi
>
> On Fri, Feb 1, 2019 at 8:08 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Thomas,
>>
>> Yes, adding a rate limiting operator in front of the sink would work for
>> record rate limiting.
>>
>> Another thing I am thinking is that for local throttling, throttling in
>> sources and sinks has some subtle differences. For example, consider both
>> source and sink as HDFS. For sources, a rate limiter in each task could
>> work independently without a problem; the total throughput will be
>> throttled at PARALLELISM * PER_TASK_THRESHOLD.
>>
>> On the sink side it might be a little different. After some aggregations,
>> the data might become skewed. In that case, some sink tasks with hot keys
>> may hit the rate limit and create back pressure, while the other sink
>> tasks are pretty idle. This could result in over-throttling.
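[Editor's sketch, not part of the original thread.] The poll()/acquire() pattern Lakshmi describes could look roughly like the following. This is illustrative only: SimpleRateLimiter is a stand-in mimicking the blocking acquire(permits) semantics of Guava's RateLimiter, and the loop simulates (rather than uses) the actual KafkaConsumerThread code.

```java
// Illustrative sketch of byte-based throttling: the bytes returned by each
// poll() are passed as permits to acquire() before the next poll().
// SimpleRateLimiter is a hypothetical stand-in for Guava's RateLimiter.
class SimpleRateLimiter {
    private final double permitsPerSecond;
    private long nextFreeMicros = 0;

    SimpleRateLimiter(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    /** Blocks until {@code permits} can be granted; returns seconds waited. */
    synchronized double acquire(int permits) {
        long nowMicros = System.nanoTime() / 1_000;
        long waitMicros = Math.max(nextFreeMicros - nowMicros, 0);
        // Reserve time for these permits beyond the current reservation.
        nextFreeMicros = Math.max(nextFreeMicros, nowMicros)
                + (long) (permits / permitsPerSecond * 1_000_000);
        if (waitMicros > 0) {
            try {
                Thread.sleep(waitMicros / 1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return waitMicros / 1_000_000.0;
    }
}

public class RateLimitedPollSketch {
    public static void main(String[] args) {
        // Throttle to 1000 bytes/sec; three simulated poll() batches of
        // 500 bytes each should then take roughly one second in total.
        SimpleRateLimiter limiter = new SimpleRateLimiter(1000);
        long start = System.nanoTime();
        int totalBytes = 0;
        for (int i = 0; i < 3; i++) {
            byte[] batch = new byte[500]; // pretend consumer.poll() returned this
            totalBytes += batch.length;
            limiter.acquire(batch.length); // bytes received drive the rate
        }
        double elapsed = (System.nanoTime() - start) / 1e9;
        System.out.println("bytes=" + totalBytes + " throttled=" + (elapsed >= 0.9));
    }
}
```

The key point from the thread is that the permits passed to acquire() are the bytes from the previous poll(), so the limiter throttles byte rate rather than record rate.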
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Feb 2, 2019 at 12:20 AM Thomas Weise <t...@apache.org> wrote:
>>
>> > Hi Becket,
>> >
>> > The throttling operator would suffer from the same issue of not being
>> > able to accurately count bytes.
>> >
>> > On the other hand, it can be used by composition w/o modifying existing
>> > operators.
>> >
>> > As for sinks, wouldn't an operator that adjusts the rate in front of
>> > the sink suffice?
>> >
>> > Thomas
>> >
>> > On Thu, Jan 31, 2019 at 11:42 PM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > Good point about counting bytes. It would be difficult to throttle
>> > > the byte rate with the existing API. And it seems that for sinks we
>> > > have to do that rate limiting in the sink implementation anyway.
>> > > There are a few ways to do some abstraction, but maybe adding a
>> > > RateLimiter is trivial enough that we don't need to worry about
>> > > reusing the throttling logic.
>> > >
>> > > But in any case, let's make sure the throttling threshold
>> > > configuration names are the same for all the sources and sinks. So
>> > > the config parsing logic should probably still be put together in one
>> > > place. Those are implementation details we can discuss when reviewing
>> > > the patch.
>> > >
>> > > I am not sure about adding another throttling operator. How would
>> > > that operator get the serialized size if it is downstream of a
>> > > source? And how would that work on the sink side?
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Fri, Feb 1, 2019 at 11:33 AM Thomas Weise <t...@apache.org> wrote:
>> > >
>> > > > I initially thought of an approach similar to the collector idea,
>> > > > by overriding emitRecord in the fetcher. That makes counting the
>> > > > bytes difficult, because it's downstream of decoding.
>> > > >
>> > > > Another idea for solving this in a reusable way was to have a
>> > > > separate rate limiting operator chained downstream of the consumer,
>> > > > which would develop back pressure and slow down the consumer.
>> > > > However, that would interfere with checkpoint barrier alignment
>> > > > (AFAIK, currently the checkpoint barrier will also be stuck in the
>> > > > backlog)?
>> > > >
>> > > > Thomas
>> > > >
>> > > > On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> > > >
>> > > > > +1, and something I was planning to comment on in the Jira issue.
>> > > > >
>> > > > > Also, if rate limiting could effectively stop the stream, then
>> > > > > this could be used to solve a common data enrichment issue.
>> > > > >
>> > > > > Logically you want to pause one stream (typically the time series
>> > > > > data being processed) while another stream (typically the
>> > > > > broadcast) is broadcasting an update to enrichment data.
>> > > > >
>> > > > > Currently you have to buffer the time series data in your
>> > > > > enrichment function, but if the rate limiter were pluggable, it
>> > > > > could detect when this enrichment update was happening.
>> > > > >
>> > > > > — Ken
>> > > > >
>> > > > > > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com> wrote:
>> > > > > >
>> > > > > > Hi Jamie,
>> > > > > >
>> > > > > > Thanks for the explanation. That makes sense to me. I am
>> > > > > > wondering if there is a more general way to add a rate limiter
>> > > > > > to all the connectors rather than doing that for each
>> > > > > > individual one. For example, maybe we can have the rate
>> > > > > > limiting logic in the Collector / Output, thus all the
>> > > > > > connectors (even operators?) could be rate limited.
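[Editor's sketch, not part of the original thread.] To make the "generic across connectors" idea concrete, here is one possible shape for a pluggable rate-limiter abstraction with a factory. Everything here is hypothetical: the interface, the factory, and the config key name are illustrative, not the actual Flink API or the RateLimiterFactory from the PR.

```java
import java.util.Properties;

// Hypothetical connector-agnostic limiter interface; any source or sink
// could call acquire() with the bytes (or records) it just processed.
interface ConnectorRateLimiter {
    void open(long permitsPerSecond);
    void acquire(int permits);
}

// Used when no rate limit is configured: every acquire() is a no-op.
class NoOpRateLimiter implements ConnectorRateLimiter {
    public void open(long permitsPerSecond) {}
    public void acquire(int permits) {}
}

// Minimal blocking token-bucket limiter (stand-in for Guava's RateLimiter).
class TokenBucketRateLimiter implements ConnectorRateLimiter {
    private double permitsPerSecond;
    private long nextFreeMicros;

    public void open(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    public synchronized void acquire(int permits) {
        long now = System.nanoTime() / 1_000;
        long wait = Math.max(nextFreeMicros - now, 0);
        nextFreeMicros = Math.max(nextFreeMicros, now)
                + (long) (permits / permitsPerSecond * 1_000_000);
        try {
            Thread.sleep(wait / 1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class RateLimiterFactory {
    // Illustrative config key; a shared name like this is what the thread
    // suggests reusing across all sources and sinks.
    static final String RATE_LIMIT_KEY = "connector.rate-limit.bytes-per-second";

    static ConnectorRateLimiter create(Properties config) {
        String value = config.getProperty(RATE_LIMIT_KEY);
        if (value == null) {
            return new NoOpRateLimiter(); // limiting disabled
        }
        ConnectorRateLimiter limiter = new TokenBucketRateLimiter();
        limiter.open(Long.parseLong(value));
        return limiter;
    }
}

public class RateLimiterFactorySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(RateLimiterFactory.create(props).getClass().getSimpleName());
        props.setProperty(RateLimiterFactory.RATE_LIMIT_KEY, "1048576");
        System.out.println(RateLimiterFactory.create(props).getClass().getSimpleName());
    }
}
```

Because connectors only depend on the interface, someone who wants to limit differently (Thomas's "overrides" point, or Ken's pause-the-stream use case) could plug in another implementation without touching the consumer code.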
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jiangjie (Becket) Qin
>> > > > > >
>> > > > > > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao <l...@lyft.com.invalid> wrote:
>> > > > > >
>> > > > > >> Thanks for adding more context @Jamie Grier <jgr...@lyft.com>.
>> > > > > >>
>> > > > > >> JIRA for this feature:
>> > > > > >> https://issues.apache.org/jira/browse/FLINK-11501.
>> > > > > >>
>> > > > > >> Thanks
>> > > > > >> Lakshmi
>> > > > > >>
>> > > > > >> On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org> wrote:
>> > > > > >>
>> > > > > >>> I think it would be reasonable to have a rate limiter option
>> > > > > >>> in the consumer, given that others have also looked to solve
>> > > > > >>> this.
>> > > > > >>>
>> > > > > >>> I think for this and other optional features, it would be
>> > > > > >>> good to implement them in a way that overrides are possible.
>> > > > > >>> Someone else may want to do the limiting differently, taking
>> > > > > >>> into account more/other factors.
>> > > > > >>>
>> > > > > >>> Both adding the limiter and making the consumer code more
>> > > > > >>> adaptable could also be split into separate work.
>> > > > > >>>
>> > > > > >>> BTW is there a JIRA for this?
>> > > > > >>> >> > > > > >>> Thomas >> > > > > >>> >> > > > > >> >> > > > > >> >> > > > > >> -- >> > > > > >> *Lakshmi Gururaja Rao* >> > > > > >> SWE >> > > > > >> 217.778.7218 <+12177787218> >> > > > > >> [image: Lyft] <http://www.lyft.com/> >> > > > > >> >> > > > > >> > > > > -------------------------- >> > > > > Ken Krugler >> > > > > +1 530-210-6378 >> > > > > http://www.scaleunlimited.com >> > > > > Custom big data solutions & training >> > > > > Flink, Solr, Hadoop, Cascading & Cassandra >> > > > > >> > > > > >> > > > >> > > >> > >> > > > -- > *Lakshmi Gururaja Rao* > SWE > 217.778.7218 <+12177787218> > [image: Lyft] <http://www.lyft.com/> > -- *Lakshmi Gururaja Rao* SWE 217.778.7218 <+12177787218> [image: Lyft] <http://www.lyft.com/>