jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406768273


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);

Review Comment:
   Since this is not set to the fetch timeout, maybe we can call these 
`checkQuorumTimeoutMs` and `checkQuorumTimer`. I am suggesting these names 
because @ahuang98 uses "check quorum" in the pre-vote KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.

Review Comment:
   Can we make this a Javadoc comment? E.g.:
   ```java
       /**
        * Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from
        * the majority of the voters within fetch timeout.
        *
        * @param ...
        * @return ...
        */
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
 
         UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
 
+        Optional<LeaderState<T>> state = quorum.maybeLeaderState();
+        state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs));
+

Review Comment:
   Since the `leaderValidation.isPresent()` check above is false, this replica 
is guaranteed to be the leader at this point in time. I would prefer that we 
use `leaderStateOrThrow` instead of `maybeLeaderState` to make this clear to 
future readers.
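
   A minimal sketch of the difference between the two accessors, using a 
hypothetical `QuorumStateSketch` stand-in rather than the real `QuorumState` 
class. The field, constructor, and exception message are assumptions made for 
illustration only:

   ```java
   import java.util.Optional;

   // Hypothetical stand-in for the quorum state holder; not the real Kafka class.
   final class QuorumStateSketch<S> {
       // null when this replica is not currently the leader
       private final S leaderState;

       QuorumStateSketch(S leaderState) {
           this.leaderState = leaderState;
       }

       // Returns an empty Optional when the replica is not the leader,
       // so a broken invariant at the call site would be skipped silently.
       Optional<S> maybeLeaderState() {
           return Optional.ofNullable(leaderState);
       }

       // Throws when the replica is not the leader, which documents the
       // invariant and surfaces a violation immediately.
       S leaderStateOrThrow() {
           if (leaderState == null) {
               throw new IllegalStateException("Expected this replica to be the leader");
           }
           return leaderState;
       }
   }
   ```

   With the throwing accessor, the call site states that leadership has already 
been validated instead of quietly doing nothing if that assumption is wrong.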



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter
+    public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
+        int majority = voterStates.size() / 2;
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            fetchTimer.update(currentTimeMs);
+            fetchTimer.reset(fetchTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (isVoter(id)) {

Review Comment:
   We should be defensive against this getting called with the local replica. 
Let's throw an `IllegalArgumentException` if `id` is equal to the `localId`.
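
   A minimal sketch of that defensive check, assuming a simplified 
`FetchedVotersSketch` class in place of `LeaderState`. The class name, the 
`voterIds` set, and the exception message are hypothetical:

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical, simplified stand-in for the relevant part of LeaderState.
   final class FetchedVotersSketch {
       private final int localId;
       private final Set<Integer> voterIds;
       // Voter followers that have fetched during the current timer interval.
       private final Set<Integer> fetchedVoters = new HashSet<>();

       FetchedVotersSketch(int localId, Set<Integer> voterIds) {
           this.localId = localId;
           this.voterIds = new HashSet<>(voterIds);
       }

       void updateFetchedVoters(int id) {
           // The leader never fetches from itself, so treat this as a caller bug.
           if (id == localId) {
               throw new IllegalArgumentException(
                   "Received a fetch update for the local replica " + localId
               );
           }
           // Observers are ignored; only voters count towards the majority.
           if (voterIds.contains(id)) {
               fetchedVoters.add(id);
           }
       }

       Set<Integer> fetchedVoters() {
           return Collections.unmodifiableSet(fetchedVoters);
       }
   }
   ```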



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
             context.listener.currentLeaderAndEpoch());
     }
 
+    @Test
+    public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception {

Review Comment:
   Got it. This test covers all of the cases I was thinking about.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -701,6 +702,97 @@ public void testFetchSnapshotRequestAsLeader() throws Exception {
         assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
     }
 
+    @Test
+    public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters() throws Exception {
+        int localId = 0;
+        int voter1 = 1;
+        int voter2 = 2;
+        int observerId3 = 3;
+        Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+        List<String> records = Arrays.asList("foo", "bar");
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(snapshotId.epoch(), Arrays.asList("a"))
+                .build();
+
+        int resignLeadershipTimeout = (int) (context.fetchTimeoutMs * 1.5);
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        FetchSnapshotRequestData voter1FetchSnapshotRequest = fetchSnapshotRequest(
+                context.clusterId.toString(),
+                voter1,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        FetchSnapshotRequestData voter2FetchSnapshotRequest = fetchSnapshotRequest(
+                context.clusterId.toString(),
+                voter2,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        FetchSnapshotRequestData observerFetchSnapshotRequest = fetchSnapshotRequest(
+                context.clusterId.toString(),
+                observerId3,
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+        );
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.append(records);
+            snapshot.freeze();
+        }
+
+        // fetch timeout is not expired, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter1 sends fetchSnapshotRequest, the fetch timer should be reset
+        context.deliverRequest(voter1FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter2 sends fetchSnapshotRequest, the fetch timer should be reset
+        context.deliverRequest(voter2FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer is reset, the leader should not get resigned
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // An observer sends fetchSnapshotRequest, but the fetch timer should not be reset.
+        context.deliverRequest(observerFetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // After this sleep, the fetch timeout should expire since we don't receive fetch request from the majority voters within fetchTimeoutMs
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertTrue(context.client.quorum().isResigned());

Review Comment:
   Great test. Thank you!



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter

Review Comment:
   Same comment here. Can we make this a Javadoc comment?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);

Review Comment:
   We use the following indentations in the `raft` module:
   ```java
               log.info(
                   "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
                   fetchTimeoutMs,
                   fetchedVoters
               );
   ```



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -55,17 +57,22 @@ public class LeaderState<T> implements EpochState {
     private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Logger log;
     private final BatchAccumulator<T> accumulator;
+    private final Set<Integer> fetchedVoters = new HashSet<>();

Review Comment:
   What do you think about writing a comment explaining this field? For 
example: this set includes all of the voter followers that sent a `FETCH` or 
`FETCH_SNAPSHOT` request during the current `fetchTimer` interval.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {

Review Comment:
   See my other comment about check quorum and Alyssa's pre-vote KIP. If you 
agree, maybe we should call this `hasCheckQuorumFailed`.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java:
##########
@@ -70,6 +71,7 @@ public static FetchSnapshotRequestData singleton(
 
         return new FetchSnapshotRequestData()
             .setClusterId(clusterId)
+            .setReplicaId(replicaId)

Review Comment:
   Good catch. Thanks for fixing this!
   
   I think this is the best we can do here. This means that FETCH_SNAPSHOT 
won't count towards check quorum until the voters have been upgraded to this 
version.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -447,6 +452,38 @@ public void testDescribeQuorumWithObservers() {
             observerState);
     }
 
+    @Test
+    public void testMajorityFollowerFetchTimeoutExpiration() {
+        int node1 = 1;
+        int node2 = 2;
+        int node3 = 3;
+        int node4 = 4;
+        int observer5 = 5;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L);
+        assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
+        int resignLeadershipTimeout = (int) (fetchTimeoutMs * 1.5);

Review Comment:
   This applies to the other tests as well. I think we should either 1) make 
`1.5` a constant and use that constant in all of the tests, or 2) add a method 
that returns the "check quorum" timeout in ms and use that in all of the 
tests.
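
   A minimal sketch that combines both options, assuming a hypothetical 
`CheckQuorumTimeout` helper. The class, constant, and method names are 
illustrative and not taken from the PR:

   ```java
   // Hypothetical helper; the names below are assumptions for illustration.
   final class CheckQuorumTimeout {
       // Option 1: the multiplier lives in one named constant.
       static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

       private CheckQuorumTimeout() { }

       // Option 2: a single method derives the check quorum timeout, so
       // production code and tests cannot drift apart.
       static int checkQuorumTimeoutMs(int fetchTimeoutMs) {
           return (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
       }
   }
   ```

   A test could then call `CheckQuorumTimeout.checkQuorumTimeoutMs(fetchTimeoutMs)` 
instead of repeating `(int) (fetchTimeoutMs * 1.5)`.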



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    // Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter
+    public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) {

Review Comment:
   How about `updateCheckQuorumForFollowingVoter(int id, long currentTimeMs)`?
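
   A self-contained sketch of the check quorum logic under the suggested names. 
It assumes a plain deadline in place of Kafka's `Timer`, and the 
`CheckQuorumSketch` class, its constructor, and its fields are hypothetical:

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical, simplified model of the leader's check quorum bookkeeping.
   final class CheckQuorumSketch {
       private final int localId;
       private final Set<Integer> voterIds;
       private final long checkQuorumTimeoutMs;
       // Voter followers that have fetched during the current interval.
       private final Set<Integer> fetchedVoters = new HashSet<>();
       private long deadlineMs;

       CheckQuorumSketch(int localId, Set<Integer> voterIds, long checkQuorumTimeoutMs, long nowMs) {
           this.localId = localId;
           this.voterIds = new HashSet<>(voterIds);
           this.checkQuorumTimeoutMs = checkQuorumTimeoutMs;
           this.deadlineMs = nowMs + checkQuorumTimeoutMs;
       }

       // True once the interval elapsed without a majority of voters fetching.
       boolean hasCheckQuorumFailed(long currentTimeMs) {
           return currentTimeMs >= deadlineMs;
       }

       void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
           if (id != localId && voterIds.contains(id)) {
               fetchedVoters.add(id);
           }
           // Majority of the voters excluding the leader, e.g. 3 voters -> 1.
           int majority = voterIds.size() / 2;
           if (fetchedVoters.size() >= majority) {
               fetchedVoters.clear();
               deadlineMs = currentTimeMs + checkQuorumTimeoutMs;
           }
       }
   }
   ```

   With five voters, the deadline only moves once two distinct followers have 
fetched within the same interval. Fetches from observers never move it.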


