jsancio commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1406768273
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); Review Comment: Since this is not set to the fetch timeout, maybe we can call this `checkQuorumTimeoutMs` and `checkQuorumTimer`. I am suggesting these names because @ahuang98 uses "check quorum" in the pre-vote KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. Review Comment: Can we make this a Javadoc comment? E.g.: ```java /** * Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from * the majority of the voters within fetch timeout. * * @param ... * @return ... 
*/ ``` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); + Optional<LeaderState<T>> state = quorum.maybeLeaderState(); + state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs)); + Review Comment: Since the check above `leaderValidation.isPresent()` is false, it means that this replica is guaranteed to be the leader at this point in time. I would prefer that we use `leaderStateOrThrow` instead of `maybeLeaderState` to make this clear to future readers. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { + fetchTimer.update(currentTimeMs); + boolean isExpired = fetchTimer.isExpired(); + if (isExpired) { + log.info("Did not receive fetch request from the majority of the voters within {}ms. 
Current fetched voters are {}.", + fetchTimeoutMs, fetchedVoters); + } + return isExpired; + } + + // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter + public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) { + updateFetchedVoters(id); + // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 + int majority = voterStates.size() / 2; + if (fetchedVoters.size() >= majority) { + fetchedVoters.clear(); + fetchTimer.update(currentTimeMs); + fetchTimer.reset(fetchTimeoutMs); + } + } + + private void updateFetchedVoters(int id) { + if (isVoter(id)) { Review Comment: We should be defensive against this getting called with the local replica. Let's throw an `IllegalArgumentException` if `id` is equal to the `localId`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio context.listener.currentLeaderAndEpoch()); } + @Test + public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception { Review Comment: Got it. This test covers all of the cases I was thinking about. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java: ########## @@ -701,6 +702,97 @@ public void testFetchSnapshotRequestAsLeader() throws Exception { assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); } + @Test + public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters() throws Exception { + int localId = 0; + int voter1 = 1; + int voter2 = 2; + int observerId3 = 3; + Set<Integer> voters = Utils.mkSet(localId, voter1, voter2); + OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1); + List<String> records = Arrays.asList("foo", "bar"); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .build(); + + int resignLeadershipTimeout = (int) (context.fetchTimeoutMs * 1.5); + context.becomeLeader(); + int epoch = context.currentEpoch(); + + FetchSnapshotRequestData voter1FetchSnapshotRequest = fetchSnapshotRequest( + context.clusterId.toString(), + voter1, + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + 0 + ); + + FetchSnapshotRequestData voter2FetchSnapshotRequest = fetchSnapshotRequest( + context.clusterId.toString(), + voter2, + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + 0 + ); + + FetchSnapshotRequestData observerFetchSnapshotRequest = fetchSnapshotRequest( + context.clusterId.toString(), + observerId3, + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + 0 + ); + + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); + try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) { + assertEquals(snapshotId, snapshot.snapshotId()); + snapshot.append(records); + snapshot.freeze(); + } + + // fetch timeout is not expired, the leader should not get resigned + context.time.sleep(resignLeadershipTimeout / 2); + context.client.poll(); + 
assertFalse(context.client.quorum().isResigned()); + + // voter1 sends fetchSnapshotRequest, the fetch timer should be reset + context.deliverRequest(voter1FetchSnapshotRequest); + context.client.poll(); + context.assertSentFetchSnapshotResponse(context.metadataPartition); + + // Since the fetch timer is reset, the leader should not get resigned + context.time.sleep(resignLeadershipTimeout / 2); + context.client.poll(); + assertFalse(context.client.quorum().isResigned()); + + // voter2 sends fetchSnapshotRequest, the fetch timer should be reset + context.deliverRequest(voter2FetchSnapshotRequest); + context.client.poll(); + context.assertSentFetchSnapshotResponse(context.metadataPartition); + + // Since the fetch timer is reset, the leader should not get resigned + context.time.sleep(resignLeadershipTimeout / 2); + context.client.poll(); + assertFalse(context.client.quorum().isResigned()); + + // An observer sends fetchSnapshotRequest, but the fetch timer should not be reset. + context.deliverRequest(observerFetchSnapshotRequest); + context.client.poll(); + context.assertSentFetchSnapshotResponse(context.metadataPartition); + + // After this sleep, the fetch timeout should expire since we don't receive fetch request from the majority voters within fetchTimeoutMs + context.time.sleep(resignLeadershipTimeout / 2); + context.client.poll(); + assertTrue(context.client.quorum().isResigned()); Review Comment: Great test. Thank you! ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. 
+ this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { + fetchTimer.update(currentTimeMs); + boolean isExpired = fetchTimer.isExpired(); + if (isExpired) { + log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", + fetchTimeoutMs, fetchedVoters); + } + return isExpired; + } + + // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter Review Comment: Same comment here. Can we make this a Java doc comment? ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { + fetchTimer.update(currentTimeMs); + boolean isExpired = fetchTimer.isExpired(); + if (isExpired) { + log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", + fetchTimeoutMs, fetchedVoters); Review Comment: We use the following indentations in the `raft` module: ```java log.info( "Did not receive fetch request from the majority of the voters within {}ms. 
Current fetched voters are {}.", fetchTimeoutMs, fetchedVoters ); ``` ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -55,17 +57,22 @@ public class LeaderState<T> implements EpochState { private final Map<Integer, ReplicaState> observerStates = new HashMap<>(); private final Logger log; private final BatchAccumulator<T> accumulator; + private final Set<Integer> fetchedVoters = new HashSet<>(); Review Comment: What do you think about writing a comment explaining this field? For example, this set includes all of the voter followers that sent a `FETCH` or `FETCH_SNAPSHOT` request during the current `fetchTimer` interval. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { Review Comment: See my other comment about check quorum and Alyssa's pre-vote KIP. If you agree, maybe we should call this `hasCheckQuorumFailed`. ########## clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java: ########## @@ -70,6 +71,7 @@ public static FetchSnapshotRequestData singleton( return new FetchSnapshotRequestData() .setClusterId(clusterId) + .setReplicaId(replicaId) Review Comment: Good catch. Thanks for fixing this! I think this is the best we can do here. 
This means that FETCH_SNAPSHOT won't count towards check quorum until the voters have been upgraded to this version. ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -447,6 +452,38 @@ public void testDescribeQuorumWithObservers() { observerState); } + @Test + public void testMajorityFollowerFetchTimeoutExpiration() { + int node1 = 1; + int node2 = 2; + int node3 = 3; + int node4 = 4; + int observer5 = 5; + LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L); + assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds())); + int resignLeadershipTimeout = (int) (fetchTimeoutMs * 1.5); Review Comment: This applies to the other tests but I am thinking that we should either 1) make `1.5` a constant and use that constant in all of the tests or 2) add a method that returns the "check quorum" timeout ms and use that in all of the tests. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + // use the 1.5x fetch timeout to tolerate some network transition time or other IO time. + this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); + this.fetchTimer = time.timer(fetchTimeoutMs); + } + + // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of + // the voters within fetch timeout. + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { + fetchTimer.update(currentTimeMs); + boolean isExpired = fetchTimer.isExpired(); + if (isExpired) { + log.info("Did not receive fetch request from the majority of the voters within {}ms. 
Current fetched voters are {}.", + fetchTimeoutMs, fetchedVoters); + } + return isExpired; + } + + // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter + public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) { Review Comment: How about `updateCheckQuorumForFollowingVoter(int id, long currentTimeMs)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org