jsancio commented on code in PR #20318:
URL: https://github.com/apache/kafka/pull/20318#discussion_r2451963219
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -188,6 +189,39 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs)
{
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
}
+ /**
+ * Identifies voter replicas that require a {@code BeginQuorumEpoch}
request from the leader.
+ * <p>
+ * The leader periodically checks whether any voter replicas have fallen
behind in fetching
+ * from it. This method evaluates all known voter replica states and
determines which replicas
+ * should receive a {@code BeginQuorumEpoch} request.
+ * <p>
+ * A replica will be included in the returned set if:
+ * <ul>
+ * <li>It is not the local voter (the leader itself), and</li>
+ * <li>The time since its last fetch is greater than or equal to
+ * {@code beginQuorumEpochTimeoutMs}.</li>
+ * </ul>
+ * <p>
+ * The {@link #beginQuorumEpochTimer} is updated with the current time
before performing the check.
+ * The returned set is unmodifiable to prevent external modification.
+ *
+ * @param currentTimeMs the current system time in milliseconds
+ * @return an unmodifiable set of {@link ReplicaKey} objects representing
replicas
+ * that need to receive a {@code BeginQuorumEpoch} request
+ */
+ public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+ Set<ReplicaKey> replicaKeys = new HashSet<>();
+ beginQuorumEpochTimer.update(currentTimeMs);
+ for (ReplicaState state : voterStates.values()) {
+ if (state.replicaKey.id() != localVoterNode.voterKey().id()
+ && currentTimeMs - state.lastFetchTimestamp >=
beginQuorumEpochTimeoutMs) {
+ replicaKeys.add(state.replicaKey());
+ }
+ }
+ return Collections.unmodifiableSet(replicaKeys);
Review Comment:
How about:
```java
return voterStates.values()
.stream()
.filter(
state -> state.replicaKey.id() != localVoterNode.voterKey().id() &&
currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutM
)
.map(ReplicaState::replicaKey)
.collect(Collectors.toUnmodifiableSet());
```
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -188,6 +189,39 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs)
{
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
}
+ /**
+ * Identifies voter replicas that require a {@code BeginQuorumEpoch}
request from the leader.
+ * <p>
+ * The leader periodically checks whether any voter replicas have fallen
behind in fetching
+ * from it. This method evaluates all known voter replica states and
determines which replicas
+ * should receive a {@code BeginQuorumEpoch} request.
+ * <p>
+ * A replica will be included in the returned set if:
+ * <ul>
+ * <li>It is not the local voter (the leader itself), and</li>
+ * <li>The time since its last fetch is greater than or equal to
+ * {@code beginQuorumEpochTimeoutMs}.</li>
+ * </ul>
+ * <p>
+ * The {@link #beginQuorumEpochTimer} is updated with the current time
before performing the check.
+ * The returned set is unmodifiable to prevent external modification.
+ *
+ * @param currentTimeMs the current system time in milliseconds
+ * @return an unmodifiable set of {@link ReplicaKey} objects representing
replicas
+ * that need to receive a {@code BeginQuorumEpoch} request
+ */
+ public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+ Set<ReplicaKey> replicaKeys = new HashSet<>();
+ beginQuorumEpochTimer.update(currentTimeMs);
Review Comment:
Do you need to update the timer? For example, the timer is never evaluated
by this method. In this implementation the timer is updated and evaluated
before calling this method
[here](https://github.com/apache/kafka/pull/20318/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R3107):
`long timeUntilNextBeginQuorumSend =
state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]