jsancio commented on code in PR #16399:
URL: https://github.com/apache/kafka/pull/16399#discussion_r1651079179


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2096,6 +2096,26 @@ private long maybeAppendBatches(
         return timeUntilDrain;
     }
 
+    private long maybeSendBeginQuorumEpochRequests(
+        LeaderState<T> state,
+        long currentTimeMs
+    ) {
+        long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
+        if (timeUntilNextBeginQuorumSend == 0) {
+            timeUntilNextBeginQuorumSend = maybeSendRequests(
+                currentTimeMs,
+                partitionState
+                    .lastVoterSet()
+                    .voterNodes(state.votersExcludingLeader().stream(), channel.listenerName()),

Review Comment:
   The latest voters are already available from `partitionState.lastVoterSet()`.
   It is odd that `KafkaRaftClient` needs to query `LeaderState` just to exclude
   itself from the `VoterSet`. How about filtering out the local replica
   directly, something like `voters.voterKeys().stream().filter(!self)`?
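
A minimal sketch of that filtering idea, assuming voters are identified by plain integer ids. The `RemoteVoters` class and `remoteVoterIds` helper are hypothetical and only illustrate the stream filter; they are not the real `VoterSet` API.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in: the real VoterSet and its voter keys are not reproduced here.
class RemoteVoters {
    // Returns every voter id except the local one, in ascending order.
    static List<Integer> remoteVoterIds(Set<Integer> voterIds, int localId) {
        return voterIds.stream()
            .filter(id -> id != localId) // drop the local replica
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Voters 0, 1 and 2 with local id 0 leaves the two remote voters.
        System.out.println(remoteVoterIds(Set.of(0, 1, 2), 0));
    }
}
```

With this shape the caller derives the remote voters from the voter set itself, so no extra accessor on the leader state is needed.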



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -628,7 +629,7 @@ void deliverResponse(int correlationId, Node source, ApiMessage response) {
     RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int numBeginEpochRequests) {
         List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
         assertEquals(numBeginEpochRequests, requests.size());
-        return requests.get(0);
+        return !requests.isEmpty() ? requests.get(0) : null;

Review Comment:
   Let's avoid returning `null` from "public" methods. We normally don't test
   for the absence of a side effect, because the set of things that did not
   happen is unbounded. Maybe we can expose `collectBeginEpochRequests` as
   package-private and have the test check the size of the returned list.
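
A rough sketch of that alternative, under the assumption that the collector returns a possibly empty list instead of `null`. `BeginEpochCollector` and `SentRequest` are hypothetical names for illustration, not the real `RaftClientTestContext` or `RaftRequest.Outbound`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these types only mimic the shape of the test context.
class BeginEpochCollector {
    static final class SentRequest {
        final int epoch;
        final int destinationId;

        SentRequest(int epoch, int destinationId) {
            this.epoch = epoch;
            this.destinationId = destinationId;
        }
    }

    private final List<SentRequest> sent = new ArrayList<>();

    void send(SentRequest request) {
        sent.add(request);
    }

    // Always returns a list (possibly empty), never null.
    List<SentRequest> collectBeginEpochRequests(int epoch) {
        List<SentRequest> matches = new ArrayList<>();
        for (SentRequest request : sent) {
            if (request.epoch == epoch) {
                matches.add(request);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        BeginEpochCollector context = new BeginEpochCollector();
        context.send(new SentRequest(5, 1));
        context.send(new SentRequest(5, 2));
        // Callers check the size directly, including the empty case.
        System.out.println(context.collectBeginEpochRequests(5).size());
        System.out.println(context.collectBeginEpochRequests(6).isEmpty());
    }
}
```

A test that expects no requests can then assert on an empty list rather than on a `null` return value.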



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -497,6 +497,32 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
             context.listener.currentLeaderAndEpoch());
     }
 
+    @Test
+    public void testBeginQuorumHeartbeat() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs
+        context.time.sleep(context.beginQuorumEpochTimeoutMs);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), 2);

Review Comment:
   Does this method check that the request was sent to both `remoteId1` and
   `remoteId2`? If not, we should check that a begin quorum epoch request was
   sent to every remote voter.
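
One way such a check could look, sketched under the assumption that the test can read the destination id of each outbound request. `DestinationCheck` and `assertSentToAll` are hypothetical helpers, not part of the existing test context.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: each Integer stands in for the destination node id of one
// outbound request; the real test context exposes richer request objects.
class DestinationCheck {
    static void assertSentToAll(List<Integer> destinationIds, Set<Integer> expectedVoters) {
        // One request per voter: the count must match before comparing the id sets.
        if (destinationIds.size() != expectedVoters.size()) {
            throw new AssertionError(
                "expected " + expectedVoters.size() + " requests but saw " + destinationIds.size());
        }
        // Equal counts plus equal id sets rules out duplicates and missing voters.
        if (!new HashSet<>(destinationIds).equals(expectedVoters)) {
            throw new AssertionError(
                "requests went to " + destinationIds + ", expected " + expectedVoters);
        }
    }

    public static void main(String[] args) {
        assertSentToAll(List.of(1, 2), Set.of(1, 2)); // passes silently
    }
}
```

Checking only the count would accept two requests sent to the same voter; comparing the destination set as well closes that gap.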



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2096,6 +2096,26 @@ private long maybeAppendBatches(
         return timeUntilDrain;
     }
 
+    private long maybeSendBeginQuorumEpochRequests(
+        LeaderState<T> state,
+        long currentTimeMs
+    ) {
+        long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
+        if (timeUntilNextBeginQuorumSend == 0) {
+            timeUntilNextBeginQuorumSend = maybeSendRequests(
+                currentTimeMs,
+                partitionState
+                    .lastVoterSet()
+                    .voterNodes(state.votersExcludingLeader().stream(), channel.listenerName()),
+                this::buildBeginQuorumEpochRequest
+            );
+            state.resetBeginQuorumEpochTimer(currentTimeMs);
+            logger.trace("Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " +
+                "Request can be retried in {} ms", timeUntilNextBeginQuorumSend);

Review Comment:
   This doesn't match the formatting used in this file or module:
   ```java
               logger.trace(
                   "Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " +
                   "Request can be retried in {} ms",
                   timeUntilNextBeginQuorumSend
               );
   ```



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -866,6 +872,27 @@ public void testGrantVote(boolean isLogUpToDate) {
         assertFalse(state.canGrantVote(ReplicaKey.of(3, Optional.empty()), isLogUpToDate));
     }
 
+    @Test
+    public void testBeginQuorumEpochTimer() {
+        int leader = localId;
+        int follower1 = 1;
+        long epochStartOffset = 10L;
+
+        Set<Integer> voterSet = mkSet(leader, follower1);
+        LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
+        assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds()));
+
+        long resetTime = time.milliseconds();

Review Comment:
   `time.milliseconds()` is mocked, so it returns the same value until you call
   `time.sleep(long)`.
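
To illustrate the point, here is a small sketch of a manually advanced clock. `ManualClock` is a hypothetical stand-in written for this example, not Kafka's mock time class; it only assumes that the clock advances when `sleep` is called and never otherwise.

```java
// Hypothetical stand-in for a mocked clock; the time only moves on sleep().
class ManualClock {
    private long nowMs = 0L;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long durationMs) {
        nowMs += durationMs;
    }

    public static void main(String[] args) {
        ManualClock time = new ManualClock();
        long first = time.milliseconds();
        long second = time.milliseconds();
        // No sleep in between, so both reads are identical.
        System.out.println(first == second);
        time.sleep(10);
        // After sleeping, the clock has advanced by exactly the slept duration.
        System.out.println(time.milliseconds() - first);
    }
}
```

Under that assumption, capturing the time in a separate variable before the next call adds nothing, since repeated reads are already stable.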



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -220,7 +234,14 @@ public Uuid localDirectoryId() {
         return localDirectoryId;
     }
 
-    public Set<Integer> nonAcknowledgingVoters() {
+    public Set<Integer> votersExcludingLeader() {
+        Set<Integer> voters = new HashSet<>(voterStates.keySet());
+        voters.remove(localId);
+        return voters;
+    }

Review Comment:
   See my other comment; we should be able to remove this method.


