This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37416e1aeba KAFKA-15489: resign leadership when no fetch or fetch 
snapshot from majority voters (#14428)
37416e1aeba is described below

commit 37416e1aebae33d01d5059ba906ec8e0e1107284
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Dec 1 03:34:44 2023 +0800

    KAFKA-15489: resign leadership when no fetch or fetch snapshot from 
majority voters (#14428)
    
    KIP-595 specifies that the leader piggy-backs on the `quorum.fetch.timeout.ms` 
config: if the leader does not receive Fetch requests from a majority of the 
quorum within that amount of time, it begins a new election to resolve the 
network partition in the quorum. This behavior was missing from the current 
KRaft implementation; this PR adds it.
    
    This commit includes:
    1. Added a timer with a timeout configuration in `LeaderState`, and check 
whether it has expired each time the leader is polled. If it has expired, the 
leader resigns its leadership and a new election starts.
    
    2. Added `fetchedVoters` in `LeaderState`, which is updated each time the 
leader receives a FETCH or FETCH_SNAPSHOT request from a voter. It is cleared, 
and the timer is reset, once a majority - 1 of the remote voters (i.e. the 
majority of the voters excluding the leader itself) have sent such requests.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>
---
 .../common/requests/FetchSnapshotRequest.java      |   2 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  14 ++-
 .../java/org/apache/kafka/raft/LeaderState.java    |  59 ++++++++++++
 .../java/org/apache/kafka/raft/QuorumState.java    |   2 +
 .../java/org/apache/kafka/raft/RaftConfig.java     |   4 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 100 ++++++++++++++++++++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  54 +++++++++++
 .../org/apache/kafka/raft/LeaderStateTest.java     |  44 ++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   |   2 +
 9 files changed, 273 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
index 1769e94f47f..34db2d1c124 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
@@ -61,6 +61,7 @@ final public class FetchSnapshotRequest extends 
AbstractRequest {
      */
     public static FetchSnapshotRequestData singleton(
         String clusterId,
+        int replicaId,
         TopicPartition topicPartition,
         UnaryOperator<FetchSnapshotRequestData.PartitionSnapshot> operator
     ) {
@@ -70,6 +71,7 @@ final public class FetchSnapshotRequest extends 
AbstractRequest {
 
         return new FetchSnapshotRequestData()
             .setClusterId(clusterId)
+            .setReplicaId(replicaId)
             .setTopics(
                 Collections.singletonList(
                     new FetchSnapshotRequestData.TopicSnapshot()
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index b2e14ee3ec9..a04240b6426 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1257,7 +1257,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
      * - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset 
out of range
      */
     private FetchSnapshotResponseData handleFetchSnapshotRequest(
-        RaftRequest.Inbound requestMetadata
+        RaftRequest.Inbound requestMetadata,
+        long currentTimeMs
     ) {
         FetchSnapshotRequestData data = (FetchSnapshotRequestData) 
requestMetadata.data;
 
@@ -1340,6 +1341,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         UnalignedRecords records = 
snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), 
maxSnapshotSize));
 
+        LeaderState<T> state = quorum.leaderStateOrThrow();
+        state.updateCheckQuorumForFollowingVoter(data.replicaId(), 
currentTimeMs);
+
         return FetchSnapshotResponse.singleton(
             log.topicPartition(),
             responsePartitionSnapshot -> {
@@ -1701,7 +1705,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 break;
 
             case FETCH_SNAPSHOT:
-                responseFuture = 
completedFuture(handleFetchSnapshotRequest(request));
+                responseFuture = 
completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
                 break;
 
             default:
@@ -1875,6 +1879,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
             clusterId,
+            quorum().localIdOrSentinel(),
             log.topicPartition(),
             snapshotPartition -> {
                 return snapshotPartition
@@ -1990,7 +1995,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        if (shutdown.get() != null || state.isResignRequested()) {
+        long timeUntilCheckQuorumExpires = 
state.timeUntilCheckQuorumExpires(currentTimeMs);
+        if (shutdown.get() != null || state.isResignRequested() || 
timeUntilCheckQuorumExpires == 0) {
             
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
             return 0L;
         }
@@ -2006,7 +2012,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             this::buildBeginQuorumEpochRequest
         );
 
-        return Math.min(timeUntilFlush, timeUntilSend);
+        return Math.min(timeUntilFlush, Math.min(timeUntilSend, 
timeUntilCheckQuorumExpires));
     }
 
     private long maybeSendVoteRequests(
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index d7595393e77..14e327208f7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -22,6 +22,8 @@ import 
org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.slf4j.Logger;
 
@@ -44,6 +46,7 @@ import java.util.stream.Collectors;
  */
 public class LeaderState<T> implements EpochState {
     static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;
 
     private final int localId;
     private final int epoch;
@@ -55,17 +58,23 @@ public class LeaderState<T> implements EpochState {
     private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Logger log;
     private final BatchAccumulator<T> accumulator;
+    // The set includes all of the followers voters that FETCH or 
FETCH_SNAPSHOT during the current checkQuorumTimer interval.
+    private final Set<Integer> fetchedVoters = new HashSet<>();
+    private final Timer checkQuorumTimer;
+    private final int checkQuorumTimeoutMs;
 
     // This is volatile because resignation can be requested from an external 
thread.
     private volatile boolean resignRequested = false;
 
     protected LeaderState(
+        Time time,
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
+        int fetchTimeoutMs,
         LogContext logContext
     ) {
         this.localId = localId;
@@ -79,6 +88,55 @@ public class LeaderState<T> implements EpochState {
         this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+        // use the 1.5x of fetch timeout to tolerate some network transition 
time or other IO time.
+        this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * 
CHECK_QUORUM_TIMEOUT_FACTOR);
+        this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
+    }
+
+    /**
+     * Get the remaining time in milliseconds until the checkQuorumTimer 
expires.
+     * This will happen if we didn't receive a valid fetch/fetchSnapshot 
request from the majority of the voters within checkQuorumTimeoutMs.
+     *
+     * @param currentTimeMs the current timestamp in millisecond
+     * @return the remainingMs before the checkQuorumTimer expired
+     */
+    public long timeUntilCheckQuorumExpires(long currentTimeMs) {
+        checkQuorumTimer.update(currentTimeMs);
+        long remainingMs = checkQuorumTimer.remainingMs();
+        if (remainingMs == 0) {
+            log.info(
+                "Did not receive fetch request from the majority of the voters 
within {}ms. Current fetched voters are {}.",
+                checkQuorumTimeoutMs,
+                fetchedVoters);
+        }
+        return remainingMs;
+    }
+
+    /**
+     * Reset the checkQuorumTimer if we've received fetch/fetchSnapshot 
request from the majority of the voter
+     *
+     * @param id the node id
+     * @param currentTimeMs the current timestamp in millisecond
+     */
+    public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) 
{
+        updateFetchedVoters(id);
+        // The majority number of the voters excluding the leader. Ex: 3 
voters, the value will be 1
+        int majority = voterStates.size() / 2;
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            checkQuorumTimer.update(currentTimeMs);
+            checkQuorumTimer.reset(checkQuorumTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (id == localId) {
+            throw new IllegalArgumentException("Received a 
FETCH/FETCH_SNAPSHOT request from the leader itself.");
+        }
+
+        if (isVoter(id)) {
+            fetchedVoters.add(id);
+        }
     }
 
     public BatchAccumulator<T> accumulator() {
@@ -287,6 +345,7 @@ public class LeaderState<T> implements EpochState {
             fetchOffsetMetadata,
             leaderEndOffsetOpt
         );
+        updateCheckQuorumForFollowingVoter(replicaId, currentTimeMs);
 
         return isVoter(state.nodeId) && maybeUpdateHighWatermark();
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 9115358f52d..3daf85275ba 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -453,12 +453,14 @@ public class QuorumState {
         // we typically expect the state machine to be caught up anyway.
 
         LeaderState<T> state = new LeaderState<>(
+            time,
             localIdOrThrow(),
             epoch(),
             epochStartOffset,
             voters,
             candidateState.grantingVoters(),
             accumulator,
+            fetchTimeoutMs,
             logContext
         );
         durableTransitionTo(state);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
index e17de3278d3..5c5865f3581 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -68,8 +68,8 @@ public class RaftConfig {
 
     public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX 
+ "fetch.timeout.ms";
     public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time 
without a successful fetch from " +
-        "the current leader before becoming a candidate and triggering an 
election for voters; Maximum time without " +
-        "receiving fetch from a majority of the quorum before asking around to 
see if there's a new epoch for leader.";
+        "the current leader before becoming a candidate and triggering an 
election for voters; Maximum time " +
+        "a leader can go without receiving valid fetch or fetchSnapshot 
request from a majority of the quorum before resigning.";
     public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000;
 
     public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG = 
QUORUM_PREFIX + "election.backoff.max.ms";
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index f3d7012db92..44d39880aa0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -701,6 +702,97 @@ final public class KafkaRaftClientSnapshotTest {
         assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) 
response.unalignedRecords()).buffer());
     }
 
+    @Test
+    public void 
testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters()
 throws Exception {
+        int localId = 0;
+        int voter1 = 1;
+        int voter2 = 2;
+        int observerId3 = 3;
+        Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+        List<String> records = Arrays.asList("foo", "bar");
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(snapshotId.epoch(), Arrays.asList("a"))
+                .build();
+
+        int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        FetchSnapshotRequestData voter1FetchSnapshotRequest = 
fetchSnapshotRequest(
+                context.clusterId.toString(),
+                voter1,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        FetchSnapshotRequestData voter2FetchSnapshotRequest = 
fetchSnapshotRequest(
+                context.clusterId.toString(),
+                voter2,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        FetchSnapshotRequestData observerFetchSnapshotRequest = 
fetchSnapshotRequest(
+                context.clusterId.toString(),
+                observerId3,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        try (SnapshotWriter<String> snapshot = 
context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.append(records);
+            snapshot.freeze();
+        }
+
+        // fetch timeout is not expired, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter1 sends fetchSnapshotRequest, the fetch timer should be reset
+        context.deliverRequest(voter1FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter2 sends fetchSnapshotRequest, the fetch timer should be reset
+        context.deliverRequest(voter2FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // An observer sends fetchSnapshotRequest, but the fetch timer should 
not be reset.
+        context.deliverRequest(observerFetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // After this sleep, the fetch timeout should expire since we don't 
receive fetch request from the majority voters within fetchTimeoutMs
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertTrue(context.client.quorum().isResigned());
+    }
+
     @Test
     public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
         int localId = 0;
@@ -1589,6 +1681,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 context.clusterId.toString(),
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1603,6 +1696,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 null,
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1617,6 +1711,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 "",
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1631,6 +1726,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 "invalid-uuid",
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1756,11 +1852,12 @@ final public class KafkaRaftClientSnapshotTest {
             int maxBytes,
             long position
     ) {
-        return fetchSnapshotRequest(null, topicPartition, epoch, 
offsetAndEpoch, maxBytes, position);
+        return fetchSnapshotRequest(null, -1, topicPartition, epoch, 
offsetAndEpoch, maxBytes, position);
     }
 
     private static FetchSnapshotRequestData fetchSnapshotRequest(
         String clusterId,
+        int replicaId,
         TopicPartition topicPartition,
         int epoch,
         OffsetAndEpoch offsetAndEpoch,
@@ -1773,6 +1870,7 @@ final public class KafkaRaftClientSnapshotTest {
 
         FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
             clusterId,
+            replicaId,
             topicPartition,
             snapshotPartition -> {
                 return snapshotPartition
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index d71874a5336..6d714daa11b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -485,6 +485,60 @@ public class KafkaRaftClientTest {
             context.listener.currentLeaderAndEpoch());
     }
 
+    @Test
+    public void 
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws 
Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        int observerId3 = 3;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+        int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        // fetch timeout is not expired, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received fetch request from a voter, the fetch timer should be 
reset.
+        context.deliverRequest(context.fetchRequest(epoch, remoteId1, 0, 0, 
0));
+        context.pollUntilRequest();
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received fetch request from another voter, the fetch timer should 
be reset.
+        context.deliverRequest(context.fetchRequest(epoch, remoteId2, 0, 0, 
0));
+        context.pollUntilRequest();
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received fetch request from an observer, but the fetch timer should 
not be reset.
+        context.deliverRequest(context.fetchRequest(epoch, observerId3, 0, 0, 
0));
+        context.pollUntilRequest();
+
+        // After this sleep, the fetch timeout should expire since we don't 
receive fetch request from the majority voters within fetchTimeoutMs
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        // The leadership should get resigned now
+        assertTrue(context.client.quorum().isResigned());
+        context.assertResignedLeader(epoch, localId);
+    }
+
     @Test
     public void testElectionTimeoutAfterUserInitiatedResign() throws Exception 
{
         int localId = 0;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index bb44fea2ac0..97d290fa347 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,20 +46,24 @@ public class LeaderStateTest {
     private final int localId = 0;
     private final int epoch = 5;
     private final LogContext logContext = new LogContext();
-
     private final BatchAccumulator<?> accumulator = 
Mockito.mock(BatchAccumulator.class);
+    private final MockTime time = new MockTime();
+    private final int fetchTimeoutMs = 2000;
+    private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * 
CHECK_QUORUM_TIMEOUT_FACTOR);
 
     private LeaderState<?> newLeaderState(
         Set<Integer> voters,
         long epochStartOffset
     ) {
         return new LeaderState<>(
+            time,
             localId,
             epoch,
             epochStartOffset,
             voters,
             voters,
             accumulator,
+            fetchTimeoutMs,
             logContext
         );
     }
@@ -66,12 +71,14 @@ public class LeaderStateTest {
     @Test
     public void testRequireNonNullAccumulator() {
         assertThrows(NullPointerException.class, () -> new LeaderState<>(
+            new MockTime(),
             localId,
             epoch,
             0,
             Collections.emptySet(),
             Collections.emptySet(),
             null,
+            fetchTimeoutMs,
             logContext
         ));
     }
@@ -447,6 +454,41 @@ public class LeaderStateTest {
             observerState);
     }
 
+    @Test
+    public void testCheckQuorum() {
+        int node1 = 1;
+        int node2 = 2;
+        int node3 = 3;
+        int node4 = 4;
+        int observer5 = 5;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, 
node3, node4), 0L);
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+        int resignLeadershipTimeout = checkQuorumTimeoutMs;
+
+        // checkQuorum timeout not exceeded, should not expire the timer
+        time.sleep(resignLeadershipTimeout / 2);
+        assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0);
+
+        // received fetch requests from 2 voter nodes, the timer should be 
reset
+        state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
+        state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // Since the timer was reset, it won't expire this time.
+        time.sleep(resignLeadershipTimeout / 2);
+        long remainingMs = 
state.timeUntilCheckQuorumExpires(time.milliseconds());
+        assertTrue(remainingMs > 0);
+
+        // received fetch requests from 1 voter and 1 observer nodes, the 
timer should not be reset.
+        state.updateCheckQuorumForFollowingVoter(node3, time.milliseconds());
+        state.updateCheckQuorumForFollowingVoter(observer5, 
time.milliseconds());
+        assertEquals(remainingMs, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // This time, the checkQuorum timer will be expired
+        time.sleep(resignLeadershipTimeout / 2);
+        assertEquals(0, 
state.timeUntilCheckQuorumExpires(time.milliseconds()));
+    }
+
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         MockTime time = new MockTime();
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 9d798453d0c..68241ae70b1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -78,6 +78,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -91,6 +92,7 @@ public final class RaftClientTestContext {
     final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
     final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
     final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
+    final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * 
CHECK_QUORUM_TIMEOUT_FACTOR);
     final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
 
     private int electionTimeoutMs;

Reply via email to