niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r892884885


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Do we really think this is worth another method?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, 
observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, 
METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw 
Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   That is a correct understanding. The intent (based on some discussion with 
Jason) is to allow the default handler to manage exceptions returned by the 
server. The `UnknownServerException` that we raise here will end up triggering 
`handleFailure`, and the call will complete exceptionally.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(0, quorumInfo.leaderId())

Review Comment:
   Done!



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == 
leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current 
voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId 
== leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among 
current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {

Review Comment:
   I think I missed replying to this comment earlier. Apologies for that. I 
am not sure we can check anything for versions greater than zero at the 
moment. As of this PR, we are not setting the fields in the response, so we 
cannot verify them. Was there something you had in mind that we should verify 
for versions > 0?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == 
leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current 
voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId 
== leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among 
current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {
+        voterData.foreach { state =>
+          assertTrue(0 < state.replicaId)

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to