This is an automated email from the ASF dual-hosted git repository.

KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a8ac02c8a0 fix(kinesis): throttle 
ProvisionedThroughputExceededException logs using ThrottledLogger (#18572)
0a8ac02c8a0 is described below

commit 0a8ac02c8a062ced0f8542f5d0351b0ab2e31db4
Author: swaminathanmanish <[email protected]>
AuthorDate: Tue May 26 15:39:47 2026 +0530

    fix(kinesis): throttle ProvisionedThroughputExceededException logs using 
ThrottledLogger (#18572)
    
    * Throttle KinesisConsumer rate-limit error logs using ThrottledLogger
    
    ProvisionedThroughputExceededException is caught per-fetch call, which
    can fire at high frequency across hundreds of shards under sustained AWS
    rate limiting, flooding logs and filling ephemeral storage. Replace the
    unconditional LOGGER.error with ThrottledLogger (5 logs/min) so the
    error remains visible but does not cause disk pressure.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * Throttle KinesisConsumer rate-limit warn logs using ThrottledLogger
    
    Rate-limit exceptions can fire at high frequency across hundreds of
    shards under sustained AWS throttling. Replace bare LOGGER.warn calls
    in fetchMessages retry loop, logRateLimitTimeout, and
    logRequestLimiterTimeout with ThrottledLogger (5 logs/min per exception
    class) to prevent log flooding and disk pressure while preserving
    visibility via dropped-count reporting.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  4 +++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 31 +++++++++++++---------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 2315fe9f537..61aea5485fd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -35,6 +35,10 @@
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+    </dependency>
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>sdk-core</artifactId>
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 5c017acc076..3b8b35fd9be 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
@@ -54,7 +55,9 @@ public class KinesisConsumer extends KinesisConnectionHandler 
implements Partiti
   private static final int MAX_RATE_LIMIT_BACKOFF_MS = 5000;
   private static final int RATE_LIMIT_BACKOFF_JITTER_BOUND_MS = 250;
   private static final RequestRateLimiter SHARED_REQUEST_RATE_LIMITER = new 
SharedKinesisRequestRateLimiter();
+  private static final double RATE_LIMIT_LOG_RATE_PER_MIN = 5.0;
 
+  private final ThrottledLogger _throttledLogger = new ThrottledLogger(LOGGER, 
RATE_LIMIT_LOG_RATE_PER_MIN);
   private String _nextStartSequenceNumber = null;
   private String _nextShardIterator = null;
   private final RequestRateLimiter _requestRateLimiter;
@@ -109,10 +112,11 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
           return new KinesisMessageBatch(List.of(), startOffset, false, 0);
         }
         long backoffMs = Math.min(computeRateLimitBackoffMs(attempts), 
remainingMs);
-        LOGGER.warn("Rate limit exceeded while fetching messages from Kinesis 
stream: {}, shard: {}, operation: {}, "
-                + "threshold: {}, attempt: {}, backing off for {} ms. Error: 
{}", _config.getStreamTopicName(),
-            startOffset.getShardId(), e.getRequestType(), 
_config.getRpsLimitPerSecond(), attempts, backoffMs,
-            e.getCause().getMessage());
+        _throttledLogger.warn(
+            String.format("Rate limit exceeded while fetching messages from 
Kinesis stream: %s, shard: %s, "
+                    + "operation: %s, threshold: %s, attempt: %d, backing off 
for %d ms",
+                _config.getStreamTopicName(), startOffset.getShardId(), 
e.getRequestType(),
+                _config.getRpsLimitPerSecond(), attempts, backoffMs), 
e.getCause());
         sleep(backoffMs);
       } catch (KinesisRequestTimeoutException e) {
         logRequestLimiterTimeout(startOffset, e);
@@ -220,19 +224,20 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
 
   private void logRateLimitTimeout(KinesisPartitionGroupOffset startOffset, 
int attempts,
       KinesisRateLimitException rateLimitException) {
-    LOGGER.warn("Rate limit exceeded while fetching messages from Kinesis 
stream: {}, shard: {}, operation: {}, "
-            + "threshold: {}, attempts: {}. Fetch timeout exhausted; returning 
empty batch at original offset. "
-            + "Error: {}",
-        _config.getStreamTopicName(), startOffset.getShardId(), 
rateLimitException.getRequestType(),
-        _config.getRpsLimitPerSecond(), attempts, 
rateLimitException.getCause().getMessage());
+    _throttledLogger.warn(
+        String.format("Rate limit exceeded while fetching messages from 
Kinesis stream: %s, shard: %s, "
+                + "operation: %s, threshold: %s, attempts: %d. Fetch timeout 
exhausted; returning empty batch.",
+            _config.getStreamTopicName(), startOffset.getShardId(), 
rateLimitException.getRequestType(),
+            _config.getRpsLimitPerSecond(), attempts), 
rateLimitException.getCause());
   }
 
   private void logRequestLimiterTimeout(KinesisPartitionGroupOffset 
startOffset,
       KinesisRequestTimeoutException timeoutException) {
-    LOGGER.warn("Timed out waiting for Kinesis request limiter while fetching 
messages from stream: {}, shard: {}, "
-            + "operation: {}, threshold: {}. Fetch timeout exhausted; 
returning empty batch at original offset.",
-        _config.getStreamTopicName(), startOffset.getShardId(), 
timeoutException.getRequestType(),
-        _config.getRpsLimitPerSecond());
+    _throttledLogger.warn(
+        String.format("Timed out waiting for Kinesis request limiter while 
fetching messages from stream: %s, "
+                + "shard: %s, operation: %s, threshold: %s. Fetch timeout 
exhausted; returning empty batch.",
+            _config.getStreamTopicName(), startOffset.getShardId(), 
timeoutException.getRequestType(),
+            _config.getRpsLimitPerSecond()), timeoutException);
   }
 
   private BytesStreamMessage extractStreamMessage(Record record, String 
shardId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to