nandini12396 opened a new pull request, #21108:
URL: https://github.com/apache/kafka/pull/21108
### Problem:
1. Race condition and quota leaks: Multiple threads could check the quota before
any of them had recorded usage, so all of them could bypass the limit at once.
Additionally, in multi-partition fetches, quota was reserved per partition but
could leak if some partitions failed or were throttled, exhausting the quota
over time.
2. Startup race condition: RemoteLogManager was initialized with default quotas
(Long.MAX_VALUE, i.e. unlimited) and relied on dynamic config updates to apply
the configured values, leaving a window (100ms-5s) in which operations could
exceed the configured quotas.
### Solution:
1. Atomic quota reservation
- Added `RLMQuotaManager.recordAndGetThrottleTimeMs()` to atomically
record usage and check quota in a single synchronized operation
- Added a `quotaReservedBytes` field to `RemoteStorageFetchInfo` to track
per-partition reservations
- Modified ReplicaManager to call `recordAndCheckFetchQuota()` BEFORE
dispatching remote fetch, ensuring quota is reserved atomically based on
adjustedMaxBytes
- If the request is throttled, the reservation is released immediately, since
the fetch won't execute
- RemoteLogReader adjusts the quota by the delta (actual - reserved) after
the fetch completes
- On error, the full reservation is released to prevent leaks
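
The reservation lifecycle described in the bullets above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `SketchQuota`, `ReservationSketch`, and their methods are hypothetical stand-ins, the fixed-size budget replaces Kafka's windowed rate metrics, and the 1000 ms throttle value is a placeholder.

```java
/** Hypothetical stand-in for a byte quota; NOT Kafka's RLMQuotaManager. */
final class SketchQuota {
    private final long limitBytes; // bytes allowed in the (single, simplified) window
    private long usedBytes;        // bytes currently recorded against the quota

    SketchQuota(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    /** Records usage and checks the quota in one synchronized step. */
    synchronized long recordAndGetThrottleTimeMs(long bytes) {
        usedBytes += bytes;
        // Placeholder throttle time; a real manager would derive it from the rate.
        return usedBytes > limitBytes ? 1000L : 0L;
    }

    /** Adjusts recorded usage; a negative value releases a reservation. */
    synchronized void record(long bytes) {
        usedBytes += bytes;
    }

    synchronized long usedBytes() {
        return usedBytes;
    }
}

/** Hypothetical caller-side logic: reserve, then adjust or release. */
final class ReservationSketch {
    private final SketchQuota quota;

    ReservationSketch(SketchQuota quota) {
        this.quota = quota;
    }

    /** Returns the reserved byte count, or -1 if the request was throttled. */
    long tryReserve(long maxBytes) {
        long throttleMs = quota.recordAndGetThrottleTimeMs(maxBytes);
        if (throttleMs > 0) {
            quota.record(-maxBytes); // fetch will not run, so release immediately
            return -1L;
        }
        return maxBytes;
    }

    /** After a successful fetch, correct the reservation by (actual - reserved). */
    void onFetchComplete(long reservedBytes, long actualBytes) {
        quota.record(actualBytes - reservedBytes);
    }

    /** On error, give back the whole reservation so nothing leaks. */
    void onFetchError(long reservedBytes) {
        quota.record(-reservedBytes);
    }
}
```

Because recording and checking happen under the same lock, two concurrent callers cannot both observe "under quota" before either has recorded its bytes; every exit path (throttled, completed, failed) returns the unused part of the reservation.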
2. Eager startup quota initialization
- Ensures quotas are correct before the broker starts serving requests
- Added `BrokerServer.applyRemoteLogQuotas()` to eagerly apply quota
configs immediately after RemoteLogManager creation
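
The idea of eager initialization can be illustrated with a small sketch. Everything here is hypothetical (`StartupQuotaSketch`, its fields, and `createWithQuotas` are invented for illustration and are not the `BrokerServer` or `RemoteLogManager` code in this PR); it only shows configured values being applied right after construction instead of waiting for a later dynamic config update.

```java
/** Hypothetical stand-in for a manager that starts with unlimited quotas. */
final class StartupQuotaSketch {
    static final long UNLIMITED = Long.MAX_VALUE;

    // Defaults mirror the "unlimited until configured" behavior described above.
    private volatile long fetchQuotaBytesPerSec = UNLIMITED;
    private volatile long copyQuotaBytesPerSec = UNLIMITED;

    void updateFetchQuota(long bytesPerSec) {
        this.fetchQuotaBytesPerSec = bytesPerSec;
    }

    void updateCopyQuota(long bytesPerSec) {
        this.copyQuotaBytesPerSec = bytesPerSec;
    }

    long fetchQuota() {
        return fetchQuotaBytesPerSec;
    }

    long copyQuota() {
        return copyQuotaBytesPerSec;
    }

    /**
     * Creates the manager and applies the configured quotas before the
     * instance is handed to any request-serving code.
     */
    static StartupQuotaSketch createWithQuotas(long fetchBytesPerSec, long copyBytesPerSec) {
        StartupQuotaSketch manager = new StartupQuotaSketch();
        manager.updateFetchQuota(fetchBytesPerSec);
        manager.updateCopyQuota(copyBytesPerSec);
        return manager;
    }
}
```

With this ordering, no caller can observe the `Long.MAX_VALUE` defaults, which closes the startup window described in the Problem section.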