This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0facc3ccecb KAFKA-19813 Incorrect jitter value in StreamsGroupHeartbeatRequestManager and AbstractHeartbeatRequestManager (#20748)
0facc3ccecb is described below

commit 0facc3ccecb1ea34c2639f066944df68b1d5d38a
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Nov 2 02:31:04 2025 +0800

    KAFKA-19813 Incorrect jitter value in StreamsGroupHeartbeatRequestManager and AbstractHeartbeatRequestManager (#20748)
    
    In StreamsGroupHeartbeatRequestManager and
    AbstractHeartbeatRequestManager, we pass max.poll.interval.ms (default
    300000) as the jitter value. Although RequestState.remainingBackoffMs
    guards with Math.max(0, …), so the result never becomes negative,
    using a jitter outside the range 0–1 is unexpected.
    
    In addition, ExponentialBackoff now validates that the jitter it
    receives is within [0, 1] (inclusive) and throws IllegalArgumentException
    otherwise, to prevent this scenario in the future.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/AbstractHeartbeatRequestManager.java   |  3 ++-
 .../internals/StreamsGroupHeartbeatRequestManager.java        |  3 ++-
 .../org/apache/kafka/common/utils/ExponentialBackoff.java     |  5 +++++
 .../org/apache/kafka/common/utils/ExponentialBackoffTest.java | 11 +++++++++++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 3998d672006..eec41c6d3b4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import java.util.Collections;
 
 import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+import static 
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
@@ -113,7 +114,7 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
         long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
-                retryBackoffMaxMs, maxPollIntervalMs);
+                retryBackoffMaxMs, RETRY_BACKOFF_JITTER);
         this.pollTimer = time.timer(maxPollIntervalMs);
         this.metricsManager = metricsManager;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index cb9a38d0ddc..0441566462a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+import static 
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
 
 /**
  * <p>Manages the request creation and response handling for the streams group 
heartbeat. The class creates a
@@ -330,7 +331,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             0,
             retryBackoffMs,
             retryBackoffMaxMs,
-            maxPollIntervalMs
+            RETRY_BACKOFF_JITTER
         );
         this.pollTimer = time.timer(maxPollIntervalMs);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java 
b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
index 05994480147..73d68b6cf44 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.utils;
 
 import java.util.concurrent.ThreadLocalRandom;
 
+import static java.lang.String.format;
+
 /**
  * A utility class for keeping the parameters and providing the value of 
exponential
  * retry backoff, exponential reconnect backoff, exponential timeout, etc.
@@ -42,6 +44,9 @@ public class ExponentialBackoff {
         this.initialInterval = Math.min(maxInterval, initialInterval);
         this.multiplier = multiplier;
         this.maxInterval = maxInterval;
+        if (jitter < 0 || jitter > 1) {
+            throw new IllegalArgumentException(format("jitter must be between 
0 and 1, but got %s", jitter));
+        }
         this.jitter = jitter;
         this.expMax = maxInterval > initialInterval ?
                 Math.log(maxInterval / (double) Math.max(initialInterval, 1)) 
/ Math.log(multiplier) : 0;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
index 4e843863ab5..fff921db29b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.utils;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ExponentialBackoffTest {
@@ -54,4 +55,14 @@ public class ExponentialBackoffTest {
         assertEquals(400, exponentialBackoff.backoff(2));
         assertEquals(400, exponentialBackoff.backoff(3));
     }
+
+    @Test
+    public void testExponentialBackoffWithInvalidJitter() {
+        assertEquals("jitter must be between 0 and 1, but got -1.0",
+                assertThrows(IllegalArgumentException.class,
+                        () -> new ExponentialBackoff(100, 2, 400, 
-1)).getMessage());
+        assertEquals("jitter must be between 0 and 1, but got 3000.0",
+                assertThrows(IllegalArgumentException.class,
+                        () -> new ExponentialBackoff(100, 2, 400, 
3000)).getMessage());
+    }
 }

Reply via email to