This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 257f8d4093b KAFKA-18345; Prevent livelocked elections (#19658)
257f8d4093b is described below

commit 257f8d4093b9dbcf8f1032f9b58672f57cb530cb
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon May 12 13:23:18 2025 -0700

    KAFKA-18345; Prevent livelocked elections (#19658)
    
    As retries approach the limit in binaryExponentialElectionBackoffMs, it
    becomes statistically likely that the exponential backoff returns
    electionBackoffMaxMs. This is a problem because multiple replicas can get
    stuck starting elections at the same cadence.
    
    This change fixes that by adding a random jitter to the max election
    backoff. It also lowers the base backoff from 100ms to 50ms and caps the
    exponent at 10 instead of 20.
    
    Reviewers: José Armando García Sancio <[email protected]>, TaiJuWu
     <[email protected]>, Yung <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 26 ++++---
 .../main/java/org/apache/kafka/raft/RaftUtil.java  | 15 ++++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 10 ++-
 .../java/org/apache/kafka/raft/RaftUtilTest.java   | 86 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 14 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 624c16b008b..35c91975cc0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -164,7 +164,8 @@ import static 
org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
  *    are not necessarily offset-aligned.
  */
 public final class KafkaRaftClient<T> implements RaftClient<T> {
-    private static final int RETRY_BACKOFF_BASE_MS = 100;
+    // visible for testing
+    static final int RETRY_BACKOFF_BASE_MS = 50;
     private static final int MAX_NUMBER_OF_BATCHES = 10;
     public static final int MAX_FETCH_WAIT_MS = 500;
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@@ -921,7 +922,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
                         state.startBackingOff(
                             currentTimeMs,
-                            binaryExponentialElectionBackoffMs(state.retries())
+                            RaftUtil.binaryExponentialElectionBackoffMs(
+                                quorumConfig.electionBackoffMaxMs(),
+                                RETRY_BACKOFF_BASE_MS,
+                                state.retries(),
+                                random
+                            )
                         );
                     }
                 }
@@ -935,15 +941,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
     }
 
-    private int binaryExponentialElectionBackoffMs(int retries) {
-        if (retries <= 0) {
-            throw new IllegalArgumentException("Retries " + retries + " should 
be larger than zero");
-        }
-        // upper limit exponential co-efficients at 20 to avoid overflow
-        return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << 
Math.min(20, retries - 1)),
-                quorumConfig.electionBackoffMaxMs());
-    }
-
     private int strictExponentialElectionBackoffMs(int positionInSuccessors, 
int totalNumSuccessors) {
         if (positionInSuccessors == 0) {
             return 0;
@@ -3029,7 +3026,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             }
             return state.remainingBackoffMs(currentTimeMs);
         } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
-            long backoffDurationMs = 
binaryExponentialElectionBackoffMs(state.retries());
+            long backoffDurationMs = 
RaftUtil.binaryExponentialElectionBackoffMs(
+                quorumConfig.electionBackoffMaxMs(),
+                RETRY_BACKOFF_BASE_MS,
+                state.retries(),
+                random
+            );
             logger.info("Election has timed out, backing off for {}ms before 
becoming a candidate again",
                 backoffDurationMs);
             state.startBackingOff(currentTimeMs, backoffDurationMs);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 018bec0d632..9d9796fb027 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -762,4 +763,18 @@ public class RaftUtil {
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() 
== topicPartition.partition();
     }
