divijvaidya opened a new pull request, #12045: URL: https://github.com/apache/kafka/pull/12045
## Why does the test fail?

`ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()` sends 600 connection creation requests at a rate of 40/s with a `listenerQuotaConnectionRateLimit` set to 30/s. The test asserts that even though the request rate is higher than the threshold, correct throttling keeps the measured rate at the completion of the 600 requests within 30 ± epsilon. Epsilon is set to 7, which is exceeded from time to time, leading to test failures.

## The problem

Currently, the rate calculation (used for rate limiting) relies on the following assumptions:

1. The start time of a new sample window is the time at which the first event in that window is recorded.
2. If we don't have `quota.window.num` windows retained, assume the missing prior windows exist with zero records when calculating the rate.

These assumptions lead to an incorrect rate in certain scenarios, as described below. Consider a scenario where we have some initial requests, followed by a small gap, and then another bunch of requests.
More specifically, with configuration:

- [quota.window.size.seconds](https://kafka.apache.org/documentation/#brokerconfigs_quota.window.size.seconds) = 1s
- [quota.window.num](https://kafka.apache.org/documentation/#brokerconfigs_quota.window.num) = 2
- listenerName.[max.connection.creation.rate](https://kafka.apache.org/documentation/#brokerconfigs_max.connection.creation.rate) = 30/s

record events (E) at the following timestamps:

| Event | Timestamp | Window |
| --- | --- | --- |
| E1 | CurrentTimeStamp (T1) | Window#1 (start time = T1) |
| E2 | T2 = T1 + 30ms | Window#1 |
| E3 | T3 = T1 + 995ms | Window#1 |
| — | No events from T3 to T4 | — |
| E4 | T4 = T1 + 1020ms | Window#2 (start time = T1 + 1020ms) |
| E5 | T5 = T1 + 2010ms | Window#2 |

Rate calculated as per the current implementation:

- Rate at T1 = 1 / (length of hypothetical prior samples + time elapsed in current sample) = 1 / (1 + 0) = 1 event per second
- Rate at T2 = 2 / (1 + 0.030) = 1.94 events per second
- Rate at T3 = 3 / (1 + 0.995) = 1.50 events per second
- Rate at T4 = 4 / (now - start time of oldest window) = 4 / 1.02 = 3.92 events per second
- When calculating the rate at T5, "obsolete" windows are first purged, i.e. any window with start time < T5 - (quota.window.size.seconds * quota.window.num); thus Window#1 is purged (because T1 < T5 - 2s)
- Rate at T5 = 2 / (length of hypothetical prior samples + time elapsed in current sample) = 2 / 1.99 = 1.005 events per second

**Note how the rate calculation at T5 has fallen back to assuming that prior windows with zero events exist (due to the purge), whereas we actually do have a previous window with > 0 events in it. Hence, the rate calculated at T5 is incorrect.** In the worst case, Window#1 could have a large number of events in it, but the rate calculation towards the end of Window#2 would ignore all those earlier events, leading to an incorrectly low current rate.
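To make the arithmetic above concrete, here is a minimal, self-contained model of the current fixed-window calculation (the class, field, and method names are illustrative, not Kafka's actual `Rate`/`SampledStat` code). It reproduces the rates in the walkthrough, including the incorrectly low value at T5:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the current fixed-window rate calculation (illustrative names,
// not Kafka's actual implementation).
public class CurrentRateModel {
    static final long WINDOW_MS = 1_000; // quota.window.size.seconds = 1s
    static final int NUM_WINDOWS = 2;    // quota.window.num = 2

    static final class Window {
        final long startMs;
        int events;
        Window(long startMs) { this.startMs = startMs; }
    }

    private final Deque<Window> windows = new ArrayDeque<>();

    /** Records one event at nowMs and returns the measured rate in events/sec. */
    double recordAndMeasure(long nowMs) {
        // Purge obsolete windows: start time < now - (window size * window count).
        while (!windows.isEmpty()
                && nowMs - windows.peekFirst().startMs >= (long) NUM_WINDOWS * WINDOW_MS)
            windows.removeFirst();
        // Assumption 1: a new window starts at the time of its first event.
        if (windows.isEmpty() || nowMs - windows.peekLast().startMs >= WINDOW_MS)
            windows.addLast(new Window(nowMs));
        windows.peekLast().events++;

        int total = 0;
        for (Window w : windows)
            total += w.events;
        // Assumption 2: missing prior windows count as full, zero-event windows.
        double elapsedMs = (NUM_WINDOWS - windows.size()) * WINDOW_MS
                + (nowMs - windows.peekFirst().startMs);
        return total * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        CurrentRateModel m = new CurrentRateModel();
        long t1 = 0;
        System.out.printf("Rate at T1 = %.3f%n", m.recordAndMeasure(t1));        // 1.000
        System.out.printf("Rate at T2 = %.3f%n", m.recordAndMeasure(t1 + 30));   // 1.942
        System.out.printf("Rate at T3 = %.3f%n", m.recordAndMeasure(t1 + 995));  // 1.504
        System.out.printf("Rate at T4 = %.3f%n", m.recordAndMeasure(t1 + 1020)); // 3.922
        System.out.printf("Rate at T5 = %.3f%n", m.recordAndMeasure(t1 + 2010)); // 1.005 (incorrectly low)
    }
}
```

Note how purging Window#1 at T5 throws away E1–E3 entirely, and the zero-event padding then drags the measured rate down.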
For throttling use cases, this leads to allowing more events (since the currently observed rate is low) and thus violates the contract to maintain a sustained `max.connection.creation.rate`. The flaky test `ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit` suffers from this problem from time to time, leading to a higher rate of connection creation than expected.

## The solution

The solution is to remove assumption 1 stated earlier and replace it with: the start time of a new sample window is the nearest time at which the window should have started assuming no gaps. The nearest time is calculated as

```
currentWindowStartTimeMs = recordingTimeMs - ((recordingTimeMs - previousWindowEndTimeMs) % config.timeWindowMs())

where
  recordingTimeMs          is the time of the first record in the window
  previousWindowEndTimeMs  is the end time of the previous window, calculated as
                           previousWindowStartTime + quota.window.size.seconds
  config.timeWindowMs()    is quota.window.size.seconds
```

With this solution, the event at T5 moves into a 3rd window (the window rollover occurs at T1 + 2000ms) and the rate at T5 becomes:

Rate at T5 = 2 / (now - start time of oldest window) = 2 / 1.010 = 1.98 events per second

This scenario has also been added as a unit test in `MetricsTest.java`.

## Code changes

1. Changes in `SampledStat` to implement the change in assumption 1 as described above. The change takes effect when a rollover into a new window occurs.
2. Add new tests in `MetricsTest.java`.
3. Add a lifecycle state in a Sample and use it to add cautious guards.
4. Cosmetic syntax changes across files.

## Testing

- [x] New test added to validate the change in assumption.
- [x] `./gradlew unitTest` passes.
- [x] `./gradlew integrationTest` run; only the known flaky tests mentioned in https://issues.apache.org/jira/browse/KAFKA-8115 are failing.

## Longer term solutions

1. Longer term, I think we should move to a sliding-window based approach to calculating the rate instead of the fixed-window approach used today.
2.
   The current rate limiting approach also allows short bursts of traffic. There should be a configurable option for users to choose between an approach that allows short bursts and one where the system tries to maintain a smooth rate over time, such that at no point does it exceed the allocated threshold.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
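For reviewers, a tiny standalone sketch of the window-start alignment from the solution section above (illustrative names, not the actual `SampledStat` change), showing how, in the worked example, Window#2 snaps back to the grid boundary T1 + 1000ms, E5 rolls into a 3rd window at T1 + 2000ms, and the T5 rate comes out near the true value:

```java
// Illustrative sketch of the proposed window-start alignment
// (not the actual SampledStat change from this PR).
public class AlignedWindowStart {

    /**
     * Replacement for assumption 1: a new window starts at the nearest
     * grid-aligned time at which it should have started had there been
     * no gap in events.
     */
    static long alignedWindowStartMs(long recordingTimeMs, long previousWindowEndMs, long timeWindowMs) {
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % timeWindowMs);
    }

    public static void main(String[] args) {
        long windowMs = 1_000; // quota.window.size.seconds = 1s
        long t1 = 0;

        // E4 arrives at T1 + 1020ms; Window#1 ended at T1 + 1000ms,
        // so Window#2 is aligned to start at T1 + 1000ms, not T1 + 1020ms.
        long w2Start = alignedWindowStartMs(t1 + 1020, t1 + 1000, windowMs);
        System.out.println("Window#2 start = T1 + " + w2Start + "ms"); // 1000

        // E5 arrives at T1 + 2010ms; Window#2 ended at T1 + 2000ms,
        // so E5 rolls into Window#3 starting at T1 + 2000ms.
        long w3Start = alignedWindowStartMs(t1 + 2010, w2Start + windowMs, windowMs);
        System.out.println("Window#3 start = T1 + " + w3Start + "ms"); // 2000

        // Window#2 (holding E4) is no longer purged at T5, so both events count:
        double rateAtT5 = 2 * 1000.0 / ((t1 + 2010) - w2Start); // 2 / 1.010
        System.out.printf("Rate at T5 = %.2f events/sec%n", rateAtT5); // 1.98
    }
}
```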