xiangfu0 commented on code in PR #18531:
URL: https://github.com/apache/pinot/pull/18531#discussion_r3273796293
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -133,31 +173,51 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
return new KinesisMessageBatch(messages, offsetOfNextBatch, _nextShardIterator == null, batchSizeInBytes);
}
- /**
- * Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we start
- * getting {@link ProvisionedThroughputExceededException}. Rate limit the requests to avoid this.
- */
- private void rateLimitRequests() {
- long currentTimeMs = System.currentTimeMillis();
- int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
- if (currentTimeSeconds == _currentSecond) {
- if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
- try {
- Thread.sleep(1000 - (currentTimeMs % 1000));
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- _currentSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
- _numRequestsInCurrentSecond = 1;
- } else {
- _numRequestsInCurrentSecond++;
- }
- } else {
- _currentSecond = currentTimeSeconds;
- _numRequestsInCurrentSecond = 1;
+ private <T> T executeKinesisRequest(String shardId, RequestType requestType, Supplier<T> requestSupplier) {
+ _requestRateLimiter.acquire(_config.getStreamTopicName(), shardId, requestType, _config.getRpsLimitPerSecond());
Review Comment:
`PartitionGroupConsumer.fetchMessages(..., timeoutMs)` is required to return
within `timeoutMs`, but this limiter wait now happens before any of the
deadline checks. With fractional limits like `0.25`, `RateLimiter.acquire()`
can spend about 4s before `GetShardIterator` and another 4s before
`GetRecords`, so callers using the standard 5s fetch timeout can block well
past their budget even when AWS never throttles. That stalls realtime
ingestion/catch-up instead of returning the empty batch the interface contract
expects; the limiter path needs to consume the remaining budget and stop
waiting once `timeoutMs` is exhausted.
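The suggested fix can be sketched as a limiter that refuses, rather than sleeps, when the next permit would arrive after the caller's deadline. This is a minimal sketch under stated assumptions, not the PR's implementation: `DeadlineAwareRateLimiter`, `tryReserve`, `tryAcquire`, and `DeadlineBoundedFetch.fetchMessages` are hypothetical names; permits are assumed to be spaced evenly at 1 / permitsPerSecond (so 0.25 permits/s means one permit every 4 s); and GetShardIterator and GetRecords are assumed to use separate limiters that share a single `timeoutMs` budget.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical deadline-aware limiter (not Pinot's actual class). Permits are spaced
 * 1 / permitsPerSecond apart; a caller only waits if its permit can be granted
 * at or before the caller's deadline.
 */
final class DeadlineAwareRateLimiter {
  private final long _intervalNanos;
  private long _nextFreeNanos; // earliest System.nanoTime() at which the next permit can be granted

  DeadlineAwareRateLimiter(double permitsPerSecond, long startNanos) {
    if (!(permitsPerSecond > 0)) {
      throw new IllegalArgumentException("permitsPerSecond must be > 0");
    }
    _intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
    _nextFreeNanos = startNanos;
  }

  /**
   * Reserves a permit if it can be granted by deadlineNanos and returns how long the caller
   * must wait for it; returns -1 (and reserves nothing) if the permit would arrive too late.
   */
  synchronized long tryReserve(long nowNanos, long deadlineNanos) {
    // Compare by subtraction, the overflow-safe idiom for System.nanoTime() values.
    long grantAt = (_nextFreeNanos - nowNanos > 0) ? _nextFreeNanos : nowNanos;
    if (grantAt - deadlineNanos > 0) {
      return -1L;
    }
    _nextFreeNanos = grantAt + _intervalNanos;
    return grantAt - nowNanos;
  }

  /** Sleeps for a permit only if it arrives by deadlineNanos; never sleeps past the deadline. */
  boolean tryAcquire(long deadlineNanos) throws InterruptedException {
    long waitNanos = tryReserve(System.nanoTime(), deadlineNanos);
    if (waitNanos < 0) {
      return false;
    }
    TimeUnit.NANOSECONDS.sleep(waitNanos);
    return true;
  }
}

/** Hypothetical fetch path: both rate-limited calls draw from one timeoutMs budget. */
final class DeadlineBoundedFetch {
  static List<String> fetchMessages(DeadlineAwareRateLimiter shardIteratorLimiter,
      DeadlineAwareRateLimiter getRecordsLimiter, long timeoutMs, Supplier<List<String>> getRecords) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      if (!shardIteratorLimiter.tryAcquire(deadlineNanos)) {
        return List.of(); // budget exhausted before GetShardIterator: return an empty batch
      }
      // A real consumer would issue GetShardIterator here.
      if (!getRecordsLimiter.tryAcquire(deadlineNanos)) {
        return List.of(); // budget exhausted before GetRecords: return an empty batch
      }
      return getRecords.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return List.of();
    }
  }
}
```

A rejected reservation leaves the schedule untouched, so repeated empty polls do not push later permits further out; a permit already granted for GetShardIterator is still spent if GetRecords is then refused. If the limiter behind `_requestRateLimiter` is built on Guava's `RateLimiter`, its `tryAcquire(int permits, long timeout, TimeUnit unit)` overload offers comparable refuse-instead-of-wait behavior when given the remaining budget as the timeout.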
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.