+
+    static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int 
backoffBaseMs, int retries, Random random) {
+        if (retries <= 0) {
+            throw new IllegalArgumentException("Retries " + retries + " should 
be larger than zero");
+        }
+        // Returns the minimum of the following two terms (retries must be >= 1):
+        // 1. exponential backoff backoffBaseMs * (1 + nextInt(2 << min(10, retries - 1))), which maxes out at 102.4 seconds with 
backoffBaseMs of 50ms (50ms * 2048)
+        // 2. backoffMaxMs (the configured electionBackoffMaxMs) + a jitter in [0, backoffBaseMs)
+        // The jitter keeps replicas that all hit the cap from restarting elections in lockstep (livelock).
+        return Math.min(
+            backoffBaseMs * (1 + random.nextInt(2 << Math.min(10, retries - 
1))),
+            backoffMaxMs + random.nextInt(backoffBaseMs)
+        );
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 248fc8cf564..23043a82b2b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1083,7 +1083,7 @@ public class KafkaRaftClientTest {
         context.assertVotedCandidate(epoch, localId);
 
         // After backoff, we will become a candidate again
-        context.time.sleep(context.electionBackoffMaxMs);
+        context.time.sleep(context.electionBackoffMaxMs + jitterMs);
         context.client.poll();
         context.assertVotedCandidate(epoch + 1, localId);
     }
@@ -1681,6 +1681,7 @@ public class KafkaRaftClientTest {
         context.time.sleep(2L * context.electionTimeoutMs());
         context.pollUntilRequest();
         context.assertVotedCandidate(epoch, localId);
+        CandidateState candidate = 
context.client.quorum().candidateStateOrThrow();
 
         // Quorum size is two. If the other member rejects, then we need to 
schedule a revote.
         RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 
0L, 1);
@@ -1691,13 +1692,18 @@ public class KafkaRaftClientTest {
         );
 
         context.client.poll();
+        assertTrue(candidate.isBackingOff());
+        assertEquals(
+            context.electionBackoffMaxMs + exponentialFactor,
+            candidate.remainingBackoffMs(context.time.milliseconds())
+        );
 
         // All nodes have rejected our candidacy, but we should still remember 
that we had voted
         context.assertVotedCandidate(epoch, localId);
 
         // Even though our candidacy was rejected, we will backoff for jitter 
period
         // before we bump the epoch and start a new election.
-        context.time.sleep(context.electionBackoffMaxMs - 1);
+        context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 
1);
         context.client.poll();
         context.assertVotedCandidate(epoch, localId);
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
new file mode 100644
index 00000000000..9d79cc53bc6
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
+import static 
org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RaftUtilTest {
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
+    public void testExponentialBoundOfExponentialElectionBackoffMs(int 
retries) {
+        Random mockedRandom = Mockito.mock(Random.class);
+        int electionBackoffMaxMs = 1000;
+
+        // test the bound of the method's first call to random.nextInt, which should be 2 << min(10, retries - 1)
+        binaryExponentialElectionBackoffMs(electionBackoffMaxMs, 
RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
+        ArgumentCaptor<Integer> nextIntCaptor = 
ArgumentCaptor.forClass(Integer.class);
+        Mockito.verify(mockedRandom, 
Mockito.times(2)).nextInt(nextIntCaptor.capture());
+        List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+        int actualBound = allCapturedBounds.get(0);
+        int expectedBound = (int) (2 * Math.pow(2, retries - 1));
+        // after the 10th retry, the bound of the first call to random.nextInt 
will remain capped to
+        // (2 << 10) = 2048 to prevent overflow
+        if (retries > 10) {
+            expectedBound = 2048;
+        }
+        assertEquals(expectedBound, actualBound, "Incorrect bound for 
retries=" + retries);
+    }
+
+    // test that the return value of the method is capped to 
electionBackoffMaxMs (QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG) + jitter
+    // any exponential (the mocked first nextInt result) >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS) - 1 = 20 
will result in this cap, since RETRY_BACKOFF_BASE_MS * (exponential + 1) >= 1000 + jitter
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 19, 20, 21, 2048})
+    public void testExponentialElectionBackoffMsIsCapped(int exponential) {
+        Random mockedRandom = Mockito.mock(Random.class);
+        int electionBackoffMaxMs = 1000;
+        // this is the max bound of the method's first call to random.nextInt
+        int firstNextIntMaxBound = 2048;
+        // NOTE(review): a real Random.nextInt(RETRY_BACKOFF_BASE_MS) only returns 0..49; a jitter of 50 is reachable only through this mock
+        int jitterMs = 50;
+        
Mockito.when(mockedRandom.nextInt(firstNextIntMaxBound)).thenReturn(exponential);
+        
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
+
+        int returnedBackoffMs = 
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 
11, mockedRandom);
+
+        // verify nextInt was called on both expected bounds
+        ArgumentCaptor<Integer> nextIntCaptor = 
ArgumentCaptor.forClass(Integer.class);
+        Mockito.verify(mockedRandom, 
Mockito.times(2)).nextInt(nextIntCaptor.capture());
+        List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
+        assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
+        assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
+
+        // finally verify the backoff returned is capped to 
electionBackoffMaxMs + jitterMs
+        int backoffValueCap = electionBackoffMaxMs + jitterMs;
+        if (exponential < 20) {
+            assertEquals(RETRY_BACKOFF_BASE_MS * (exponential + 1), 
returnedBackoffMs);
+            assertTrue(returnedBackoffMs < backoffValueCap);
+        } else {
+            assertEquals(backoffValueCap, returnedBackoffMs);
+        }
+    }
+}

Reply via email to