dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881292400


##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.QuorumInfo;
+
+/**
+ * The result of {@link 
Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+    private final KafkaFuture<QuorumInfo> quorumInfo;
+
+    public DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {

Review Comment:
   nit: Could we keep it package private?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {

Review Comment:
   nit: We could use `response.getVoterOffsets(topicName, 
partition).forEach((replicaId, logEndOffset) ->`. That makes the code a bit 
more readable in my opinion.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   hm.. `OptionalLong.empty()` seems wrong here. Don't you need to put the last 
fetch time and the last caught-up time?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Same question here.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, 
observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new 
DescribeQuorumRequestData()
+                        .setTopics(singletonList(new 
DescribeQuorumRequestData.TopicData()
+                                .setPartitions(singletonList(new 
DescribeQuorumRequestData.PartitionData()
+                                        .setPartitionIndex(0)))
+                                .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
+                    future.complete(createQuorumResult(quorumResponse));
+                } else {
+                    
future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);

Review Comment:
   nit: Why not call `future.completeExceptionally` directly?



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;

Review Comment:
   Is `org.apache.kafka.common` the right package here? It seems preferable to 
put them in the `org.apache.kafka.clients.admin` package.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse 
prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors 
error) {
+        if (error == Errors.NONE) {
+            return new 
DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new 
DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new 
DescribeQuorumResponseData.ReplicaState()))

Review Comment:
   It would be better to fully populate the response here. That would have 
caught the issue that I mentioned earlier.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,9 +17,10 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
+    // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in 
ReplicaState

Review Comment:
   nit: We usually put this before `validVersions`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {

Review Comment:
   nit: Should we use `describeMetadataQuorum`?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -80,6 +85,36 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
     assertTrue(leaderState.logEndOffset > 0)
   }
 
+  @ClusterTest
+  def testDescribeQuorumRequestToBrokers() = {

Review Comment:
   Looking at the other tests, this test suite does not seem to be the right 
place for this test. I wonder if `KRaftClusterTest` would be more appropriate, 
for instance, but I don't feel strongly about this.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4871,32 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 
0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE));
+            final KafkaFuture<QuorumInfo> future = 
env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 
0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST));
+            final KafkaFuture<QuorumInfo> future = 
env.adminClient().describeMetadataQuorum().quorumInfo();
+            final ExecutionException e = 
assertThrows(ExecutionException.class, future::get);
+            assertEquals(e.getCause().getClass(), 
Errors.INVALID_REQUEST.exception().getClass());

Review Comment:
   nit: You could use `TestUtils.assertFutureThrows`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);

Review Comment:
   The implementations of `getPartitionLeaderId`, `getVoterOffsets` and 
`getObserverOffsets` always start by looking up the relevant topic/partition in 
the response. Did you consider doing this lookup once and using the result to 
construct the `QuorumInfo`?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse 
prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors 
error) {
+        if (error == Errors.NONE) {
+            return new 
DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new 
DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new 
DescribeQuorumResponseData.ReplicaState()))
+                    );
+        }
+        return new DescribeQuorumResponse(
+                new DescribeQuorumResponseData()
+                .setErrorCode(error.code()));

Review Comment:
   nit: Indentation seems off here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -17,25 +17,30 @@
 package kafka.server
 
 import java.io.IOException
-
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.utils.NotNothing
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
DescribeMetadataQuorumOptions}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, 
DescribeQuorumResponse}
+import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
 
   @ClusterTest(clusterType = Type.ZK)
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {

Review Comment:
   Should we extend `testDescribeQuorum` to verify that the new fields are set 
when v1 is used and not set when v0 is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to