This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d646a09dd0b KAFKA-16531: calculate check-quorum when leader is not in 
voter set (#16211)
d646a09dd0b is described below

commit d646a09dd0b03e6823f6d8b65f8b5c8619826a31
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Jun 21 12:22:24 2024 +0900

    KAFKA-16531: calculate check-quorum when leader is not in voter set (#16211)
    
    In the check-quorum calculation, the leader should not assume that it is 
part of the voter set. This may not hold when the leader is removing itself 
from the voter set. This PR fixes that by checking whether the leader is in the 
voter set and deciding accordingly how many follower fetches are required. It 
also adds tests.
    
    Co-authored-by: Colin P. McCabe <cmcc...@apache.org>
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, José Armando García Sancio 
<jsan...@apache.org>
---
 .../java/org/apache/kafka/raft/LeaderState.java    | 17 +++++++--
 .../org/apache/kafka/raft/LeaderStateTest.java     | 44 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index a5f7c0b8f4b..7f0f6777453 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -118,9 +118,10 @@ public class LeaderState<T> implements EpochState {
         long remainingMs = checkQuorumTimer.remainingMs();
         if (remainingMs == 0) {
             log.info(
-                "Did not receive fetch request from the majority of the voters 
within {}ms. Current fetched voters are {}.",
+                "Did not receive fetch request from the majority of the voters 
within {}ms. Current fetched voters are {}, and voters are {}",
                 checkQuorumTimeoutMs,
-                fetchedVoters);
+                fetchedVoters,
+                voterStates.keySet());
         }
         return remainingMs;
     }
@@ -133,8 +134,16 @@ public class LeaderState<T> implements EpochState {
      */
     public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) 
{
         updateFetchedVoters(id);
-        // The majority number of the voters excluding the leader. Ex: 3 
voters, the value will be 1
-        int majority = voterStates.size() / 2;
+        // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 
voters... etc.
+        int majority = (voterStates.size() / 2) + 1;
+        // If the leader is in the voter set, it should be implicitly counted 
as part of the
+        // majority, but the leader will never be a member of the 
fetchedVoters.
+        // If the leader is not in the voter set, it is not in the majority. 
Then, the
+        // majority can only be composed of fetched voters.
+        if (voterStates.containsKey(localId)) {
+            majority = majority - 1;
+        }
+
         if (fetchedVoters.size() >= majority) {
             fetchedVoters.clear();
             checkQuorumTimer.update(currentTimeMs);
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index dadddc522c4..1779a231464 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -762,6 +762,50 @@ public class LeaderStateTest {
         assertEquals(0, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
     }
 
+    @Test
+    public void testCheckQuorumAfterVoterSetChanges() {
+        int node1 = 1;
+        int node2 = 2;
+        int node3 = 3;
+        Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
+        LeaderState<?> state = newLeaderState(originalVoterSet, 0L);
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // checkQuorum timeout not exceeded, should not expire the timer
+        time.sleep(checkQuorumTimeoutMs / 2);
+        assertEquals(checkQuorumTimeoutMs / 2, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // received fetch request from 1 voter node, the timer should be reset
+        state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // Adding 1 new voter to the voter set
+        Set<Integer> voterSetWithNode3 = mkSet(localId, node1, node2, node3);
+        state.updateLocalState(new LogOffsetMetadata(1L), 
toMap(voterSetWithNode3));
+
+        time.sleep(checkQuorumTimeoutMs / 2);
+        // received fetch request from 1 voter node, the timer should not be 
reset because the majority should be 3
+        state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs / 2, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // Timer should be reset after receiving another voter's fetch request
+        state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // removing leader from the voter set
+        Set<Integer> voterSetWithoutLeader = mkSet(node1, node2, node3);
+        state.updateLocalState(new LogOffsetMetadata(1L), 
toMap(voterSetWithoutLeader));
+
+        time.sleep(checkQuorumTimeoutMs / 2);
+        // received fetch request from 1 voter, the timer should not be reset.
+        state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs / 2, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // received fetch request from another voter, the timer should be 
reset since the current quorum majority is 2.
+        state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+    }
+
     @Test
     public void testCheckQuorumWithOneVoter() {
         int observer = 1;

Reply via email to