jsancio commented on code in PR #20318:
URL: https://github.com/apache/kafka/pull/20318#discussion_r2451963219


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -188,6 +189,39 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) {
         beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
     }
 
+    /**
+     * Identifies voter replicas that require a {@code BeginQuorumEpoch} request from the leader.
+     * <p>
+     * The leader periodically checks whether any voter replicas have fallen behind in fetching
+     * from it. This method evaluates all known voter replica states and determines which replicas
+     * should receive a {@code BeginQuorumEpoch} request.
+     * <p>
+     * A replica will be included in the returned set if:
+     * <ul>
+     *   <li>It is not the local voter (the leader itself), and</li>
+     *   <li>The time since its last fetch is greater than or equal to
+     *       {@code beginQuorumEpochTimeoutMs}.</li>
+     * </ul>
+     * <p>
+     * The {@link #beginQuorumEpochTimer} is updated with the current time before performing the check.
+     * The returned set is unmodifiable to prevent external modification.
+     *
+     * @param currentTimeMs the current system time in milliseconds
+     * @return an unmodifiable set of {@link ReplicaKey} objects representing replicas
+     *         that need to receive a {@code BeginQuorumEpoch} request
+     */
+    public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+        Set<ReplicaKey> replicaKeys = new HashSet<>();
+        beginQuorumEpochTimer.update(currentTimeMs);
+        for (ReplicaState state : voterStates.values()) {
+            if (state.replicaKey.id() != localVoterNode.voterKey().id()
+                && currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs) {
+                replicaKeys.add(state.replicaKey());
+            }
+        }
+        return Collections.unmodifiableSet(replicaKeys);

Review Comment:
   How about:
   ```java
   return voterStates.values()
       .stream()
       .filter(
           state -> state.replicaKey.id() != localVoterNode.voterKey().id() &&
           currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs
       )
       .map(ReplicaState::replicaKey)
       .collect(Collectors.toUnmodifiableSet());
   ```
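
To make the shape of that suggestion easy to try in isolation, here is a minimal, self-contained sketch of the same filter-and-collect pattern. The `Replica` class, the `needBeginQuorum` helper, and the sample values are hypothetical stand-ins for illustration only; they are not the `ReplicaState` or `LeaderState` code from this PR.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class BeginQuorumSketch {
    // Hypothetical stand-in for ReplicaState: only the two fields the filter reads.
    static final class Replica {
        final int id;
        final long lastFetchTimestamp;

        Replica(int id, long lastFetchTimestamp) {
            this.id = id;
            this.lastFetchTimestamp = lastFetchTimestamp;
        }
    }

    // Returns the ids of remote voters whose last fetch is at least timeoutMs old.
    static Set<Integer> needBeginQuorum(
        List<Replica> voters, int localId, long currentTimeMs, long timeoutMs
    ) {
        return voters.stream()
            .filter(r -> r.id != localId
                && currentTimeMs - r.lastFetchTimestamp >= timeoutMs)
            .map(r -> r.id)
            // The collector already yields an unmodifiable set, so no extra wrapping is needed.
            .collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        List<Replica> voters = List.of(
            new Replica(1, 0L),   // the local leader, always skipped
            new Replica(2, 100L), // 900 ms since the last fetch
            new Replica(3, 950L)  // 50 ms since the last fetch
        );
        System.out.println(needBeginQuorum(voters, 1, 1000L, 500L));
    }
}
```

With a 500 ms timeout, only the voter with id 2 passes the filter. Note that `Collectors.toUnmodifiableSet()` rejects null elements, which should not matter here as long as replica keys are never null.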



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -188,6 +189,39 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) {
         beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
     }
 
+    /**
+     * Identifies voter replicas that require a {@code BeginQuorumEpoch} request from the leader.
+     * <p>
+     * The leader periodically checks whether any voter replicas have fallen behind in fetching
+     * from it. This method evaluates all known voter replica states and determines which replicas
+     * should receive a {@code BeginQuorumEpoch} request.
+     * <p>
+     * A replica will be included in the returned set if:
+     * <ul>
+     *   <li>It is not the local voter (the leader itself), and</li>
+     *   <li>The time since its last fetch is greater than or equal to
+     *       {@code beginQuorumEpochTimeoutMs}.</li>
+     * </ul>
+     * <p>
+     * The {@link #beginQuorumEpochTimer} is updated with the current time before performing the check.
+     * The returned set is unmodifiable to prevent external modification.
+     *
+     * @param currentTimeMs the current system time in milliseconds
+     * @return an unmodifiable set of {@link ReplicaKey} objects representing replicas
+     *         that need to receive a {@code BeginQuorumEpoch} request
+     */
+    public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+        Set<ReplicaKey> replicaKeys = new HashSet<>();
+        beginQuorumEpochTimer.update(currentTimeMs);

Review Comment:
   Do you need to update the timer here? This method never evaluates the timer. In this implementation the timer is already updated and evaluated before this method is called, [here](https://github.com/apache/kafka/pull/20318/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R3107):
   `long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);`
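
To illustrate why the extra update looks redundant, here is a rough sketch under stated assumptions. `SimpleTimer` and `timeUntilExpires` are hypothetical, simplified stand-ins written for this comment, not Kafka's `Timer` or the `LeaderState` method. The sketch assumes the timer only records the latest time it has seen and that the caller has already updated it with the same `currentTimeMs`.

```java
public class TimerUpdateSketch {
    // Hypothetical minimal deadline timer, for illustration only.
    static final class SimpleTimer {
        private long currentTimeMs;
        private final long deadlineMs;

        SimpleTimer(long nowMs, long timeoutMs) {
            this.currentTimeMs = nowMs;
            this.deadlineMs = nowMs + timeoutMs;
        }

        // Records the latest observed time; never moves backwards.
        void update(long nowMs) {
            currentTimeMs = Math.max(currentTimeMs, nowMs);
        }

        long remainingMs() {
            return Math.max(0L, deadlineMs - currentTimeMs);
        }
    }

    // Assumed shape of the caller-side check: update the timer, then read it.
    static long timeUntilExpires(SimpleTimer timer, long nowMs) {
        timer.update(nowMs);
        return timer.remainingMs();
    }

    public static void main(String[] args) {
        SimpleTimer timer = new SimpleTimer(0L, 500L);
        long before = timeUntilExpires(timer, 200L); // caller updates and evaluates
        timer.update(200L);                          // second update with the same time
        long after = timer.remainingMs();
        System.out.println(before + " " + after);
    }
}
```

Under those assumptions, the second `update` with the same timestamp changes nothing, so the method could skip it and rely on the caller's update.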


