This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0facc3ccecb KAFKA-19813 Incorrect jitter value in
StreamsGroupHeartbeatRequestManager and AbstractHeartbeatRequestManager (#20748)
0facc3ccecb is described below
commit 0facc3ccecb1ea34c2639f066944df68b1d5d38a
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Nov 2 02:31:04 2025 +0800
KAFKA-19813 Incorrect jitter value in StreamsGroupHeartbeatRequestManager
and AbstractHeartbeatRequestManager (#20748)
In StreamsGroupHeartbeatRequestManager and
AbstractHeartbeatRequestManager we pass max.poll.interval.ms (default
300000) as the jitter value. RequestState.remainingBackoffMs guards
with Math.max(0, …), so the resulting value never becomes negative, but
using a jitter outside the 0–1 range is still unexpected.
In addition, we should validate that ExponentialBackoff receives a
jitter within [0, 1] (inclusive) to prevent this scenario in the future.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/AbstractHeartbeatRequestManager.java | 3 ++-
.../internals/StreamsGroupHeartbeatRequestManager.java | 3 ++-
.../org/apache/kafka/common/utils/ExponentialBackoff.java | 5 +++++
.../org/apache/kafka/common/utils/ExponentialBackoffTest.java | 11 +++++++++++
4 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 3998d672006..eec41c6d3b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import java.util.Collections;
import static
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+import static
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
@@ -113,7 +114,7 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
long retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.heartbeatRequestState = new HeartbeatRequestState(logContext,
time, 0, retryBackoffMs,
- retryBackoffMaxMs, maxPollIntervalMs);
+ retryBackoffMaxMs, RETRY_BACKOFF_JITTER);
this.pollTimer = time.timer(maxPollIntervalMs);
this.metricsManager = metricsManager;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index cb9a38d0ddc..0441566462a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+import static
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
/**
* <p>Manages the request creation and response handling for the streams group
heartbeat. The class creates a
@@ -330,7 +331,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
0,
retryBackoffMs,
retryBackoffMaxMs,
- maxPollIntervalMs
+ RETRY_BACKOFF_JITTER
);
this.pollTimer = time.timer(maxPollIntervalMs);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
index 05994480147..73d68b6cf44 100644
---
a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
+++
b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.utils;
import java.util.concurrent.ThreadLocalRandom;
+import static java.lang.String.format;
+
/**
* A utility class for keeping the parameters and providing the value of
exponential
* retry backoff, exponential reconnect backoff, exponential timeout, etc.
@@ -42,6 +44,9 @@ public class ExponentialBackoff {
this.initialInterval = Math.min(maxInterval, initialInterval);
this.multiplier = multiplier;
this.maxInterval = maxInterval;
+ if (jitter < 0 || jitter > 1) {
+ throw new IllegalArgumentException(format("jitter must be between
0 and 1, but got %s", jitter));
+ }
this.jitter = jitter;
this.expMax = maxInterval > initialInterval ?
Math.log(maxInterval / (double) Math.max(initialInterval, 1))
/ Math.log(multiplier) : 0;
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
index 4e843863ab5..fff921db29b 100644
---
a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.utils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ExponentialBackoffTest {
@@ -54,4 +55,14 @@ public class ExponentialBackoffTest {
assertEquals(400, exponentialBackoff.backoff(2));
assertEquals(400, exponentialBackoff.backoff(3));
}
+
+ @Test
+ public void testExponentialBackoffWithInvalidJitter() {
+ assertEquals("jitter must be between 0 and 1, but got -1.0",
+ assertThrows(IllegalArgumentException.class,
+ () -> new ExponentialBackoff(100, 2, 400,
-1)).getMessage());
+ assertEquals("jitter must be between 0 and 1, but got 3000.0",
+ assertThrows(IllegalArgumentException.class,
+ () -> new ExponentialBackoff(100, 2, 400,
3000)).getMessage());
+ }
}