jsancio commented on code in PR #16637:
URL: https://github.com/apache/kafka/pull/16637#discussion_r1686927915


##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse(
         long highWatermark,
         List<ReplicaState> voterStates,
         List<ReplicaState> observerStates
+    ) {
+        short apiVersion = (short) (kip853Rpc ? 2 : 1);

Review Comment:
   Take a look at `raftResponseVersion` and `describeQuorumRpcVersion`.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -183,164 +181,6 @@ public void testNonMonotonicLocalEndOffsetUpdate() {
         );
     }
 
-    @ParameterizedTest

Review Comment:
   Did you consider replacing these tests instead of completely removing them? 
Why did you decide to remove them?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -301,6 +298,19 @@ public Endpoints leaderEndpoints() {
         return endpoints;
     }
 
+    int localId() {
+        return localReplicaKey.id();
+    }

Review Comment:
   You can use `QuorumState.localIdOrThrow` since KRaft knows that this replica 
is the leader.



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse(
         long highWatermark,
         List<ReplicaState> voterStates,
         List<ReplicaState> observerStates
+    ) {
+        short apiVersion = (short) (kip853Rpc ? 2 : 1);
+        assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, 
voterStates, observerStates, apiVersion, Errors.NONE);
+    }
+
+    void assertSentDescribeQuorumResponse(
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        List<ReplicaState> voterStates,
+        List<ReplicaState> observerStates,
+        short apiVersion,

Review Comment:
   I am sure you can remove this parameter if you look at my other comment 
about parametrizing `withKip853Rpc` and the call to the builder.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2763,78 +2765,396 @@ public void testDescribeQuorumNonLeader(boolean 
withKip853Rpc) throws Exception
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
-        int localId = randomReplicaId();
-        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
-        Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), 
laggingFollower.id());
+    @CsvSource({ "true,0", "true,1", "false,0", "false,1" })
+    public void testDescribeQuorumOld(boolean withKip853Rpc, short apiVersion) 
throws Exception {
+        int localId = 0;
+        ReplicaKey local = replicaKey(localId, withKip853Rpc);
+        ReplicaKey follower1 = replicaKey(1, withKip853Rpc);
+        Set<Integer> voters = Utils.mkSet(localId, follower1.id());
+        VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1));
+
+        RaftClientTestContext.Builder builder = new 
RaftClientTestContext.Builder(localId, 
local.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
+            .withStaticVoters(voters)
+            .withKip853Rpc(true);

Review Comment:
   Why this is always set to `true`? Don't you wan to set it to 
`withKip853Rpc`? If you do that, I am sure you can remove the `apiVersion` 
parameter.



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse(
         long highWatermark,
         List<ReplicaState> voterStates,
         List<ReplicaState> observerStates
+    ) {
+        short apiVersion = (short) (kip853Rpc ? 2 : 1);
+        assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, 
voterStates, observerStates, apiVersion, Errors.NONE);
+    }
+
+    void assertSentDescribeQuorumResponse(
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        List<ReplicaState> voterStates,
+        List<ReplicaState> observerStates,
+        short apiVersion,
+        Errors error
     ) {
         DescribeQuorumResponseData response = collectDescribeQuorumResponse();
 
         DescribeQuorumResponseData.PartitionData partitionData = new 
DescribeQuorumResponseData.PartitionData()
-            .setErrorCode(Errors.NONE.code())
+            .setErrorCode(error.code())
             .setLeaderId(leaderId)
             .setLeaderEpoch(leaderEpoch)
             .setHighWatermark(highWatermark)
             .setCurrentVoters(voterStates)
             .setObservers(observerStates);
 
-        // KAFKA-16953 will add support for including the node listeners in 
the node collection
-        DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection();
+        if (!error.equals(Errors.NONE)) {
+            partitionData.setErrorMessage(error.message());
+        }
+
+        DescribeQuorumResponseData.NodeCollection nodes = null;
+        if (apiVersion >= 2) {
+            nodes = new 
DescribeQuorumResponseData.NodeCollection(voterStates.size());

Review Comment:
   I think the default is the empty list not the null value. For example, take 
a look at the implementation of `write` in `DescribeQuorumResponseData`:
   ```java
             if (_version >= 2) {
                 _writable.writeUnsignedVarint(nodes.size() + 1);
                 for (Node nodesElement : nodes) {
                     nodesElement.write(_writable, _cache, _version);
                 }
             } else {
                 if (!this.nodes.isEmpty()) {
                     throw new UnsupportedVersionException("Attempted to write 
a non-default nodes at version " + _version);
                 }
             }
   ```
   
   That code assumes that the value is not equals to `null`.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -607,20 +565,27 @@ private void updateVoterAndObserverStates(VoterSet 
lastVoterSet) {
 
         // Move any of the remaining old voters to observerStates
         for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
+            replicaStateEntry.listeners = Optional.empty();
             observerStates.putIfAbsent(replicaStateEntry.replicaKey, 
replicaStateEntry);
         }
     }
 
-    private static class ReplicaState implements Comparable<ReplicaState> {
+    static class ReplicaState implements Comparable<ReplicaState> {
         ReplicaKey replicaKey;
+        Optional<Endpoints> listeners;
         Optional<LogOffsetMetadata> endOffset;
         long lastFetchTimestamp;
         long lastFetchLeaderLogEndOffset;
         long lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
         public ReplicaState(ReplicaKey replicaKey, boolean 
hasAcknowledgedLeader) {
+            this(replicaKey, hasAcknowledgedLeader, Optional.empty());
+        }
+
+        public ReplicaState(ReplicaKey replicaKey, boolean 
hasAcknowledgedLeader, Optional<Endpoints> listeners) {

Review Comment:
   The `Endpoints` type can be empty. Did you consider using 
`Endpoints.empty()` instead of `Optional.empty()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to