hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r880984176


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4181,6 +4193,61 @@ private static byte[] getSaltedPasword(ScramMechanism 
publicScramMechanism, byte
                 .hi(password, salt, iterations);
     }
 
+    @Override
+    public DescribeQuorumResult describeQuorum(DescribeQuorumOptions options) {
+      NodeProvider provider = new LeastLoadedNodeProvider();
+
+      final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+      final long now = time.milliseconds();
+        final Call call = new Call(
+            "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), 
entry.getValue()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), 
entry.getValue()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, 
observers);
+               return info;

Review Comment:
   nit: this looks misaligned



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {

Review Comment:
   Would it make sense to implement `equals` and `hashCode`? That is often 
useful for testing. Also it would be nice to have a good `toString` 
implementations.



##########
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##########
@@ -256,6 +256,21 @@ public static List<Node> 
voterConnectionsToNodes(Map<Integer, RaftConfig.Address
             .collect(Collectors.toList());
     }
 
+    public static String nodesToVoterConnections(List<Node> nodes) {

Review Comment:
   Seems like we're not using this anywhere. Included by mistake perhaps?



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1382,6 +1382,23 @@ default DescribeFeaturesResult describeFeatures() {
         return describeFeatures(new DescribeFeaturesOptions());
     }
 
+    /**
+     * Describe the state of the raft quorum

Review Comment:
   How about "metadata quorum" or "kraft quorum"? Also nit: missing period.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -318,6 +325,11 @@ public class KafkaAdminClient extends AdminClient {
     private final Logger log;
     private final LogContext logContext;
 
+    /**
+     * The name of the internal raft metadata topic
+     */
+    private static final String METADATA_TOPIC_NAME = "__cluster_metadata";

Review Comment:
   Hmm, I hadn't really been thinking about the fact that we would have to 
expose this to the client. I guess that is the consequence of having such a 
general `DescribeQuorum` API. This makes me wonder if we ought to be more 
forward looking with the naming here. Suppose that we ultimately decide to use 
raft for partition replication as well. Then we might want to be able to use 
`DescribeQuorum` for user partitions as well, but we haven't given ourselves a 
lot of room for extension in the `describeQuorum` API. Would it make sense to 
   ```java
   public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options)
   ```
   It is more verbose, but it is also clearer.
   
   We should also move this constant to 
`org.apache.kafka.common.internals.Topic`.



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, 
List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = -1;
+      this.lastCaughtUpTimeMs = -1;
+    }
+
+    public ReplicaState(int replicaId, long logEndOffset,
+        long lastFetchTimeMs, long lastCaughtUpTimeMs) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = lastFetchTimeMs;
+      this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
+    }
+
+    public int replicaId() {
+      return replicaId;
+    }
+
+    public long logEndOffset() {
+      return logEndOffset;
+    }
+
+    public long lastFetchTimeMs() {

Review Comment:
   Can we document the result for older versions of `DescribeQuorum`? I am 
tempted to suggest that we change the type to `OptionalLong` to make it clear 
that the value may be absent for older versions.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,7 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Usually we add some comments to describe what has changed in each version. 
You can see the other request specs for examples.



##########
core/src/test/scala/integration/kafka/admin/DescribeQuorumTest.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.admin
+
+import kafka.server.KafkaRaftServer
+import kafka.testkit.KafkaClusterTestKit
+import kafka.testkit.TestKitNodes
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
DescribeQuorumOptions}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Test, Timeout}
+import org.slf4j.LoggerFactory
+
+
+@Timeout(120)
+@Tag("integration")
+class DescribeQuorumTest {

Review Comment:
   We have another class `DescribeQuorumRequesetTest`. Would it make sense to 
move this test there? Perhaps we could generalize it 
`DescribeQuorumIntegrationTest` or something like that.



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, 
List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {

Review Comment:
   Do we need to expose this?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4181,6 +4193,61 @@ private static byte[] getSaltedPasword(ScramMechanism 
publicScramMechanism, byte
                 .hi(password, salt, iterations);
     }
 
+    @Override
+    public DescribeQuorumResult describeQuorum(DescribeQuorumOptions options) {
+      NodeProvider provider = new LeastLoadedNodeProvider();
+
+      final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+      final long now = time.milliseconds();
+        final Call call = new Call(
+            "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), 
entry.getValue()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: 
response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), 
entry.getValue()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, 
observers);
+               return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new 
DescribeQuorumRequestData()
+                    .setTopics(singletonList(new 
DescribeQuorumRequestData.TopicData()
+                        .setPartitions(singletonList(new 
DescribeQuorumRequestData.PartitionData()
+                            .setPartitionIndex(0)))
+                        .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
+                    future.complete(createQuorumResult(quorumResponse));
+                } else {
+                    
future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());

Review Comment:
   It would be helpful to have a couple unit tests in `KafkaAdminClientTest` 
covering failure and success cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to