This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 257f8d4093b KAFKA-18345; Prevent livelocked elections (#19658)
257f8d4093b is described below
commit 257f8d4093b9dbcf8f1032f9b58672f57cb530cb
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon May 12 13:23:18 2025 -0700
KAFKA-18345; Prevent livelocked elections (#19658)
Once the retry limit is reached, binaryExponentialElectionBackoffMs
becomes statistically likely to return exactly electionBackoffMaxMs.
This is an issue because multiple replicas can get stuck starting
elections at the same cadence.
This change fixes that by adding a random jitter to the max election
backoff.
Reviewers: José Armando García Sancio <[email protected]>, TaiJuWu
<[email protected]>, Yung <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 26 ++++---
.../main/java/org/apache/kafka/raft/RaftUtil.java | 15 ++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 10 ++-
.../java/org/apache/kafka/raft/RaftUtilTest.java | 86 ++++++++++++++++++++++
4 files changed, 123 insertions(+), 14 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 624c16b008b..35c91975cc0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -164,7 +164,8 @@ import static
org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
* are not necessarily offset-aligned.
*/
public final class KafkaRaftClient<T> implements RaftClient<T> {
- private static final int RETRY_BACKOFF_BASE_MS = 100;
+ // visible for testing
+ static final int RETRY_BACKOFF_BASE_MS = 50;
private static final int MAX_NUMBER_OF_BATCHES = 10;
public static final int MAX_FETCH_WAIT_MS = 500;
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@@ -921,7 +922,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
state.startBackingOff(
currentTimeMs,
- binaryExponentialElectionBackoffMs(state.retries())
+ RaftUtil.binaryExponentialElectionBackoffMs(
+ quorumConfig.electionBackoffMaxMs(),
+ RETRY_BACKOFF_BASE_MS,
+ state.retries(),
+ random
+ )
);
}
}
@@ -935,15 +941,6 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
}
- private int binaryExponentialElectionBackoffMs(int retries) {
- if (retries <= 0) {
- throw new IllegalArgumentException("Retries " + retries + " should
be larger than zero");
- }
- // upper limit exponential co-efficients at 20 to avoid overflow
- return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 <<
Math.min(20, retries - 1)),
- quorumConfig.electionBackoffMaxMs());
- }
-
private int strictExponentialElectionBackoffMs(int positionInSuccessors,
int totalNumSuccessors) {
if (positionInSuccessors == 0) {
return 0;
@@ -3029,7 +3026,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
- long backoffDurationMs =
binaryExponentialElectionBackoffMs(state.retries());
+ long backoffDurationMs =
RaftUtil.binaryExponentialElectionBackoffMs(
+ quorumConfig.electionBackoffMaxMs(),
+ RETRY_BACKOFF_BASE_MS,
+ state.retries(),
+ random
+ );
logger.info("Election has timed out, backing off for {}ms before
becoming a candidate again",
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 018bec0d632..9d9796fb027 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -762,4 +763,18 @@ public class RaftUtil {
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex()
== topicPartition.partition();
}
+
+ static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int
backoffBaseMs, int retries, Random random) {
+ if (retries <= 0) {
+ throw new IllegalArgumentException("Retries " + retries + " should
be larger than zero");
+ }
+ // Takes minimum of the following:
+ // 1. exponential backoff calculation (maxes out at 102.4 seconds with
backoffBaseMs of 50ms)
+ // 2. configurable electionBackoffMaxMs + jitter
+ // The jitter is added to prevent livelock of elections.
+ return Math.min(
+ backoffBaseMs * (1 + random.nextInt(2 << Math.min(10, retries -
1))),
+ backoffMaxMs + random.nextInt(backoffBaseMs)
+ );
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 248fc8cf564..23043a82b2b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1083,7 +1083,7 @@ public class KafkaRaftClientTest {
context.assertVotedCandidate(epoch, localId);
// After backoff, we will become a candidate again
- context.time.sleep(context.electionBackoffMaxMs);
+ context.time.sleep(context.electionBackoffMaxMs + jitterMs);
context.client.poll();
context.assertVotedCandidate(epoch + 1, localId);
}
@@ -1681,6 +1681,7 @@ public class KafkaRaftClientTest {
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);
+ CandidateState candidate =
context.client.quorum().candidateStateOrThrow();
// Quorum size is two. If the other member rejects, then we need to
schedule a revote.
RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0,
0L, 1);
@@ -1691,13 +1692,18 @@ public class KafkaRaftClientTest {
);
context.client.poll();
+ assertTrue(candidate.isBackingOff());
+ assertEquals(
+ context.electionBackoffMaxMs + exponentialFactor,
+ candidate.remainingBackoffMs(context.time.milliseconds())
+ );
// All nodes have rejected our candidacy, but we should still remember
that we had voted
context.assertVotedCandidate(epoch, localId);
// Even though our candidacy was rejected, we will backoff for jitter
period
// before we bump the epoch and start a new election.
- context.time.sleep(context.electionBackoffMaxMs - 1);
+ context.time.sleep(context.electionBackoffMaxMs + exponentialFactor -
1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
new file mode 100644
index 00000000000..9d79cc53bc6
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
+import static
org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RaftUtilTest {
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
+ public void testExponentialBoundOfExponentialElectionBackoffMs(int
retries) {
+ Random mockedRandom = Mockito.mock(Random.class);
+ int electionBackoffMaxMs = 1000;
+
+ // test the bound of the method's first call to random.nextInt
+ binaryExponentialElectionBackoffMs(electionBackoffMaxMs,
RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
+ ArgumentCaptor<Integer> nextIntCaptor =
ArgumentCaptor.forClass(Integer.class);
+ Mockito.verify(mockedRandom,
Mockito.times(2)).nextInt(nextIntCaptor.capture());
+ List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+ int actualBound = allCapturedBounds.get(0);
+ int expectedBound = (int) (2 * Math.pow(2, retries - 1));
+ // after the 10th retry, the bound of the first call to random.nextInt
will remain capped to
+ // (2 << 10) = 2048 to prevent overflow
+ if (retries > 10) {
+ expectedBound = 2048;
+ }
+ assertEquals(expectedBound, actualBound, "Incorrect bound for
retries=" + retries);
+ }
+
+ // test that the return value of the method is capped to
QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter
+ // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS) - 1 = 20
will result in this cap
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 19, 20, 21, 2048})
+ public void testExponentialElectionBackoffMsIsCapped(int exponential) {
+ Random mockedRandom = Mockito.mock(Random.class);
+ int electionBackoffMaxMs = 1000;
+ // this is the max bound of the method's first call to random.nextInt
+ int firstNextIntMaxBound = 2048;
+
+ int jitterMs = 50;
+
Mockito.when(mockedRandom.nextInt(firstNextIntMaxBound)).thenReturn(exponential);
+
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
+
+ int returnedBackoffMs =
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS,
11, mockedRandom);
+
+ // verify nextInt was called on both expected bounds
+ ArgumentCaptor<Integer> nextIntCaptor =
ArgumentCaptor.forClass(Integer.class);
+ Mockito.verify(mockedRandom,
Mockito.times(2)).nextInt(nextIntCaptor.capture());
+ List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+ assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
+ assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
+
+ // finally verify the backoff returned is capped to
electionBackoffMaxMs + jitterMs
+ int backoffValueCap = electionBackoffMaxMs + jitterMs;
+ if (exponential < 20) {
+ assertEquals(RETRY_BACKOFF_BASE_MS * (exponential + 1),
returnedBackoffMs);
+ assertTrue(returnedBackoffMs < backoffValueCap);
+ } else {
+ assertEquals(backoffValueCap, returnedBackoffMs);
+ }
+ }
+}