This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 3170e1130c4 KAFKA-18345; Prevent livelocked elections (#19658)
3170e1130c4 is described below

commit 3170e1130c4464705f4d636c5bb5a0d313c5ba96
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon May 12 13:23:18 2025 -0700

    KAFKA-18345; Prevent livelocked elections (#19658)
    
    At the retry limit of binaryExponentialElectionBackoffMs, it becomes
    statistically likely that the exponential backoff returns
    electionBackoffMaxMs. This is an issue because multiple replicas can
    get stuck starting elections at the same cadence.
    
    This change fixes that by adding a random jitter to the max election
    backoff.
    
    Reviewers: José Armando García Sancio <[email protected]>, TaiJuWu
     <[email protected]>, Yung <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 21 +++-----
 .../main/java/org/apache/kafka/raft/RaftUtil.java  | 15 ++++++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  4 +-
 .../java/org/apache/kafka/raft/MockableRandom.java |  5 ++
 .../java/org/apache/kafka/raft/RaftUtilTest.java   | 62 ++++++++++++++++++++++
 5 files changed, 92 insertions(+), 15 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index d1b1d3477cc..2bf20369090 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -166,7 +166,8 @@ import static 
org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
  */
 @SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity", 
"ParameterNumber", "NPathComplexity" })
 public final class KafkaRaftClient<T> implements RaftClient<T> {
-    private static final int RETRY_BACKOFF_BASE_MS = 100;
+    // visible for testing
+    static final int RETRY_BACKOFF_BASE_MS = 50;
     private static final int MAX_NUMBER_OF_BATCHES = 10;
     public static final int MAX_FETCH_WAIT_MS = 500;
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@@ -1027,7 +1028,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 // replica has failed multiple elections in succession.
                 candidate.startBackingOff(
                     currentTimeMs,
-                    binaryExponentialElectionBackoffMs(candidate.retries())
+                    RaftUtil.binaryExponentialElectionBackoffMs(
+                        quorumConfig.electionBackoffMaxMs(),
+                        RETRY_BACKOFF_BASE_MS,
+                        candidate.retries(),
+                        random
+                    )
                 );
             }
         } else if (state instanceof ProspectiveState prospective) {
@@ -1045,17 +1051,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
     }
 
-    private int binaryExponentialElectionBackoffMs(int retries) {
-        if (retries <= 0) {
-            throw new IllegalArgumentException("Retries " + retries + " should 
be larger than zero");
-        }
-        // upper limit exponential co-efficients at 20 to avoid overflow
-        return Math.min(
-            RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 
1)),
-            quorumConfig.electionBackoffMaxMs()
-        );
-    }
-
     private int strictExponentialElectionBackoffMs(int positionInSuccessors, 
int totalNumSuccessors) {
         if (positionInSuccessors == 0) {
             return 0;
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 12c48955b39..fea9846aa13 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -765,4 +766,18 @@ public class RaftUtil {
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() 
== topicPartition.partition();
     }
+
+    static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int 
backoffBaseMs, int retries, Random random) {
+        if (retries <= 0) {
+            throw new IllegalArgumentException("Retries " + retries + " should 
be larger than zero");
+        }
+        // Takes minimum of the following:
+        // 1. exponential backoff calculation (maxes out at 102.4 seconds)
+        // 2. configurable electionBackoffMaxMs + jitter
+        // The jitter is added to prevent livelock of elections.
+        return Math.min(
+            backoffBaseMs * random.nextInt(1, 2 << Math.min(10, retries - 1)),
+            backoffMaxMs + random.nextInt(backoffBaseMs)
+        );
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 79c255efbc8..b1205bb1cdf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1817,7 +1817,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
         assertTrue(candidate.isBackingOff());
         assertEquals(
-            context.electionBackoffMaxMs,
+            context.electionBackoffMaxMs + exponentialFactor,
             candidate.remainingBackoffMs(context.time.milliseconds())
         );
 
@@ -1826,7 +1826,7 @@ public class KafkaRaftClientTest {
 
         // Even though candidacy was rejected, local replica will backoff for 
jitter period
         // before transitioning to prospective and starting a new election.
-        context.time.sleep(context.electionBackoffMaxMs - 1);
+        context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 
1);
         context.client.poll();
         context.assertVotedCandidate(epoch, localId);
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java 
b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
index b487b160678..45cfd568d80 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java
@@ -48,4 +48,9 @@ class MockableRandom extends Random {
     public int nextInt(int bound) {
         return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
     }
+
+    @Override
+    public int nextInt(int origin, int bound) {
+        return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 681bcaae8de..81485adca69 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -58,16 +58,24 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
+import java.util.Random;
 import java.util.stream.Stream;
 
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
+import static 
org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RaftUtilTest {
 
@@ -569,6 +577,60 @@ public class RaftUtilTest {
         assertEquals(expectedJson, json.toString());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
+    public void testExponentialBoundOfExponentialElectionBackoffMs(int 
retries) {
+        Random mockedRandom = Mockito.mock(Random.class);
+        int electionBackoffMaxMs = 1000;
+
+        // test the bound of the method's first call to random.nextInt
+        binaryExponentialElectionBackoffMs(electionBackoffMaxMs, 
RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
+        ArgumentCaptor<Integer> nextIntCaptor = 
ArgumentCaptor.forClass(Integer.class);
+        Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), 
nextIntCaptor.capture());
+        int actualBound = nextIntCaptor.getValue();
+        int expectedBound = (int) (2 * Math.pow(2, retries - 1));
+        // after the 10th retry, the bound of the first call to random.nextInt 
will remain capped to
+        // (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow
+        if (retries > 10) {
+            expectedBound = 2048;
+        }
+        assertEquals(expectedBound, actualBound, "Incorrect bound for 
retries=" + retries);
+    }
+
+    // test that the return value of the method is capped to 
QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter
+    // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will 
result in this cap
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 20, 21, 22, 2048})
+    public void testExponentialElectionBackoffMsIsCapped(int exponential) {
+        Random mockedRandom = Mockito.mock(Random.class);
+        int electionBackoffMaxMs = 1000;
+        // this is the max bound of the method's first call to random.nextInt
+        int firstNextIntMaxBound = 2048;
+
+        int jitterMs = 50;
+        Mockito.when(mockedRandom.nextInt(1, 
firstNextIntMaxBound)).thenReturn(exponential);
+        
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
+
+        int returnedBackoffMs = 
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 
11, mockedRandom);
+
+        // verify nextInt was called on both expected bounds
+        ArgumentCaptor<Integer> nextIntCaptor = 
ArgumentCaptor.forClass(Integer.class);
+        Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), 
nextIntCaptor.capture());
+        Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture());
+        List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+        assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
+        assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
+
+        // finally verify the backoff returned is capped to 
electionBackoffMaxMs + jitterMs
+        int backoffValueCap = electionBackoffMaxMs + jitterMs;
+        if (exponential < 21) {
+            assertEquals(RETRY_BACKOFF_BASE_MS * exponential, 
returnedBackoffMs);
+            assertTrue(returnedBackoffMs < backoffValueCap);
+        } else {
+            assertEquals(backoffValueCap, returnedBackoffMs);
+        }
+    }
+
     private Records createRecords() {
         ByteBuffer allocate = ByteBuffer.allocate(1024);
 

Reply via email to