xiangfu0 commented on code in PR #18531:
URL: https://github.com/apache/pinot/pull/18531#discussion_r3273796293


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -133,31 +173,51 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
     return new KinesisMessageBatch(messages, offsetOfNextBatch, _nextShardIterator == null, batchSizeInBytes);
   }
 
-  /**
-   * Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we start
-   * getting {@link ProvisionedThroughputExceededException}. Rate limit the requests to avoid this.
-   */
-  private void rateLimitRequests() {
-    long currentTimeMs = System.currentTimeMillis();
-    int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
-    if (currentTimeSeconds == _currentSecond) {
-      if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
-        try {
-          Thread.sleep(1000 - (currentTimeMs % 1000));
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-        _currentSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
-        _numRequestsInCurrentSecond = 1;
-      } else {
-        _numRequestsInCurrentSecond++;
-      }
-    } else {
-      _currentSecond = currentTimeSeconds;
-      _numRequestsInCurrentSecond = 1;
+  private <T> T executeKinesisRequest(String shardId, RequestType requestType, Supplier<T> requestSupplier) {
+    _requestRateLimiter.acquire(_config.getStreamTopicName(), shardId, requestType, _config.getRpsLimitPerSecond());

Review Comment:
   `PartitionGroupConsumer.fetchMessages(..., timeoutMs)` is required to return
within `timeoutMs`, but this limiter wait now happens before any of the
deadline checks. With a fractional limit like `0.25` (one permit every 4s),
`RateLimiter.acquire()` can block for about 4s before `GetShardIterator` and
another 4s before `GetRecords`, roughly 8s in total, so callers using the
standard 5s fetch timeout can block well past their budget even when AWS never
throttles. That stalls realtime ingestion/catch-up instead of returning the
empty batch the interface contract expects. The limiter wait needs to be
bounded by the remaining budget, and it must stop waiting once `timeoutMs` is
exhausted.
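
To illustrate the kind of budget-aware wait meant here, below is a minimal,
self-contained sketch. `DeadlineAwareLimiter` and `tryAcquire(remainingMs)` are
hypothetical names made up for this example; they are not the limiter API in
this PR, and the real fix may look quite different. The idea is only that the
limiter refuses up front, without sleeping, when the next permit would arrive
after the caller's remaining budget.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's limiter: evenly spaced permits, with a
// wait that is bounded by the caller's remaining time budget.
final class DeadlineAwareLimiter {
  private final long _intervalNanos; // minimum spacing between two permits
  private long _nextFreeNanos;       // earliest time the next permit may be issued

  DeadlineAwareLimiter(double permitsPerSecond) {
    _intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
    _nextFreeNanos = System.nanoTime();
  }

  /**
   * Returns true once a permit has been obtained. Returns false right away,
   * without sleeping and without reserving a permit, if the permit would only
   * become available after remainingMs has elapsed.
   */
  boolean tryAcquire(long remainingMs) {
    long waitNanos;
    synchronized (this) {
      long now = System.nanoTime();
      waitNanos = Math.max(0L, _nextFreeNanos - now);
      long budgetNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, remainingMs));
      if (waitNanos > budgetNanos) {
        return false; // waiting would overrun the caller's deadline
      }
      // Reserve this permit and push the next one a full interval later.
      _nextFreeNanos = now + waitNanos + _intervalNanos;
    }
    if (waitNanos > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitNanos); // sleep outside the lock
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }
}
```

In a fetch path, the caller would presumably recompute
`remainingMs = deadlineMs - System.currentTimeMillis()` before each request
(`GetShardIterator`, then `GetRecords`) and return an empty batch as soon as
`tryAcquire` returns false. If the underlying limiter is Guava's `RateLimiter`,
its `tryAcquire(long timeout, TimeUnit unit)` offers similar semantics.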



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

