dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r893659548


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +93,5 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+

Review Comment:
   nit: This empty line could be removed.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,8 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836)

Review Comment:
   nit: Can we add a dot at the end of the sentence?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   This comment may have been missed.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   Great, thanks.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            ')';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        ReplicaState() {
+            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+        }
+
+        ReplicaState(int replicaId, long logEndOffset,
+                OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) {

Review Comment:
   nit: The code format seems a bit off here. I think we would format it like this:
   
   ```
   ReplicaState(
       int replicaId,
       long logEndOffset,
       OptionalLong lastFetchTimeMs,
       OptionalLong lastCaughtUpTimeMs
   ) {
   ```



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,43 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(3000, quorumInfo.leaderId())

Review Comment:
   nit: We usually omit parentheses for getters. There are a few other cases in this file.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   This comment has not been addressed.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Yes. We should avoid code duplication. Btw, you could also use the Stream API here: `partition.observers().stream().map(function).collect(Collectors.toList())`. I leave this one up to you.
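   For illustration, here is a minimal sketch of what that could look like. The `toReplicaState` helper name is just an assumption (it is not in the PR), and it presumes `java.util.stream.Collectors` is imported:
   
   ```
   // Hypothetical helper that removes the duplicated voter/observer conversion:
   // it maps a DescribeQuorumResponseData.ReplicaState to the admin-facing type.
   private static QuorumInfo.ReplicaState toReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
       return new QuorumInfo.ReplicaState(
           replica.replicaId(),
           replica.logEndOffset(),
           replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
           replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
   }
   
   // Both lists could then be built with the Stream API instead of mutating local lists:
   List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
       .map(KafkaAdminClient::toReplicaState)
       .collect(Collectors.toList());
   List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
       .map(KafkaAdminClient::toReplicaState)
       .collect(Collectors.toList());
   ```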



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {

Review Comment:
   In this case, should we just remove that version check and verify all 
versions? My understanding is that they should all be the same at the moment. 
We can update the test when we implement the server side. What do you think?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala

Review Comment:
   nit: Should we assert the number of voters/observers as well?



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,43 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(3000, quorumInfo.leaderId())
+        assertEquals(0, quorumInfo.observers.size())
+        assertEquals(3, quorumInfo.voters.size())
+        quorumInfo.voters().forEach( voter => {

Review Comment:
   nit: We usually format blocks as follows: `quorumInfo.voters.forEach { voter =>`.


