Hi Ryanne,

Thanks for the KIP! Sorry for the delay, I finally found some time to take a look and I have a few questions:
1) record.rate.limit and record.batch.rate.limit are listed as Connector configurations. If I understand correctly, they are actually configurations of RecordRateLimiter and RecordBatchRateLimiter, respectively. So if you set rate.limiters to MyRateLimiter, you would not have to set these 2 configs.
2) The motivation mentions the need to "limit total throughput". The 2 proposed RateLimiter implementations seem to work on the number of records flowing, not on the actual byte rate. Should we have a ByteRateLimiter?
3) Do you know what happens if multiple RateLimiters are enabled and several hit their limits but return different throttle times? Is the longest throttle time selected?
4) I tend to agree with Tom that reusing the transformation/config provider syntax for configurations may be nicer for consistency. While the built-in RateLimiters have few configurations, a custom implementation may have several, and they may conflict with configurations from a connector.
5) Can you explain how ConfigDef config(); is used? Is this method needed?
6) Is the rate applied before transformations? For example, if a transformation filters records, are they still counted?

Thanks,
Mickael

On Fri, Jun 4, 2021 at 9:23 PM Ryanne Dolan <[email protected]> wrote:
>
> Hey Tom, thanks for taking a look.
>
> > It's a bit weird that there's a separate start(Time) method
>
> Good call, I think we can use a second constructor instead.
>
> > No metrics for batch rates?
>
> Good call. TBH I assumed there would already be put/poll rates, but looking again I don't see them. Will add to the KIP.
>
> > I think it might be nicer to have a consistent configuration mechanism
>
> I had previously implemented this as you propose (same as SMTs), but found it to be a little heavy for the common use-cases. I didn't like how users needed to specify the classnames in order to use the built-in rate limiters.
>
> But thinking again about this, if we include default values for rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd get the same effect. Namely, most users would just need to specify rate.limiter.record.limit or rate.limiter.batch.limit.
>
> So I think you're right -- the common use-cases don't necessarily suffer, and custom rate limiters would definitely benefit. I'll fix.
>
> > hard.rate.limiters [..vs..] rate.limiters
>
> I think the difference may be immaterial. As currently implemented, RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they don't define a window of time in which a max number of records or batches can be processed. Instead, they just tap the brakes when the instantaneous rate is observed to be too high. But a "hard" rate limiter could be implemented with the same interface, e.g. by sleeping until the end of the current window.
>
> Ryanne
>
> On Fri, May 21, 2021 at 7:10 AM Tom Bentley <[email protected]> wrote:
>
> > Hi Ryanne,
> >
> > Thanks for the KIP. I can see this would be useful.
> >
> > 1. Can you elaborate on the life cycle of the RateLimiter interface (in the Javadoc)? In particular, it's not clear to me how calls to accumulate() and throttleTime() can be interleaved (I assume arbitrarily).
> >
> > 2. It's a bit weird that there's a separate start(Time) method in addition to the configure() inherited from Configurable. Perhaps passing the Time to accumulate() would be simpler than needing a two-stage configuration step, even if it would be the same instance on every call. If start() really is needed, you should document that it's called after configure().
> >
> > 3. Maybe include the unit in the method name, i.e. throttleTimeMs(), to avoid any ambiguity about how the result is interpreted?
> >
> > 4. The metrics: are they windowed over some time period? If so, what period?
> >
> > 5. No metrics for batch rates?
> >
> > 6. It doesn't seem to be stated, but I assume the throttle time used is the maximum of the throttleTime() values returned by all the limiters.
> >
> > 7. The configuration uses a different mechanism from SMTs and also requires adding three common configs (with a risk of collision with any connector which already defines configs with these names). I think it might be nicer to have a consistent configuration mechanism, so for example
> > rate.limiters=record,batch
> > rate.limiter.record.type=RecordRateLimiter
> > rate.limiter.record.limit=123
> > rate.limiter.batch.type=RecordBatchRateLimiter
> > rate.limiter.batch.limit=456
> > This means there's only a single new common config, as the others depend on the aliases used, so further collisions can be avoided.
> >
> > 8. A cluster where every connector has a quota could end up underutilised even while a subset of connectors is running at its limit. While this makes sense for the firehose problem, it seems to be problematic for the noisy neighbour case, where the spare capacity could be shared between all the throttled tasks on the worker. While I'm not suggesting you need to implement this as part of the KIP, maybe the API could accommodate it being added later. Perhaps this could be as simple as using hard.rate.limiters rather than just rate.limiters, so that soft.rate.limiters could be added later, though maybe there are use cases where a single limiter needs to supply both soft and hard limits.
> >
> > Thanks again,
> >
> > Tom
> >
> > On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <[email protected]> wrote:
> >
> > > Hey y'all, I've expanded the scope of this KIP slightly to include a pluggable interface, RateLimiter.
> > >
> > > After implementing this a few different ways, it's clear that the configuration story is actually simpler with a pluggable model.
> > > Out-of-the-box, we have just two configuration properties to tweak: record.rate.limit and record.batch.rate.limit (subj to change ofc). These are provided by built-in RecordRateLimiter and RecordBatchRateLimiter impls.
> > >
> > > From there, additional custom RateLimiters can be enabled with whatever configuration they need. This is essentially the same pattern used for MetricsReporters and others.
> > >
> > > I had originally envisioned that the set of built-in limits would expand over time, eg individual put/poll/commit/flush limits. However, these can all be throttled adequately with the proposed API by limiting overall record and batch thruput.
> > >
> > > Please let me know what you think. The voting thread is open.
> > >
> > > Ryanne
> > >
> > > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <[email protected]> wrote:
> > >
> > > > Hey y'all, I'd like to draw your attention to a new KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > > >
> > > > Lemme know what you think. Thanks!
> > > >
> > > > Ryanne
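
Several messages in this thread turn on how limiters behave. Ryanne describes the built-in limiters as "soft" (they tap the brakes when the instantaneous rate is too high) and notes that a "hard" limiter could sleep until the end of the current window, while Mickael (question 3) and Tom (point 6) ask how several limiters combine. The Java below is a minimal sketch under stated assumptions, not the KIP's API: SimpleRateLimiter, SoftRateLimiter, WindowedRateLimiter and RateLimiters are hypothetical names, a LongSupplier clock stands in for Kafka's Time, and taking the maximum throttle time across limiters is Tom's assumption, not something the thread confirms.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, simplified contract for illustration; NOT the KIP-731 RateLimiter interface.
interface SimpleRateLimiter {
    void accumulate(long units); // units = records, batches, or bytes that just flowed
    long throttleTimeMs();       // how long the task should pause before its next poll/put
}

// "Soft" limiter: no fixed window; it only slows down when the observed rate is too high.
final class SoftRateLimiter implements SimpleRateLimiter {
    private final double unitsPerSec;
    private final LongSupplier clockMs; // stand-in for Kafka's Time so a test can fake the clock
    private long lastCheckMs;
    private long unitsSinceCheck;

    SoftRateLimiter(double unitsPerSec, LongSupplier clockMs) {
        this.unitsPerSec = unitsPerSec;
        this.clockMs = clockMs;
        this.lastCheckMs = clockMs.getAsLong();
    }

    @Override
    public void accumulate(long units) {
        unitsSinceCheck += units;
    }

    @Override
    public long throttleTimeMs() {
        long now = clockMs.getAsLong();
        // Time these units should have taken at the configured rate, minus the time they did take.
        long requiredMs = (long) Math.ceil(unitsSinceCheck * 1000.0 / unitsPerSec);
        long throttleMs = Math.max(0L, requiredMs - (now - lastCheckMs));
        // Measure the next interval from when the pause should end, so the pause is not counted twice.
        lastCheckMs = now + throttleMs;
        unitsSinceCheck = 0;
        return throttleMs;
    }
}

// "Hard" limiter: at most maxUnitsPerWindow per fixed window; once the budget is spent,
// the task waits until the current window ends.
final class WindowedRateLimiter implements SimpleRateLimiter {
    private final long maxUnitsPerWindow;
    private final long windowMs;
    private final LongSupplier clockMs;
    private long windowStartMs;
    private long unitsInWindow;

    WindowedRateLimiter(long maxUnitsPerWindow, long windowMs, LongSupplier clockMs) {
        this.maxUnitsPerWindow = maxUnitsPerWindow;
        this.windowMs = windowMs;
        this.clockMs = clockMs;
        this.windowStartMs = clockMs.getAsLong();
    }

    // Jump to the window containing `now`; a new window starts with an empty budget.
    private void roll(long now) {
        long elapsed = now - windowStartMs;
        if (elapsed >= windowMs) {
            windowStartMs += (elapsed / windowMs) * windowMs;
            unitsInWindow = 0;
        }
    }

    @Override
    public void accumulate(long units) {
        roll(clockMs.getAsLong());
        unitsInWindow += units;
    }

    @Override
    public long throttleTimeMs() {
        long now = clockMs.getAsLong();
        roll(now);
        return unitsInWindow < maxUnitsPerWindow ? 0L : windowStartMs + windowMs - now;
    }
}

final class RateLimiters {
    // Assumption (Tom's point 6): the task pauses for the longest throttle time of any limiter.
    static long throttleTimeMs(List<? extends SimpleRateLimiter> limiters) {
        long max = 0L;
        for (SimpleRateLimiter limiter : limiters) {
            // Query every limiter, even after a non-zero answer, so each one updates its own state.
            max = Math.max(max, limiter.throttleTimeMs());
        }
        return max;
    }
}
```

Because both sketches count abstract units, something like the ByteRateLimiter from Mickael's question 2 could reuse SoftRateLimiter by passing byte counts to accumulate(), assuming the task has a byte size for each batch available (the thread does not say where that would come from). The windowed sketch also shows the cost of a "hard" limit: units are counted after they arrive, so a poll that overshoots the budget is only answered by pausing until the window ends, and the excess is not carried into the next window.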

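Tom's point 7 and Mickael's question 4 propose reusing the SMT-style syntax (as with transforms=<alias> and transforms.<alias>.type), and Ryanne suggests default values so that most users only set rate.limiter.record.limit or rate.limiter.batch.limit. The sketch below shows, hypothetically, how such properties could be resolved per alias. RateLimiterConfigs and its default values are assumptions for illustration (the defaults reuse the short class names from Tom's example), not Kafka code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical resolver for the alias-based syntax discussed in the thread; not Kafka code.
final class RateLimiterConfigs {
    static final String LIMITERS = "rate.limiters";
    static final String PREFIX = "rate.limiter.";

    // Assumed defaults, following Ryanne's suggestion; actual values would be decided by the KIP.
    static final Map<String, String> DEFAULTS = Map.of(
            LIMITERS, "record,batch",
            PREFIX + "record.type", "RecordRateLimiter",
            PREFIX + "batch.type", "RecordBatchRateLimiter");

    // Returns alias -> that limiter's own settings, with the "rate.limiter.<alias>." prefix stripped.
    static Map<String, Map<String, String>> parse(Map<String, String> connectorProps) {
        Map<String, String> props = new HashMap<>(DEFAULTS);
        props.putAll(connectorProps); // user-supplied values override the defaults
        Map<String, Map<String, String>> byAlias = new LinkedHashMap<>();
        for (String raw : props.get(LIMITERS).split(",")) {
            String alias = raw.trim();
            if (alias.isEmpty()) {
                continue; // e.g. rate.limiters="" disables all limiters
            }
            String aliasPrefix = PREFIX + alias + ".";
            Map<String, String> own = new HashMap<>();
            for (Map.Entry<String, String> e : props.entrySet()) {
                if (e.getKey().startsWith(aliasPrefix)) {
                    own.put(e.getKey().substring(aliasPrefix.length()), e.getValue());
                }
            }
            if (!own.containsKey("type")) {
                throw new IllegalArgumentException("Missing " + aliasPrefix + "type");
            }
            byAlias.put(alias, own);
        }
        return byAlias;
    }
}
```

With these assumed defaults, a connector that sets only rate.limiter.record.limit=123 gets both limiters, and a connector property that happens to be called limit is never picked up, because only rate.limiters itself is a new top-level key. That is the collision-avoidance argument Tom makes.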