showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342676499


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -76,9 +85,37 @@ protected LeaderState(
             boolean hasAcknowledgedLeader = voterId == localId;
             this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
         }
+        this.majority = voters.size() / 2;
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        this.fetchTimeoutMs = fetchTimeoutMs;
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            fetchTimer.update(currentTimeMs);
+            fetchTimer.reset(fetchTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (isVoter(id)) {
+            fetchedVoters.add(id);
+        }

Review Comment:
   > Note that ReplicaState already contains the lastFetchTimestamp.
   
   I tried re-using the `lastFetchTimestamp` in ReplicaState today, but found
it won't work as expected because its default value is `-1`. That means that
when a node becomes the leader, the `lastFetchTimestamp` of every follower
node is `-1`. The current `timer` approach is more readable IMO.
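
To illustrate the `-1` problem, here is a minimal sketch. It is not the PR's code and does not use the real ReplicaState class; `LastFetchTimestampSketch`, `naiveMajorityFetchExpired`, and `freshLeaderState` are hypothetical names, and the naive check is only an assumption about how a timestamp-based version might look.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not the ReplicaState/LeaderState code.
public class LastFetchTimestampSketch {
    // Assumed default for a follower that has not fetched from this leader yet.
    static final long NO_FETCH_YET = -1L;

    // Naive check: the timeout counts as expired if fewer than `majority`
    // followers fetched within the last `fetchTimeoutMs`.
    static boolean naiveMajorityFetchExpired(Map<Integer, Long> lastFetchTimestamps,
                                             long currentTimeMs,
                                             long fetchTimeoutMs,
                                             int majority) {
        long recentFetchers = lastFetchTimestamps.values().stream()
            .filter(ts -> currentTimeMs - ts < fetchTimeoutMs)
            .count();
        return recentFetchers < majority;
    }

    // Builds the state a new leader would start with: every follower at -1.
    static Map<Integer, Long> freshLeaderState(int... followerIds) {
        Map<Integer, Long> state = new HashMap<>();
        for (int id : followerIds) {
            state.put(id, NO_FETCH_YET);
        }
        return state;
    }
}
```

With every follower at `-1`, the naive check reports expiry at the very moment the node becomes leader, before any follower has had a chance to fetch. A timestamp-based version would need extra handling for that initial state, which a timer started at election time avoids.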
   
   > The part that is not clear to me is when or how to wake up the leader for 
a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' 
last fetch time is taken into account when blocking on the messageQueue.poll
   
   Good question. My thought is that we add some buffer to tolerate the
operation time. For example, when [checking shrinkISR](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L373-L375),
we use 1.5x the timeout to keep things simple, instead of calculating the
exact timestamp. So I'm thinking we use `fetchTimeout * 1.5`. WDYT?
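
A rough sketch of the `fetchTimeout * 1.5` idea follows. The names `FetchTimeoutBufferSketch`, `bufferedTimeoutMs`, and `hasExpired` are hypothetical, and where exactly the factor would be applied (the timer itself or the poll wait in `pollLeader`) is left open here.

```java
// Hypothetical illustration only; not part of the PR.
public class FetchTimeoutBufferSketch {
    // Buffer factor proposed in the comment above.
    static final double BUFFER_FACTOR = 1.5;

    // Timeout with the buffer applied, truncated to whole milliseconds.
    static long bufferedTimeoutMs(long fetchTimeoutMs) {
        return (long) (fetchTimeoutMs * BUFFER_FACTOR);
    }

    // True once the buffered timeout has elapsed since the last reset.
    static boolean hasExpired(long lastResetMs, long currentTimeMs, long fetchTimeoutMs) {
        return currentTimeMs - lastResetMs >= bufferedTimeoutMs(fetchTimeoutMs);
    }
}
```

The trade-off is that the leader may take up to 1.5x the configured timeout to notice that it has lost the majority, in exchange for not having to wake up at the exact expiry time.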


