jsancio commented on code in PR #16399: URL: https://github.com/apache/kafka/pull/16399#discussion_r1651079179
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2096,6 +2096,26 @@ private long maybeAppendBatches( return timeUntilDrain; } + private long maybeSendBeginQuorumEpochRequests( + LeaderState<T> state, + long currentTimeMs + ) { + long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs); + if (timeUntilNextBeginQuorumSend == 0) { + timeUntilNextBeginQuorumSend = maybeSendRequests( + currentTimeMs, + partitionState + .lastVoterSet() + .voterNodes(state.votersExcludingLeader().stream(), channel.listenerName()), Review Comment: The latest voters are in `partitionState.lastVoterSet()`. It is odd that `KafkaRaftClient` needs to query `LeaderState` to exclude itself from the `VoterSet`. How about `voters.voterKeys().stream().filter(!self)`? ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -628,7 +629,7 @@ void deliverResponse(int correlationId, Node source, ApiMessage response) { RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int numBeginEpochRequests) { List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch); assertEquals(numBeginEpochRequests, requests.size()); - return requests.get(0); + return !requests.isEmpty() ? requests.get(0) : null; Review Comment: Let's avoid returning `null` from "public" methods. We normally don't test for the absence of a side effect, because such checks are technically unbounded (absence can never be fully confirmed in finite time). Maybe we can expose `collectBeginEpochRequests` as package-private and check the size of the list.
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -497,6 +497,32 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio context.listener.currentLeaderAndEpoch()); } + @Test + public void testBeginQuorumHeartbeat() throws Exception { + int localId = 0; + int remoteId1 = 1; + int remoteId2 = 2; + Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), 2); Review Comment: Does this method check that it was sent to `remoteId1` and `remoteId2`? If not, we should check that the begin quorum epoch request was sent to all of the remote voters. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2096,6 +2096,26 @@ private long maybeAppendBatches( return timeUntilDrain; } + private long maybeSendBeginQuorumEpochRequests( + LeaderState<T> state, + long currentTimeMs + ) { + long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs); + if (timeUntilNextBeginQuorumSend == 0) { + timeUntilNextBeginQuorumSend = maybeSendRequests( + currentTimeMs, + partitionState + .lastVoterSet() + .voterNodes(state.votersExcludingLeader().stream(), channel.listenerName()), + this::buildBeginQuorumEpochRequest + ); + state.resetBeginQuorumEpochTimer(currentTimeMs); + logger.trace("Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " + "Request can be retried in {} ms", timeUntilNextBeginQuorumSend); Review Comment: This doesn't match the formatting used in this file or module: ```java logger.trace( "Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " + "Request can be retried in {} ms", timeUntilNextBeginQuorumSend ); ``` ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -866,6 +872,27 @@ public void testGrantVote(boolean isLogUpToDate) { assertFalse(state.canGrantVote(ReplicaKey.of(3, Optional.empty()), isLogUpToDate)); } + @Test + public void testBeginQuorumEpochTimer() { + int leader = localId; + int follower1 = 1; + long epochStartOffset = 10L; + + Set<Integer> voterSet = mkSet(leader, follower1); + LeaderState<?> state = newLeaderState(voterSet, epochStartOffset); + assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); + + long resetTime = time.milliseconds(); Review Comment: `time.milliseconds()` is mocked, so it will return the same value as long as you don't call `time.sleep(long)`. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -220,7 +234,14 @@ public Uuid localDirectoryId() { return localDirectoryId; } - public Set<Integer> nonAcknowledgingVoters() { + public Set<Integer> votersExcludingLeader() { + Set<Integer> voters = new HashSet<>(voterStates.keySet()); + voters.remove(localId); + return voters; + } Review Comment: See my other comment, but you should be able to remove this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org