jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1341771602


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
             context.listener.currentLeaderAndEpoch());
     }
 
+    @Test
+    public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception {

Review Comment:
   Can we also test the opposite: that the leader doesn't resign if the 
majority of the replicas (including the leader) have fetched in the last 
`fetchTimeoutMs`?
   
   Can we also add test(s) under `KafkaRaftClientSnapshotTest` that show that 
the leader also considers `FETCH_SNAPSHOT` requests for determining network 
connectivity?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -55,6 +57,11 @@ public class LeaderState<T> implements EpochState {
     private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Logger log;
     private final BatchAccumulator<T> accumulator;
+    private final Set<Integer> fetchedVoters = new HashSet<>();
+    private final Timer fetchTimer;
+    private final int fetchTimeoutMs;
+    // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
+    private final int majority;

Review Comment:
   Let's not cache this value. Let's always compute this from the size of 
`voterStates`. We can add a function for this.
   
   I am hoping to add support for dynamic voters soon and this is one less 
thing that I would have to change :smile: .
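
   A minimal sketch of this suggestion, assuming a simplified stand-in class. 
`MajoritySketch`, `addVoter` and `majorityExcludingLeader` are hypothetical 
names for illustration, not part of the PR or of `LeaderState`:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified stand-in for LeaderState; not the actual Kafka class.
   final class MajoritySketch {
       // Keyed by voter id. The value type does not matter for this sketch.
       private final Map<Integer, Object> voterStates = new HashMap<>();

       void addVoter(int voterId) {
           voterStates.put(voterId, new Object());
       }

       // Majority of the voters excluding the leader, recomputed on every call
       // instead of cached in a field. Ex: 3 voters, the value will be 1.
       int majorityExcludingLeader() {
           return voterStates.size() / 2;
       }
   }
   ```

   Because the value is derived from the map on each call, it stays correct 
if the voter set ever changes after construction.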



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -965,6 +965,10 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
         }
 
         int replicaId = FetchRequest.replicaId(request);
+
+        Optional<LeaderState<T>> state = quorum.maybeLeaderState();
+        state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimeout(replicaId, currentTimeMs));

Review Comment:
   I am thinking that the leader should only update the timer on successful 
FETCH and FETCH_SNAPSHOT responses. If the majority of the replicas are not 
successfully replicating the log, the leader will not be able to commit write 
operations (`RaftClient::scheduleAppend`) from the state machine.
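
   A rough sketch of the idea, assuming the caller already knows whether the 
response it is about to send is successful. `SuccessfulFetchTracker`, 
`onResponse` and `lastSuccessMs` are hypothetical names, and the `-1` default 
for "never fetched" is an assumption of this sketch:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical helper; not part of the PR or of KafkaRaftClient.
   final class SuccessfulFetchTracker {
       // Replica id -> timestamp of the last successful FETCH or FETCH_SNAPSHOT response.
       private final Map<Integer, Long> lastSuccessfulFetchMs = new HashMap<>();

       // Called once per response. Failed responses leave the timestamp untouched,
       // so a replica that keeps getting errors does not count as connected.
       void onResponse(int replicaId, boolean succeeded, long nowMs) {
           if (succeeded) {
               lastSuccessfulFetchMs.put(replicaId, nowMs);
           }
       }

       // Returns -1 if the replica has never completed a successful fetch.
       long lastSuccessMs(int replicaId) {
           return lastSuccessfulFetchMs.getOrDefault(replicaId, -1L);
       }
   }
   ```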



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -66,7 +73,9 @@ protected LeaderState(
         Set<Integer> voters,
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
-        LogContext logContext
+        LogContext logContext,
+        Time time,
+        int fetchTimeoutMs

Review Comment:
   Let's re-order this so that it better matches `CandidateState` and 
`FollowerState`. How about this order for the parameters:
   ```java
   protected LeaderState(
       Time time,
       int localId,
       int epoch,
       long epochStartOffset,
       Set<Integer> voters,
       Set<Integer> grantingVoters,
       BatchAccumulator<T> accumulator,
       int fetchTimeoutMs,
       LogContext logContext
   )
   ```



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -76,9 +85,37 @@ protected LeaderState(
             boolean hasAcknowledgedLeader = voterId == localId;
             this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
         }
+        this.majority = voters.size() / 2;
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        this.fetchTimeoutMs = fetchTimeoutMs;
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            fetchTimer.update(currentTimeMs);
+            fetchTimer.reset(fetchTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (isVoter(id)) {
+            fetchedVoters.add(id);
+        }

Review Comment:
   I see.
   
   I think that the invariant that we need to hold is that for any time span of 
`fetchTimeoutMs` the majority of the replicas have performed a successful 
`FETCH` or `FETCH_SNAPSHOT`. Note that `ReplicaState` already contains the 
`lastFetchTimestamp`.
   
   The part that is not clear to me is when or how to wake up the leader for a 
`poll`. We need to update `KafkaRaftClient::pollLeader` so that the replicas' 
last fetch time is taken into account when blocking on the `messageQueue.poll`.
   
   What do you think?
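
   One possible reading of the invariant above, as a sketch rather than a 
proposal for the actual implementation. `FetchDeadlineSketch` and 
`remainingMs` are hypothetical names. The sketch assumes the leader always 
counts toward the majority, and that each follower's timestamp is initialized 
to the start of the leader's epoch so a new leader does not expire 
immediately:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper; not part of the PR or of LeaderState.
   final class FetchDeadlineSketch {
       private FetchDeadlineSketch() {}

       // followerLastFetchMs maps follower voter id -> last successful fetch time.
       // The leader is not in the map. Returns how long the leader may wait before
       // a majority of voters no longer has a fetch within fetchTimeoutMs
       // (0 means the invariant is already violated).
       static long remainingMs(Map<Integer, Long> followerLastFetchMs, int fetchTimeoutMs, long nowMs) {
           int voters = followerLastFetchMs.size() + 1; // followers plus the leader
           int needed = voters / 2;                     // followers needed besides the leader
           if (needed == 0) {
               return Long.MAX_VALUE;                   // single-voter quorum never expires
           }
           List<Long> timestamps = new ArrayList<>(followerLastFetchMs.values());
           timestamps.sort(Collections.reverseOrder()); // most recent first
           // The oldest fetch among the `needed` most recent followers sets the deadline.
           long limiting = timestamps.get(needed - 1);
           return Math.max(0L, limiting + fetchTimeoutMs - nowMs);
       }
   }
   ```

   Under these assumptions, the returned value is the kind of bound that could 
be folded into the timeout passed to `messageQueue.poll`, so the leader wakes 
up no later than the moment the invariant could break.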


