I created a PR with the implementation described above —
https://github.com/apache/flink/pull/7679. Please provide feedback :)

Thanks
Lakshmi

On Thu, Feb 7, 2019 at 11:04 AM Lakshmi Gururaja Rao <l...@lyft.com> wrote:

> Apologies for the delay in responding here.
>
> The idea of making the Ratelimiter config/creation logic generic across
> connectors makes sense to me.
>
> In the approach that we used and tested internally, we essentially created
> a Guava RateLimiter
> <https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html>
> within the *KafkaConsumerThread* with a desired rate and moved the
> *consumer.poll()* call into a separate method that uses the bytes received
> from every call to *poll()* to control the rate (i.e., as a parameter to
> the *acquire()* call). We have abstracted the rate limiting configuration
> and creation into a RateLimiterFactory to make it reusable for other
> connectors.
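>
> To make the idea concrete, here is a rough, hypothetical sketch of the
> polling pattern (class and method names are invented for illustration and
> may differ from the actual PR code):
>
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import com.google.common.util.concurrent.RateLimiter;
>
> // Wraps consumer.poll() and charges a Guava RateLimiter one permit per
> // byte received, so subsequent polls are delayed once the configured
> // byte rate is exceeded.
> public class ThrottledKafkaPoller {
>     private final KafkaConsumer<byte[], byte[]> consumer;
>     private final RateLimiter rateLimiter; // permits == bytes per second
>
>     public ThrottledKafkaPoller(Properties kafkaProps, long bytesPerSecond) {
>         this.consumer = new KafkaConsumer<>(kafkaProps);
>         this.rateLimiter = RateLimiter.create(bytesPerSecond);
>     }
>
>     public ConsumerRecords<byte[], byte[]> throttledPoll(long timeoutMs) {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
>         int bytes = 0;
>         for (ConsumerRecord<byte[], byte[]> record : records) {
>             // serializedKeySize()/serializedValueSize() return -1 for null
>             bytes += Math.max(record.serializedKeySize(), 0)
>                     + Math.max(record.serializedValueSize(), 0);
>         }
>         if (bytes > 0) {
>             rateLimiter.acquire(bytes); // blocks until the byte budget allows
>         }
>         return records;
>     }
> }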
>
> I also added some of the results we got from testing this approach to the
> Flink JIRA: https://issues.apache.org/jira/browse/FLINK-11501. I'll share
> a PR with the approach shortly, and hopefully we can use that as a starting
> point for discussing this feature further.
>
> Thanks
> Lakshmi
>
> On Fri, Feb 1, 2019 at 8:08 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Thomas,
>>
>> Yes, adding a rate limiting operator in front of the sink would work for
>> record rate limiting.
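>>
>> As a hypothetical sketch (the operator name and rate are made up for
>> illustration), such an operator could be a simple pass-through map that
>> charges a Guava RateLimiter one permit per record, chained right in front
>> of the sink:
>>
>> import com.google.common.util.concurrent.RateLimiter;
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>>
>> // Pass-through operator that blocks once the per-subtask record rate is
>> // exceeded; the resulting back pressure throttles upstream operators.
>> public class RecordRateLimitingMap<T> extends RichMapFunction<T, T> {
>>     private final double recordsPerSecondPerSubtask;
>>     private transient RateLimiter rateLimiter; // not serializable; create in open()
>>
>>     public RecordRateLimitingMap(double recordsPerSecondPerSubtask) {
>>         this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
>>     }
>>
>>     @Override
>>     public T map(T value) {
>>         rateLimiter.acquire(); // one permit per record
>>         return value;
>>     }
>> }
>>
>> Usage would be something like
>> stream.map(new RecordRateLimitingMap<>(1000.0)).addSink(...), and since
>> each subtask throttles independently, the effective global limit is again
>> PARALLELISM * the per-subtask rate.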
>>
>> Another thing I am thinking about is that for local throttling, throttling
>> in sources and sinks seems to have some subtle differences. For example,
>> consider a job where both the source and the sink are HDFS. On the source
>> side, a rate limiter in each task can work independently without a
>> problem; the total throughput will be throttled at PARALLELISM *
>> PER_TASK_THRESHOLD.
>>
>> On the sink side it might be a little different. After some aggregations,
>> the data might become skewed. In that case, some sink tasks with hot keys
>> may hit the rate limit and create back pressure, while the other sink
>> tasks are mostly idle. This could result in over-throttling, i.e. the
>> aggregate throughput staying well below PARALLELISM * PER_TASK_THRESHOLD.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Feb 2, 2019 at 12:20 AM Thomas Weise <t...@apache.org> wrote:
>>
>> > Hi Becket,
>> >
>> > The throttling operator would suffer from the same issue of not being
>> > able to accurately count bytes.
>> >
>> > On the other hand, it can be used by composition w/o modifying existing
>> > operators.
>> >
>> > As for sinks, wouldn't an operator that adjusts the rate in front of the
>> > sink suffice?
>> >
>> > Thomas
>> >
>> >
>> > On Thu, Jan 31, 2019 at 11:42 PM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > Good point about counting bytes. It would be difficult to throttle the
>> > > byte rate with the existing API. And it seems that for sinks we have to
>> > > do that rate limiting in the sink implementation anyway. There are a
>> > > few ways to add some abstraction, but maybe adding a RateLimiter is
>> > > trivial enough that we don't need to worry about reusing the throttling
>> > > logic.
>> > >
>> > > But in any case, let's make sure the throttling threshold configuration
>> > > names are the same for all sources and sinks, so the config parsing
>> > > logic should probably still live in one shared place. Those are
>> > > implementation details we can discuss when reviewing the patch.
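>> > >
>> > > Just to sketch what I mean (the key name and factory are purely
>> > > hypothetical), a single shared factory could own the config parsing:
>> > >
>> > > import java.util.Properties;
>> > >
>> > > import com.google.common.util.concurrent.RateLimiter;
>> > >
>> > > // Hypothetical shared factory: every connector reads the same key, so
>> > > // the threshold configuration stays uniform across sources and sinks.
>> > > public final class RateLimiterFactory {
>> > >     public static final String BYTES_PER_SECOND_KEY =
>> > >             "ratelimit.bytes-per-second";
>> > >
>> > >     private RateLimiterFactory() {}
>> > >
>> > >     // Returns null when no limit is configured, i.e. unthrottled.
>> > >     public static RateLimiter fromProperties(Properties props) {
>> > >         long rate = Long.parseLong(
>> > >                 props.getProperty(BYTES_PER_SECOND_KEY, "-1"));
>> > >         return rate > 0 ? RateLimiter.create(rate) : null;
>> > >     }
>> > > }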
>> > >
>> > > I am not sure about adding another throttling operator. How would that
>> > > operator get the serialized size if it is downstream of a source? And
>> > > how would that work on the sink side?
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Fri, Feb 1, 2019 at 11:33 AM Thomas Weise <t...@apache.org> wrote:
>> > >
>> > > > I initially thought of an approach similar to the collector idea, by
>> > > > overriding emitRecord in the fetcher. That makes counting the bytes
>> > > > difficult, because it's downstream of decoding.
>> > > >
>> > > > Another idea for solving this in a reusable way was to have a
>> > > > separate rate limiting operator chained downstream of the consumer,
>> > > > which would develop back pressure and slow down the consumer.
>> > > > However, wouldn't that interfere with checkpoint barrier alignment
>> > > > (AFAIK, currently the checkpoint barrier would also be stuck in the
>> > > > backlog)?
>> > > >
>> > > > Thomas
>> > > >
>> > > >
>> > > >
>> > > > > On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> > > >
>> > > > > +1, and something I was planning to comment on in the Jira issue.
>> > > > >
>> > > > > Also, if rate limiting could effectively stop the stream, then this
>> > > > > could be used to solve a common data enrichment issue.
>> > > > >
>> > > > > Logically you want to pause one stream (typically the time series
>> > > > > data being processed) while another stream (typically the broadcast)
>> > > > > is broadcasting an update to enrichment data.
>> > > > >
>> > > > > Currently you have to buffer the time series data in your
>> > > > > enrichment function, but if the rate limiter were pluggable, it
>> > > > > could detect when this enrichment update was happening.
>> > > > >
>> > > > >  — Ken
>> > > > >
>> > > > > > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com> wrote:
>> > > > > >
>> > > > > > Hi Jamie,
>> > > > > >
>> > > > > > Thanks for the explanation. That makes sense to me. I am wondering
>> > > > > > if there is a more general way to add a rate limiter to all the
>> > > > > > connectors rather than doing that for each individual one. For
>> > > > > > example, maybe we could put the rate limiting logic in the
>> > > > > > Collector / Output, so that all the connectors (even operators?)
>> > > > > > could be rate limited.
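>> > > > > >
>> > > > > > As a purely hypothetical sketch of that idea (this is not an
>> > > > > > existing Flink class), a wrapping Collector might look like:
>> > > > > >
>> > > > > > import com.google.common.util.concurrent.RateLimiter;
>> > > > > > import org.apache.flink.util.Collector;
>> > > > > >
>> > > > > > // Wraps any Collector and blocks in collect() once the record
>> > > > > > // rate exceeds the limit, throttling whatever emits into it.
>> > > > > > public class RateLimitedCollector<T> implements Collector<T> {
>> > > > > >     private final Collector<T> inner;
>> > > > > >     private final RateLimiter rateLimiter;
>> > > > > >
>> > > > > >     public RateLimitedCollector(Collector<T> inner,
>> > > > > >             double recordsPerSecond) {
>> > > > > >         this.inner = inner;
>> > > > > >         this.rateLimiter = RateLimiter.create(recordsPerSecond);
>> > > > > >     }
>> > > > > >
>> > > > > >     @Override
>> > > > > >     public void collect(T record) {
>> > > > > >         rateLimiter.acquire(); // one permit per emitted record
>> > > > > >         inner.collect(record);
>> > > > > >     }
>> > > > > >
>> > > > > >     @Override
>> > > > > >     public void close() {
>> > > > > >         inner.close();
>> > > > > >     }
>> > > > > > }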
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jiangjie (Becket) Qin
>> > > > > >
>> > > > > > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao <l...@lyft.com.invalid> wrote:
>> > > > > >
>> > > > > >> Thanks for adding more context, @Jamie Grier <jgr...@lyft.com>.
>> > > > > >>
>> > > > > >> JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-11501.
>> > > > > >>
>> > > > > >> Thanks
>> > > > > >> Lakshmi
>> > > > > >>
>> > > > > >> On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org> wrote:
>> > > > > >>
>> > > > > >>> I think it would be reasonable to have a rate limiter option in
>> > > > > >>> the consumer, given that others have also looked to solve this.
>> > > > > >>>
>> > > > > >>> I think for this and other optional features, it would be good
>> > > > > >>> to implement them in a way that allows overrides. Someone else
>> > > > > >>> may want to do the limiting differently, taking into account
>> > > > > >>> more or other factors.
>> > > > > >>>
>> > > > > >>> Both adding the limiter and making the consumer code more
>> > > > > >>> adaptable could also be split into separate pieces of work.
>> > > > > >>>
>> > > > > >>> BTW is there a JIRA for this?
>> > > > > >>>
>> > > > > >>> Thomas
>> > > > > >>>
>> > > > > >>
>> > > > >
>> > > > > --------------------------
>> > > > > Ken Krugler
>> > > > > +1 530-210-6378
>> > > > > http://www.scaleunlimited.com
>> > > > > Custom big data solutions & training
>> > > > > Flink, Solr, Hadoop, Cascading & Cassandra
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>


-- 
*Lakshmi Gururaja Rao*
SWE
217.778.7218 <+12177787218>
Lyft <http://www.lyft.com/>
