divijvaidya opened a new pull request, #12045:
URL: https://github.com/apache/kafka/pull/12045

   ## Why does the test fail?
   
   `ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()` sends 600 connection creation requests at a rate of 40/s, with `listenerQuotaConnectionRateLimit` set to 30/s. The test asserts that, even though the request rate exceeds the threshold, correct throttling keeps the rate measured at completion of the 600 requests within 30 ± epsilon. Epsilon is set to 7, and it is exceeded from time to time, leading to test failures.
   
   ## The problem
   Currently, the rate calculation (used for rate limiting) relies on the following assumptions:
   1. The start time of a new sample window is the time at which the first event in that window is recorded.
   2. If fewer than `quota.window.num` windows are retained, assume that the missing prior windows exist with zero records when calculating the rate.

   These assumptions lead to an incorrect rate in certain scenarios, as described below.
   
   Consider a scenario where we have some initial requests, followed by a short gap, and then another burst of requests. More specifically:

   Configuration:
   - [quota.window.size.seconds](https://kafka.apache.org/documentation/#brokerconfigs_quota.window.size.seconds) = 1s
   - [quota.window.num](https://kafka.apache.org/documentation/#brokerconfigs_quota.window.num) = 2
   - listenerName.[max.connection.creation.rate](https://kafka.apache.org/documentation/#brokerconfigs_max.connection.creation.rate) = 30/s
   
   Record events (E) at timestamps:

   | Event | Timestamp | Window |
   | --- | --- | --- |
   | E1 | current timestamp (T1) | Window#1 (start time = T1) |
   | E2 | T2 = T1 + 30ms | Window#1 |
   | E3 | T3 = T1 + 995ms | Window#1 |
   | E4 | T4 = T1 + 1020ms | Window#2 (start time = T1 + 1020ms) |
   | E5 | T5 = T1 + 2010ms | Window#2 |

   No events are recorded between T3 and T4.
   
   Rate calculated as per the current implementation:
   Rate at T1 = 1 / (length of hypothetical prior samples + time elapsed in current sample) = 1 / (1 + 0) = 1 event per second
   Rate at T2 = 2 / (1 + 0.030) = 1.94 events per second
   Rate at T3 = 3 / (1 + 0.995) = 1.5 events per second
   Rate at T4 = 4 / (now - start time of oldest window) = 4 / 1.02 = 3.92 events per second
   
   When calculating the rate for T5, "obsolete" windows are purged first, i.e. any window with start time < T5 - (quota.window.size.seconds * quota.window.num); thus Window#1 is purged (because T1 < T5 - 2s).

   Rate at T5 = 2 / (length of hypothetical prior samples + time elapsed in current sample) = 2 / (1 + 0.99) = 1.005 events per second
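   To make the walkthrough above concrete, here is a minimal, self-contained sketch of the described fixed-window calculation. It is illustrative only (the class name and structure are invented), not the actual Kafka `Rate`/`SampledStat` code:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Sketch of the current fixed-window rate calculation, with both
   // assumptions baked in:
   //   1. a new window's start time is the timestamp of its first event;
   //   2. missing windows count as hypothetical prior windows with zero events.
   class FixedWindowRate {
       private final long windowMs;
       private final int numWindows;
       // Each retained window is {startTimeMs, eventCount}.
       private final Deque<long[]> windows = new ArrayDeque<>();

       FixedWindowRate(long windowMs, int numWindows) {
           this.windowMs = windowMs;
           this.numWindows = numWindows;
       }

       /** Records one event at nowMs and returns the measured rate in events/sec. */
       double record(long nowMs) {
           // Purge obsolete windows: start time < now - (window size * window count).
           while (!windows.isEmpty() && windows.peekFirst()[0] < nowMs - windowMs * numWindows)
               windows.pollFirst();
           // Assumption 1: a new window starts at its first event's timestamp.
           if (windows.isEmpty() || nowMs >= windows.peekLast()[0] + windowMs)
               windows.addLast(new long[] {nowMs, 0});
           windows.peekLast()[1]++;
           long total = windows.stream().mapToLong(w -> w[1]).sum();
           // Assumption 2: pad the elapsed time with hypothetical zero-event
           // windows when fewer than numWindows windows are retained.
           long elapsedMs = Math.max(1, (numWindows - windows.size()) * windowMs
                   + (nowMs - windows.peekFirst()[0]));
           return total * 1000.0 / elapsedMs;
       }
   }
   ```

   Replaying E1..E5 with T1 = 0 through this sketch reproduces the rates above: 1.0, 1.94, 1.50 and 3.92 events/s, collapsing to roughly 1.005 events/s at T5 once Window#1 is purged.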
   
   **Note how the rate calculation at T5 has fallen back to the assumption that prior windows exist with zero events (because Window#1 was purged), even though we actually do have a previous window with > 0 events in it. Hence, the rate calculated at T5 is incorrect.** In the worst case, Window#1 could hold a large number of events, yet the rate calculated towards the end of Window#2 would ignore all of them, yielding an incorrectly low current rate. For throttling use cases, this leads to admitting more events (since the observed rate is low), violating the contract to maintain a sustained `max.connection.creation.rate`.
   
   The flaky test `ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit` hits this problem from time to time, leading to a higher connection creation rate than expected.
   
   ## The solution
   The solution is to replace assumption 1 with the following:

   The start time of a new sample window is the nearest time at which the window would have started had there been no gap in events. This nearest time is calculated as
   
   ```
   currentWindowStartTimeMs = recordingTimeMs - ((recordingTimeMs - previousWindowEndTimeMs) % config.timeWindowMs())

   where
     recordingTimeMs         is the time of the first record in the new window
     previousWindowEndTimeMs is the end time of the previous window, i.e. previousWindowStartTimeMs + quota.window.size.seconds
     config.timeWindowMs()   is quota.window.size.seconds expressed in milliseconds
   ```
   
   With the solution, T5 moves into a 3rd window (the window rollover now occurs at T1 + 2000ms), and the rate at T5 becomes:
   Rate at T5 = 2 / (now - start time of oldest retained window) = 2 / 1.010 = 1.98 events per second
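   As a sanity check of the aligned rollover rule, here is a minimal illustrative sketch of the rate measurement with the proposed fix applied: on rollover, the new window's start time is snapped back onto the grid defined by the previous window, rather than being the first event's timestamp. The class name and structure are invented for illustration; this is not the PR's actual code:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Sketch of the fixed-window rate measurement with aligned window starts.
   class AlignedWindowRate {
       private final long windowMs;
       private final int numWindows;
       private final Deque<long[]> windows = new ArrayDeque<>(); // {startTimeMs, count}

       AlignedWindowRate(long windowMs, int numWindows) {
           this.windowMs = windowMs;
           this.numWindows = numWindows;
       }

       /** Records one event at nowMs and returns the measured rate in events/sec. */
       double record(long nowMs) {
           // Purge obsolete windows, as before.
           while (!windows.isEmpty() && windows.peekFirst()[0] < nowMs - windowMs * numWindows)
               windows.pollFirst();
           if (windows.isEmpty()) {
               windows.addLast(new long[] {nowMs, 0});
           } else if (nowMs >= windows.peekLast()[0] + windowMs) {
               long previousWindowEndTimeMs = windows.peekLast()[0] + windowMs;
               // The proposed formula: align the new start time to where the
               // window would have begun had there been no gap in events.
               long startMs = nowMs - ((nowMs - previousWindowEndTimeMs) % windowMs);
               windows.addLast(new long[] {startMs, 0});
           }
           windows.peekLast()[1]++;
           long total = windows.stream().mapToLong(w -> w[1]).sum();
           long elapsedMs = Math.max(1, (numWindows - windows.size()) * windowMs
                   + (nowMs - windows.peekFirst()[0]));
           return total * 1000.0 / elapsedMs;
       }
   }
   ```

   Fed the same event timestamps (T1 = 0), this version snaps Window#2's start to T1 + 1000ms and opens Window#3 at T1 + 2000ms, so the rate at T5 comes out near 1.98 events/s instead of 1.005.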
   
   This scenario has also been added as a unit test in `MetricsTest.java`.
   
   ## Code changes
   1. Changes in `SampleState` to implement the revised assumption 1 described above; the change applies when a rollover into a new window occurs.
   2. New tests added in `MetricsTest.java`.
   3. A lifecycle state added to `Sample`, used to add defensive guards.
   4. Cosmetic syntax changes across files.
   
   ## Testing
   - [x] New test added to validate the revised assumption.
   - [x] `./gradlew unitTest` passes.
   - [x] `./gradlew integrationTest` passes, apart from the known flaky tests tracked in https://issues.apache.org/jira/browse/KAFKA-8115, which are still failing.
   
   ## Longer term solutions
   1. Longer term, I think we should move to a sliding-window approach for calculating the rate, instead of the fixed-window approach used today.
   2. The current rate-limiting approach also allows short bursts of traffic. There should be a configurable option for users to choose between an approach that allows short bursts and one where the system tries to maintain a smooth rate over time, never exceeding the allocated threshold.
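   As a rough illustration of the smooth-rate option in point 2, a token-bucket limiter bounds bursts through its bucket capacity while sustaining a configured long-run rate. The sketch below is hypothetical (class and parameter names are invented) and is not part of this PR:

   ```java
   // Hypothetical sketch of a burst-bounded limiter: a token bucket refills at
   // the sustained rate, and its capacity bounds how large a burst can ever be
   // admitted at once.
   class TokenBucketLimiter {
       private final double ratePerSec;     // sustained allowance, e.g. 30 connections/s
       private final double burstCapacity;  // max stored tokens; small values smooth bursts
       private double tokens;
       private long lastRefillMs;

       TokenBucketLimiter(double ratePerSec, double burstCapacity, long nowMs) {
           this.ratePerSec = ratePerSec;
           this.burstCapacity = burstCapacity;
           this.tokens = burstCapacity;     // start with a full bucket
           this.lastRefillMs = nowMs;
       }

       /** Returns true if one event is admitted at nowMs, false if it must be throttled. */
       boolean tryAcquire(long nowMs) {
           // Refill proportionally to elapsed time, capped at the burst capacity.
           tokens = Math.min(burstCapacity,
                   tokens + (nowMs - lastRefillMs) / 1000.0 * ratePerSec);
           lastRefillMs = nowMs;
           if (tokens >= 1.0) {
               tokens -= 1.0;
               return true;
           }
           return false;
       }
   }
   ```

   A capacity of 1 forces near-perfect smoothing, while a larger capacity trades smoothness for tolerance of short bursts, which maps naturally onto the configurable choice suggested above.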
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
