hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r885096122


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)

Review Comment:
   nit: these are all misaligned



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {
+          voterData.foreach( state => {

Review Comment:
   nit: the idiomatic way to write this is 
   ```scala
   voterData.foreach { state =>
     ...
   }
   ```
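
   Besides matching common Scala style, the brace form avoids the trailing `})` at the end of the block.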



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4878,35 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {

Review Comment:
   Could we have a test case with a partition-level error?
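
   For reference, one possible shape for such a case is sketched below. This is only an illustration: it assumes the usual `KafkaAdminClientTest` imports and helpers, picks `INVALID_REQUEST` as an arbitrary partition-level error, and assumes the admin client surfaces that error by failing the `quorumInfo()` future; the test name and exception mapping are placeholders, not the PR's actual code.
   ```java
   @Test
   public void testDescribeMetadataQuorumPartitionLevelError() {
       try (final AdminClientUnitTestEnv env = mockClientEnv()) {
           env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
                   ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
                   ApiKeys.DESCRIBE_QUORUM.latestVersion()));
           // Top-level error is NONE, but the single metadata partition reports an error.
           DescribeQuorumResponseData data = new DescribeQuorumResponseData()
                   .setErrorCode(Errors.NONE.code())
                   .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
                           .setTopicName("__cluster_metadata")
                           .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
                                   .setPartitionIndex(0)
                                   .setErrorCode(Errors.INVALID_REQUEST.code())))));
           env.kafkaClient().prepareResponse(
                   body -> body instanceof DescribeQuorumRequest,
                   new DescribeQuorumResponse(data));
           KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
           // The partition-level error should propagate as the cause of the failed future.
           ExecutionException e = assertThrows(ExecutionException.class, future::get);
           assertTrue(e.getCause() instanceof InvalidRequestException);
       }
   }
   ```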



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -26,16 +25,19 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumRequestTest])

Review Comment:
   Seems unused?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {
+          voterData.foreach( state => {
+            assertTrue(state.lastFetchTimestamp() == -1)

Review Comment:
   nit: we should use `assertEquals`. The advantage is that we can see what the 
actual value was in the failure message, which is sometimes useful to 
understand the failure.
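
   For example, `assertEquals(-1, state.lastFetchTimestamp())` would print the unexpected timestamp in the failure message.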



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

Review Comment:
   nit: it doesn't look like `timeoutMs` is used? Also, could we use 
`DescribeQuorumRequest.singletonRequest`?
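
   A minimal sketch of that suggestion, assuming the target is always partition 0 of the `__cluster_metadata` topic (the `timeoutMs` parameter stays only because the `Call.createRequest` signature requires it):
   ```java
   @Override
   DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
       // Build the request for the single metadata quorum partition via the existing helper.
       return new DescribeQuorumRequest.Builder(
               DescribeQuorumRequest.singletonRequest(
                       new TopicPartition("__cluster_metadata", 0)));
   }
   ```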



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {

Review Comment:
   How about this?
   ```scala
   for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
   ```
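
   That would presumably also make `version` a `Short` already, so the `asInstanceOf[Short]` cast in `build(...)` could be dropped.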



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);

Review Comment:
   @niket-goel any comment here? I think this part still reads awkwardly. 
Converting to an intermediate map is conventional. An alternative would be to 
do a quick validation of the response. We can structure the checks like this:
   
   1. Check top-level error code
   2. Verify only one topic in the response which matches metadata topic
   3. Verify only one partition in the response with id 0
   4. Check partition-level error code.
   
   This is similar to how we handle the request in 
`KafkaRaftClient.handleDescribeQuorumRequest`.
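
   As an illustration of those four checks, here is a rough sketch of a validation helper over the response data; the method name and exception choices are placeholders, not the PR's actual code.
   ```java
   private DescribeQuorumResponseData.PartitionData validatedPartitionData(DescribeQuorumResponseData data) {
       // 1. Check the top-level error code.
       if (data.errorCode() != Errors.NONE.code())
           throw Errors.forCode(data.errorCode()).exception();
       // 2. Verify there is exactly one topic and that it is the metadata topic.
       if (data.topics().size() != 1 || !data.topics().get(0).topicName().equals("__cluster_metadata"))
           throw new UnknownServerException("DescribeQuorum response did not contain exactly the metadata topic");
       DescribeQuorumResponseData.TopicData topic = data.topics().get(0);
       // 3. Verify there is exactly one partition and that its id is 0.
       if (topic.partitions().size() != 1 || topic.partitions().get(0).partitionIndex() != 0)
           throw new UnknownServerException("DescribeQuorum response did not contain exactly partition 0");
       DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
       // 4. Check the partition-level error code.
       if (partition.errorCode() != Errors.NONE.code())
           throw Errors.forCode(partition.errorCode()).exception();
       return partition;
   }
   ```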



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
