This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d646a09dd0b KAFKA-16531: calculate check-quorum when leader is not in voter set (#16211) d646a09dd0b is described below commit d646a09dd0b03e6823f6d8b65f8b5c8619826a31 Author: Luke Chen <show...@gmail.com> AuthorDate: Fri Jun 21 12:22:24 2024 +0900 KAFKA-16531: calculate check-quorum when leader is not in voter set (#16211) In the check-quorum calculation, the leader should not assume that it is part of the voter set. The leader can be outside the voter set, for example while it is removing itself from the voter set. This PR fixes the calculation by checking whether the leader is in the voter set and, based on that, deciding how many follower fetches are required. It also adds tests. Co-authored-by: Colin P. McCabe <cmcc...@apache.org> Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, José Armando García Sancio <jsan...@apache.org> --- .../java/org/apache/kafka/raft/LeaderState.java | 17 +++++++-- .../org/apache/kafka/raft/LeaderStateTest.java | 44 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index a5f7c0b8f4b..7f0f6777453 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -118,9 +118,10 @@ public class LeaderState<T> implements EpochState { long remainingMs = checkQuorumTimer.remainingMs(); if (remainingMs == 0) { log.info( - "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", + "Did not receive fetch request from the majority of the voters within {}ms. 
Current fetched voters are {}, and voters are {}", checkQuorumTimeoutMs, - fetchedVoters); + fetchedVoters, + voterStates.keySet()); } return remainingMs; } @@ -133,8 +134,16 @@ public class LeaderState<T> implements EpochState { */ public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) { updateFetchedVoters(id); - // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 - int majority = voterStates.size() / 2; + // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc. + int majority = (voterStates.size() / 2) + 1; + // If the leader is in the voter set, it should be implicitly counted as part of the + // majority, but the leader will never be a member of the fetchedVoters. + // If the leader is not in the voter set, it is not in the majority. Then, the + // majority can only be composed of fetched voters. + if (voterStates.containsKey(localId)) { + majority = majority - 1; + } + if (fetchedVoters.size() >= majority) { fetchedVoters.clear(); checkQuorumTimer.update(currentTimeMs); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index dadddc522c4..1779a231464 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -762,6 +762,50 @@ public class LeaderStateTest { assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); } + @Test + public void testCheckQuorumAfterVoterSetChanges() { + int node1 = 1; + int node2 = 2; + int node3 = 3; + Set<Integer> originalVoterSet = mkSet(localId, node1, node2); + LeaderState<?> state = newLeaderState(originalVoterSet, 0L); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // checkQuorum timeout not exceeded, should not expire the timer + time.sleep(checkQuorumTimeoutMs / 2); + assertEquals(checkQuorumTimeoutMs / 2, 
state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch request from 1 voter node, the timer should be reset + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // Adding 1 new voter to the voter set + Set<Integer> voterSetWithNode3 = mkSet(localId, node1, node2, node3); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithNode3)); + + time.sleep(checkQuorumTimeoutMs / 2); + // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // Timer should be reset after receiving another voter's fetch request + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // removing leader from the voter set + Set<Integer> voterSetWithoutLeader = mkSet(node1, node2, node3); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithoutLeader)); + + time.sleep(checkQuorumTimeoutMs / 2); + // received fetch request from 1 voter, the timer should not be reset. + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch request from another voter, the timer should be reset since the current quorum majority is 2. + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + } + @Test public void testCheckQuorumWithOneVoter() { int observer = 1;