Hi Ryanne,

Thanks for the KIP! Sorry for the delay, I finally took some time to
take a look and I have a few questions:

1) record.rate.limit, record.batch.rate.limit are listed as Connector
configurations. If I understand correctly they are actually
configurations of RecordRateLimiter, and RecordBatchRateLimiter
respectively. So if you set rate.limiters to MyRateLimiter you would
not have to set these 2 configs.

2) The motivation mentions the need to "limit total throughput". The 2
proposed RateLimiter implementations seem to work on the number of
records flowing, not on the actual byte rate. Should we have a
ByteRateLimiter?

3) Do you know what happens if multiple RateLimiters are enabled and
both hit their limit but return a different throttle time? Is the
longest throttle time selected?

4) I tend to agree with Tom that reusing the transformation/config
provider syntax for configurations may be nicer for consistency. While
the built-in RateLimiters have few configurations, a custom
implementation may have several, and these may conflict with
configurations from a connector.

5) Can you explain how ConfigDef config(); is used? Is this method needed?

6) Is the rate applied before transformations? For example if a
transformation filters records, are they still counted?
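
To make 2) a bit more concrete, here is a rough sketch of what I mean
by a byte-based limiter. Everything in it is hypothetical: the class
name, the constructor arguments and the accumulate()/throttleTimeMs()
shape are only loosely modelled on the KIP, and it assumes the
serialized size of each record is available, which I am not sure is
the case at the point where the limiter runs.

```java
import java.util.List;

// Hypothetical sketch only, not the RateLimiter interface from the KIP.
class ByteRateLimiterSketch {
    private final long bytesPerSecond; // assumed limit, in bytes per second
    private final long startMs;        // time at which counting started
    private long totalBytes = 0L;      // bytes seen since startMs

    ByteRateLimiterSketch(long bytesPerSecond, long startMs) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        this.bytesPerSecond = bytesPerSecond;
        this.startMs = startMs;
    }

    // Add the size of every serialized record in the batch to the running total.
    void accumulate(List<byte[]> serializedRecords) {
        for (byte[] record : serializedRecords) {
            totalBytes += record.length;
        }
    }

    // Time to pause so the average byte rate since startMs falls back to the limit.
    long throttleTimeMs(long nowMs) {
        long earliestAllowedMs = startMs + (totalBytes * 1000L) / bytesPerSecond;
        return Math.max(0L, earliestAllowedMs - nowMs);
    }
}
```

This averages over the whole lifetime of the limiter, which is
probably too naive for real use, but it shows the accounting I had in
mind.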

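For 3), if the answer is "the longest throttle time wins" (which is
also what Tom assumed in his point 6), I would expect the combining
logic to look roughly like the sketch below. The Limiter interface
here is a hypothetical stand-in with only the one method needed for
the example, not the interface proposed in the KIP.

```java
import java.util.List;

class ThrottleSketch {
    // Hypothetical stand-in for a rate limiter; only the part needed here.
    interface Limiter {
        long throttleTimeMs();
    }

    // Pause for the longest time requested by any enabled limiter,
    // or not at all if no limiter asks for a pause.
    static long combinedThrottleTimeMs(List<Limiter> limiters) {
        long max = 0L;
        for (Limiter limiter : limiters) {
            max = Math.max(max, limiter.throttleTimeMs());
        }
        return max;
    }
}
```

With two limiters asking for 100 ms and 250 ms, this would pause for
250 ms. If that is the intended behaviour, it would be good to state
it in the KIP.
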
Thanks,
Mickael

On Fri, Jun 4, 2021 at 9:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> Hey Tom, thanks for taking a look.
>
> > It's a bit weird that there's a separate start(Time) method
>
> Good call, I think we can use a second constructor instead.
>
> > No metrics for batch rates?
>
> Good call. TBH I assumed there would already be put/poll rates, but looking
> again I don't see them. Will add to the KIP.
>
> > I think it might be nicer to have a consistent configuration mechanism
>
> I had previously implemented this as you propose (same as SMTs), but found
> it to be a little heavy for the common use-cases. I didn't like how users
> needed to specify the classnames in order to use the built-in rate limiters.
>
> But thinking again about this, if we include default values for
> rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd
> get the same effect. Namely, most users would just need to
> specify rate.limiter.record.limit or rate.limiter.batch.limit.
>
> So I think you're right -- the common use-cases don't necessarily suffer,
> and custom rate limiters would definitely benefit. I'll fix.
>
> > hard.rate.limiters [..vs..] rate.limiters
>
> I think the difference may be immaterial. As implemented currently,
> RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
> don't define a window of time in which a max number of records or batches
> can be processed. Instead, they just tap the brakes when the instantaneous
> rate is observed to be too high. But a "hard" rate limiter could be
> implemented with the same interface, e.g. by sleeping until the end of the
> current window.
>
> Ryanne
>
> On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tbent...@redhat.com> wrote:
>
> > Hi Ryanne,
> >
> > Thanks for the KIP. I can see this would be useful.
> >
> > 1. Can you elaborate on the life cycle of the RateLimiter interface (in the
> > Javadoc)? In particular it's not clear to me how calls to accumulate() and
> > throttleTime() can be interleaved (I assume arbitrarily).
> >
> > 2. It's a bit weird that there's a separate start(Time) method in addition
> > to the configure() inherited from Configurable. Perhaps passing the Time to
> > accumulate() would be simpler than needing a two stage configuration step,
> > even if it would be the same instance on every call. If start() really is
> > needed you should document that it's called after configure().
> >
> > 3. Maybe including the unit in the method name, i.e. throttleTimeMs(), to
> > avoid any ambiguity about how the result is interpreted?
> >
> > 4. The metrics: Are they windowed over some time period, if so, what?
> >
> > 5. No metrics for batch rates?
> >
> > 6. It doesn't seem to be stated, but I assume the throttle time used is the
> > maximum of the throttleTime() returned by all the limiters.
> >
> > 7. The configuration uses a different mechanism than for SMTs and also
> > requires adding three common configs (with a risk of collision with any
> > connector which already defines configs with these names). I think it might
> > be nicer to have a consistent configuration mechanism, so for example
> >   rate.limiters=record,batch
> >   rate.limiter.record.type=RecordRateLimiter
> >   rate.limiter.record.limit=123
> >   rate.limiter.batch.type=RecordBatchRateLimiter
> >   rate.limiter.batch.limit=456
> > This means there's only a single new common config, as the others depend on
> > the aliases used, so further collisions can be avoided.
> >
> > 8. A cluster where every connector has a quota could end up being
> > underutilised, yet a subset of connectors could be running at their limit.
> > While this makes sense for the firehose problem it seems to be problematic
> > for the noisy neighbour case, where the spare capacity could be shared
> > between all the throttled tasks on the worker. While I'm not suggesting you
> > need to implement this as part of the KIP, maybe the API could accommodate
> > it being added later. Perhaps this could be as simple as using
> > hard.rate.limiters rather than just rate.limiters, so that
> > soft.rate.limiters could be added later, though maybe there are use cases
> > where a single limiter needs to supply both soft and hard limits.
> >
> > Thanks again,
> >
> > Tom
> >
> > On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ryannedo...@gmail.com>
> > wrote:
> >
> > > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > > pluggable interface, RateLimiter.
> > >
> > > After implementing this a few different ways, it's clear that the
> > > configuration story is actually simpler with a pluggable model.
> > > Out-of-the-box, we have just two configuration properties to tweak:
> > > record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> > > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > > impls.
> > >
> > > From there, additional custom RateLimiters can be enabled with whatever
> > > configuration they need. This is essentially the same pattern taken with
> > > MetricsReporters and others.
> > >
> > > I had originally envisioned that the set of built-in limits would expand
> > > over time, eg individual put/poll/commit/flush limits. However, these can
> > > all be throttled adequately with the proposed API by limiting overall
> > > record and batch thruput.
> > >
> > > Please let me know what you think. The voting thread is open.
> > >
> > > Ryanne
> > >
> > > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > > > > Hey y'all, I'd like to draw your attention to a new KIP:
> > > >
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > > >
> > > > Lemme know what you think. Thanks!
> > > >
> > > > Ryanne
> > > >
> > >
> >
