jsancio commented on code in PR #16637: URL: https://github.com/apache/kafka/pull/16637#discussion_r1686927915
########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse( long highWatermark, List<ReplicaState> voterStates, List<ReplicaState> observerStates + ) { + short apiVersion = (short) (kip853Rpc ? 2 : 1); Review Comment: Take a look at `raftResponseVersion` and `describeQuorumRpcVersion`. ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -183,164 +181,6 @@ public void testNonMonotonicLocalEndOffsetUpdate() { ); } - @ParameterizedTest Review Comment: Did you consider replacing these tests instead of completely removing them? Why did you decide to remove them? ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -301,6 +298,19 @@ public Endpoints leaderEndpoints() { return endpoints; } + int localId() { + return localReplicaKey.id(); + } Review Comment: You can use `QuorumState.localIdOrThrow` since KRaft knows that this replica is the leader. ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse( long highWatermark, List<ReplicaState> voterStates, List<ReplicaState> observerStates + ) { + short apiVersion = (short) (kip853Rpc ? 2 : 1); + assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, voterStates, observerStates, apiVersion, Errors.NONE); + } + + void assertSentDescribeQuorumResponse( + int leaderId, + int leaderEpoch, + long highWatermark, + List<ReplicaState> voterStates, + List<ReplicaState> observerStates, + short apiVersion, Review Comment: I am sure you can remove this parameter if you look at my other comment about parametrizing `withKip853Rpc` and the call to the builder. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2763,78 +2765,396 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception } @ParameterizedTest - @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); + @CsvSource({ "true,0", "true,1", "false,0", "false,1" }) + public void testDescribeQuorumOld(boolean withKip853Rpc, short apiVersion) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, withKip853Rpc); + ReplicaKey follower1 = replicaKey(1, withKip853Rpc); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, local.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .withStaticVoters(voters) + .withKip853Rpc(true); Review Comment: Why is this always set to `true`? Don't you want to set it to `withKip853Rpc`? If you do that, I am sure you can remove the `apiVersion` parameter. ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -641,19 +641,43 @@ void assertSentDescribeQuorumResponse( long highWatermark, List<ReplicaState> voterStates, List<ReplicaState> observerStates + ) { + short apiVersion = (short) (kip853Rpc ? 
2 : 1); + assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, voterStates, observerStates, apiVersion, Errors.NONE); + } + + void assertSentDescribeQuorumResponse( + int leaderId, + int leaderEpoch, + long highWatermark, + List<ReplicaState> voterStates, + List<ReplicaState> observerStates, + short apiVersion, + Errors error ) { DescribeQuorumResponseData response = collectDescribeQuorumResponse(); DescribeQuorumResponseData.PartitionData partitionData = new DescribeQuorumResponseData.PartitionData() - .setErrorCode(Errors.NONE.code()) + .setErrorCode(error.code()) .setLeaderId(leaderId) .setLeaderEpoch(leaderEpoch) .setHighWatermark(highWatermark) .setCurrentVoters(voterStates) .setObservers(observerStates); - // KAFKA-16953 will add support for including the node listeners in the node collection - DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(); + if (!error.equals(Errors.NONE)) { + partitionData.setErrorMessage(error.message()); + } + + DescribeQuorumResponseData.NodeCollection nodes = null; + if (apiVersion >= 2) { + nodes = new DescribeQuorumResponseData.NodeCollection(voterStates.size()); Review Comment: I think the default is the empty list, not the null value. For example, take a look at the implementation of `write` in `DescribeQuorumResponseData`: ```java if (_version >= 2) { _writable.writeUnsignedVarint(nodes.size() + 1); for (Node nodesElement : nodes) { nodesElement.write(_writable, _cache, _version); } } else { if (!this.nodes.isEmpty()) { throw new UnsupportedVersionException("Attempted to write a non-default nodes at version " + _version); } } ``` That code assumes that the value is not equal to `null`. 
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -607,20 +565,27 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { + replicaStateEntry.listeners = Optional.empty(); observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry); } } - private static class ReplicaState implements Comparable<ReplicaState> { + static class ReplicaState implements Comparable<ReplicaState> { ReplicaKey replicaKey; + Optional<Endpoints> listeners; Optional<LogOffsetMetadata> endOffset; long lastFetchTimestamp; long lastFetchLeaderLogEndOffset; long lastCaughtUpTimestamp; boolean hasAcknowledgedLeader; public ReplicaState(ReplicaKey replicaKey, boolean hasAcknowledgedLeader) { + this(replicaKey, hasAcknowledgedLeader, Optional.empty()); + } + + public ReplicaState(ReplicaKey replicaKey, boolean hasAcknowledgedLeader, Optional<Endpoints> listeners) { Review Comment: The `Endpoints` type can be empty. Did you consider using `Endpoints.empty()` instead of `Optional.empty()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org