dajac commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r893659548
########## clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java: ########## @@ -93,4 +93,5 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) { return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version)); } + Review Comment: nit: This empty line could be removed. ########## clients/src/main/resources/common/message/DescribeQuorumResponse.json: ########## @@ -17,7 +17,8 @@ "apiKey": 55, "type": "response", "name": "DescribeQuorumResponse", - "validVersions": "0", + // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836) Review Comment: nit: Can we add a dot at the end of the sentence? ########## clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This is used to describe per-partition state in the DescribeQuorumResponse. Review Comment: This comment may have been missed. 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { + List<QuorumInfo.ReplicaState> voters = new ArrayList<>(); + List<QuorumInfo.ReplicaState> observers = new ArrayList<>(); + partition.currentVoters().forEach(v -> { + voters.add(new QuorumInfo.ReplicaState(v.replicaId(), + v.logEndOffset(), + v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()), + v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp()))); + }); + partition.observers().forEach(o -> { + observers.add(new QuorumInfo.ReplicaState(o.replicaId(), + o.logEndOffset(), + o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()), + o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp()))); + }); + QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); + return info; + } + + @Override + DescribeQuorumRequest.Builder createRequest(int timeoutMs) { + return new Builder(DescribeQuorumRequest.singletonRequest( + new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition()))); + } + + @Override + void handleResponse(AbstractResponse response) { + final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; + if (quorumResponse.data().errorCode() != Errors.NONE.code()) { + throw Errors.forCode(quorumResponse.data().errorCode()).exception(); Review Comment: Great, thanks. ########## clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This is used to describe per-partition state in the DescribeQuorumResponse. 
+ */ +public class QuorumInfo { + private final Integer leaderId; + private final List<ReplicaState> voters; + private final List<ReplicaState> observers; + + QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) { + this.leaderId = leaderId; + this.voters = voters; + this.observers = observers; + } + + public Integer leaderId() { + return leaderId; + } + + public List<ReplicaState> voters() { + return voters; + } + + public List<ReplicaState> observers() { + return observers; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QuorumInfo that = (QuorumInfo) o; + return leaderId.equals(that.leaderId) + && voters.equals(that.voters) + && observers.equals(that.observers); + } + + @Override + public int hashCode() { + return Objects.hash(leaderId, voters, observers); + } + + @Override + public String toString() { + return "QuorumInfo(" + + "leaderId=" + leaderId + + ", voters=" + voters.toString() + + ", observers=" + observers.toString() + + ')'; + } + + public static class ReplicaState { + private final int replicaId; + private final long logEndOffset; + private final OptionalLong lastFetchTimeMs; + private final OptionalLong lastCaughtUpTimeMs; + + ReplicaState() { + this(0, 0, OptionalLong.empty(), OptionalLong.empty()); + } + + ReplicaState(int replicaId, long logEndOffset, + OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) { Review Comment: nit: The code format seems a bit off here. 
I think that we would format like this: ``` ReplicaState( int replicaId, long logEndOffset, OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs ) { ``` ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -778,4 +778,43 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + val admin = Admin.create(props) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo().get() + + assertEquals(3000, quorumInfo.leaderId()) Review Comment: nit: We usually omit parentheses for getters. There are a few other cases in this file. ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -725,4 +725,35 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + val admin = Admin.create(props) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo().get() + + assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic()) + assertEquals(3, quorumInfo.voters.size()) + assertEquals(0, quorumInfo.observers.size()) Review Comment: This comment has not been addressed. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { + List<QuorumInfo.ReplicaState> voters = new ArrayList<>(); + List<QuorumInfo.ReplicaState> observers = new ArrayList<>(); + partition.currentVoters().forEach(v -> { + voters.add(new QuorumInfo.ReplicaState(v.replicaId(), + v.logEndOffset(), + v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()), + v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp()))); + }); + partition.observers().forEach(o -> { + observers.add(new QuorumInfo.ReplicaState(o.replicaId(), + o.logEndOffset(), + o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()), + o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp()))); Review Comment: Yes. We should avoid code duplication. Btw, you could also use the Stream API here: `partition.observers().stream().map(function).collect(Collectors.toList())`. I leave this one up to you. ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition)
+ ).build(version.toShort) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala + if (version == 0) { Review Comment: In this case, should we just remove that version check and verify all versions? My understanding is that they should all be the same at the moment. We can update the test when we implement the server side. What do you think? 
########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.toShort) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + 
assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala Review Comment: nit: Should we assert the number of voters/observers as well? ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -778,4 +778,43 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + val admin = Admin.create(props) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo().get() + + assertEquals(3000, quorumInfo.leaderId()) + assertEquals(0, quorumInfo.observers.size()) + assertEquals(3, quorumInfo.voters.size()) + quorumInfo.voters().forEach( voter => { Review Comment: nit: We usually format blocks as follows: `quorumInfo.voters.forEach { voter =>`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org