This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 833e25f0159 KAFKA-19605; Fix the busy loop occurring in kraft client
observers (#20354)
833e25f0159 is described below
commit 833e25f0159c57ad1322ed6bb7670a9c7927f8fb
Author: Kevin Wu <[email protected]>
AuthorDate: Fri Aug 15 09:43:46 2025 -0500
KAFKA-19605; Fix the busy loop occurring in kraft client observers (#20354)
The broker observer should not read the update voter set timer value
when polling to determine its backoff, since brokers cannot auto-join
the KRaft voter set. If auto-join or kraft.version=1 is not supported,
controller observers should not read this timer when polling either.
The updateVoterSetPeriodMs timer should not be considered when
calculating the backoff returned by polling, since it does not
represent the same thing as the fetchTimeMs timer.
Reviewers: Chia-Ping Tsai <[email protected]>, José Armando García
Sancio <[email protected]>, Alyssa Huang <[email protected]>,
Kuan-Po Tseng <[email protected]>
---
.../main/java/org/apache/kafka/raft/FollowerState.java | 5 -----
.../main/java/org/apache/kafka/raft/KafkaRaftClient.java | 16 ++++------------
2 files changed, 4 insertions(+), 17 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 4cbc8778149..2564a683ed9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -159,11 +159,6 @@ public class FollowerState implements EpochState {
return updateVoterSetPeriodTimer.isExpired();
}
- public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) {
- updateVoterSetPeriodTimer.update(currentTimeMs);
- return updateVoterSetPeriodTimer.remainingMs();
- }
-
public void resetUpdateVoterSetPeriod(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
updateVoterSetPeriodTimer.reset(updateVoterPeriodMs());
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index bb70c5a7df4..fc4eea109fc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -3315,10 +3315,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return Math.min(
backoffMs,
- Math.min(
- state.remainingFetchTimeMs(currentTimeMs),
- state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
- )
+ state.remainingFetchTimeMs(currentTimeMs)
);
}
@@ -3333,11 +3330,10 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
private long pollFollowerAsObserver(FollowerState state, long
currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
- final long backoffMs;
if (shutdown != null) {
// If we are an observer, then we can shutdown immediately. We
want to
// skip potentially sending any add or remove voter RPCs.
- backoffMs = 0;
+ return 0;
} else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
final var localReplicaKey = quorum.localReplicaKeyOrThrow();
final var voters = partitionState.lastVoterSet();
@@ -3356,17 +3352,13 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
} else {
sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
}
- backoffMs = sendResult.timeToWaitMs();
if (sendResult.requestSent()) {
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
+ return sendResult.timeToWaitMs();
} else {
- backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
+ return maybeSendFetchToBestNode(state, currentTimeMs);
}
- return Math.min(
- backoffMs,
- state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
- );
}
private long maybeSendFetchToBestNode(FollowerState state, long
currentTimeMs) {