showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1342676499
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -76,9 +85,37 @@ protected LeaderState(
             boolean hasAcknowledgedLeader = voterId == localId;
             this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
         }
+        this.majority = voters.size() / 2;
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        this.fetchTimeoutMs = fetchTimeoutMs;
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                    fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            fetchTimer.update(currentTimeMs);
+            fetchTimer.reset(fetchTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (isVoter(id)) {
+            fetchedVoters.add(id);
+        }

Review Comment:
> Note that ReplicaState already contains the lastFetchTimestamp.

I tried to reuse the `lastFetchTimestamp` in `ReplicaState`, but found it doesn't work as expected because its default value is `-1`. That means that when a node becomes leader, the `lastFetchTimestamp` of every follower node is `-1`. The current `timer` approach is more readable IMO.

> The part that is not clear to me is when or how to wake up the leader for a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' last fetch time is taken into account when blocking on the messageQueue.poll

Good question.
My thought is that we add some buffer to tolerate the operation time. For example, when [checking shrinkISR](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L373-L375), we allow 1.5x the timeout to keep things simple, instead of calculating the exact timestamp. So I'm thinking we use `fetchTimeout * 1.5`. WDYT?
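
To make the `fetchTimeout * 1.5` idea concrete, here is a minimal sketch of how the buffered timeout could look. `CheckQuorumSketch`, `BUFFER_FACTOR`, `onFetch`, and `hasExpired` are hypothetical names chosen for illustration, not code from this PR, and the sketch uses plain `long` deadlines rather than Kafka's `Timer` so that it stays self-contained.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of a majority-fetch timeout with a 1.5x buffer; not the PR's code. */
final class CheckQuorumSketch {
    // Assumed buffer factor, taken from the 1.5x suggested in the comment above.
    static final double BUFFER_FACTOR = 1.5;

    private final Set<Integer> voters;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private final int majority;
    private final long bufferedTimeoutMs;
    private long deadlineMs;

    CheckQuorumSketch(Set<Integer> voters, long fetchTimeoutMs, long nowMs) {
        this.voters = new HashSet<>(voters);
        // Same threshold as the diff above: voters.size() / 2.
        this.majority = voters.size() / 2;
        // Tolerate operation time by waiting 1.5x the fetch timeout.
        this.bufferedTimeoutMs = (long) (fetchTimeoutMs * BUFFER_FACTOR);
        this.deadlineMs = nowMs + bufferedTimeoutMs;
    }

    /** True once the buffered timeout has elapsed without a majority of fetches. */
    boolean hasExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }

    /** Records a fetch from a voter and restarts the timeout once a majority has fetched. */
    void onFetch(int id, long nowMs) {
        if (!voters.contains(id)) {
            return; // fetches from non-voters are ignored
        }
        fetchedVoters.add(id);
        if (fetchedVoters.size() >= majority) {
            fetchedVoters.clear();
            deadlineMs = nowMs + bufferedTimeoutMs;
        }
    }

    public static void main(String[] args) {
        Set<Integer> voters = new HashSet<>(Arrays.asList(1, 2, 3));
        CheckQuorumSketch state = new CheckQuorumSketch(voters, 2000L, 0L);
        System.out.println("expired at 2999ms: " + state.hasExpired(2999L));
        state.onFetch(2, 2500L); // one follower fetch satisfies 3 / 2 = 1
        System.out.println("expired at 5499ms: " + state.hasExpired(5499L));
    }
}
```

With a 2000 ms fetch timeout the leader waits 3000 ms before treating the quorum as lost, so the check does not depend on exact per-replica timestamps.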