This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 3170e1130c4 KAFKA-18345; Prevent livelocked elections (#19658)
3170e1130c4 is described below
commit 3170e1130c4464705f4d636c5bb5a0d313c5ba96
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon May 12 13:23:18 2025 -0700
KAFKA-18345; Prevent livelocked elections (#19658)
At the retry limit, it becomes statistically likely that
binaryExponentialElectionBackoffMs returns electionBackoffMaxMs.
This is an issue because multiple replicas can then get stuck
starting elections at the same cadence.
This change fixes that by adding a random jitter to the max election
backoff.
Reviewers: José Armando García Sancio <[email protected]>, TaiJuWu
<[email protected]>, Yung <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 21 +++-----
.../main/java/org/apache/kafka/raft/RaftUtil.java | 15 ++++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 4 +-
.../java/org/apache/kafka/raft/MockableRandom.java | 5 ++
.../java/org/apache/kafka/raft/RaftUtilTest.java | 62 ++++++++++++++++++++++
5 files changed, 92 insertions(+), 15 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index d1b1d3477cc..2bf20369090 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -166,7 +166,8 @@ import static
org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
*/
@SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity",
"ParameterNumber", "NPathComplexity" })
public final class KafkaRaftClient<T> implements RaftClient<T> {
- private static final int RETRY_BACKOFF_BASE_MS = 100;
+ // visible for testing
+ static final int RETRY_BACKOFF_BASE_MS = 50;
private static final int MAX_NUMBER_OF_BATCHES = 10;
public static final int MAX_FETCH_WAIT_MS = 500;
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@@ -1027,7 +1028,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// replica has failed multiple elections in succession.
candidate.startBackingOff(
currentTimeMs,
- binaryExponentialElectionBackoffMs(candidate.retries())
+ RaftUtil.binaryExponentialElectionBackoffMs(
+ quorumConfig.electionBackoffMaxMs(),
+ RETRY_BACKOFF_BASE_MS,
+ candidate.retries(),
+ random
+ )
);
}
} else if (state instanceof ProspectiveState prospective) {
@@ -1045,17 +1051,6 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
}
- private int binaryExponentialElectionBackoffMs(int retries) {
- if (retries <= 0) {
- throw new IllegalArgumentException("Retries " + retries + " should
be larger than zero");
- }
- // upper limit exponential co-efficients at 20 to avoid overflow
- return Math.min(
- RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries -
1)),
- quorumConfig.electionBackoffMaxMs()
- );
- }
-
private int strictExponentialElectionBackoffMs(int positionInSuccessors,
int totalNumSuccessors) {
if (positionInSuccessors == 0) {
return 0;
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 12c48955b39..fea9846aa13 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -765,4 +766,18 @@ public class RaftUtil {
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex()
== topicPartition.partition();
}
+
+ static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int
backoffBaseMs, int retries, Random random) {
+ if (retries <= 0) {
+ throw new IllegalArgumentException("Retries " + retries + " should
be larger than zero");
+ }
+ // Takes minimum of the following:
+ // 1. exponential backoff calculation (maxes out at 102.4 seconds)
+ // 2. configurable electionBackoffMaxMs + jitter
+ // The jitter is added to prevent livelock of elections.
+ return Math.min(
+ backoffBaseMs * random.nextInt(1, 2 << Math.min(10, retries - 1)),
+ backoffMaxMs + random.nextInt(backoffBaseMs)
+ );
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 79c255efbc8..b1205bb1cdf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1817,7 +1817,7 @@ public class KafkaRaftClientTest {
context.client.poll();
assertTrue(candidate.isBackingOff());
assertEquals(
- context.electionBackoffMaxMs,
+ context.electionBackoffMaxMs + exponentialFactor,
candidate.remainingBackoffMs(context.time.milliseconds())
);
@@ -1826,7 +1826,7 @@ public class KafkaRaftClientTest {
// Even though candidacy was rejected, local replica will backoff for
jitter period
// before transitioning to prospective and starting a new election.
- context.time.sleep(context.electionBackoffMaxMs - 1);
+ context.time.sleep(context.electionBackoffMaxMs + exponentialFactor -
1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
index b487b160678..45cfd568d80 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
@@ -48,4 +48,9 @@ class MockableRandom extends Random {
public int nextInt(int bound) {
return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
}
+
+ @Override
+ public int nextInt(int origin, int bound) {
+ return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 681bcaae8de..81485adca69 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -58,16 +58,24 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
+import java.util.Random;
import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
+import static
org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RaftUtilTest {
@@ -569,6 +577,60 @@ public class RaftUtilTest {
assertEquals(expectedJson, json.toString());
}
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
+ public void testExponentialBoundOfExponentialElectionBackoffMs(int
retries) {
+ Random mockedRandom = Mockito.mock(Random.class);
+ int electionBackoffMaxMs = 1000;
+
+ // test the bound of the method's first call to random.nextInt
+ binaryExponentialElectionBackoffMs(electionBackoffMaxMs,
RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
+ ArgumentCaptor<Integer> nextIntCaptor =
ArgumentCaptor.forClass(Integer.class);
+ Mockito.verify(mockedRandom).nextInt(Mockito.eq(1),
nextIntCaptor.capture());
+ int actualBound = nextIntCaptor.getValue();
+ int expectedBound = (int) (2 * Math.pow(2, retries - 1));
+ // after the 10th retry, the bound of the first call to random.nextInt
will remain capped to
+ // (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow
+ if (retries > 10) {
+ expectedBound = 2048;
+ }
+ assertEquals(expectedBound, actualBound, "Incorrect bound for
retries=" + retries);
+ }
+
+ // test that the return value of the method is capped to
QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter
+ // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will
result in this cap
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 20, 21, 22, 2048})
+ public void testExponentialElectionBackoffMsIsCapped(int exponential) {
+ Random mockedRandom = Mockito.mock(Random.class);
+ int electionBackoffMaxMs = 1000;
+ // this is the max bound of the method's first call to random.nextInt
+ int firstNextIntMaxBound = 2048;
+
+ int jitterMs = 50;
+ Mockito.when(mockedRandom.nextInt(1,
firstNextIntMaxBound)).thenReturn(exponential);
+
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
+
+ int returnedBackoffMs =
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS,
11, mockedRandom);
+
+ // verify nextInt was called on both expected bounds
+ ArgumentCaptor<Integer> nextIntCaptor =
ArgumentCaptor.forClass(Integer.class);
+ Mockito.verify(mockedRandom).nextInt(Mockito.eq(1),
nextIntCaptor.capture());
+ Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture());
+ List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+ assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
+ assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
+
+ // finally verify the backoff returned is capped to
electionBackoffMaxMs + jitterMs
+ int backoffValueCap = electionBackoffMaxMs + jitterMs;
+ if (exponential < 21) {
+ assertEquals(RETRY_BACKOFF_BASE_MS * exponential,
returnedBackoffMs);
+ assertTrue(returnedBackoffMs < backoffValueCap);
+ } else {
+ assertEquals(backoffValueCap, returnedBackoffMs);
+ }
+ }
+
private Records createRecords() {
ByteBuffer allocate = ByteBuffer.allocate(1024);