[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-05-25 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881292400


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.QuorumInfo;
+
+/**
+ * The result of {@link 
Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+private final KafkaFuture quorumInfo;
+
+public DescribeMetadataQuorumResult(KafkaFuture quorumInfo) {

Review Comment:
   nit: Could we keep it package private?



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(partition);
+Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+List voters = new ArrayList<>();
+for (Map.Entry entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {

Review Comment:
   nit: We could use `response.getVoterOffsets(topicName, 
partition).forEach((replicaId, logEndOffset) ->`. That makes the code a bit 
more readable in my opinion.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(partition);
+Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+List voters = new ArrayList<>();
+for (Map.Entry entry: 
response.getVoterOffsets(topicName, partition).entrySet()) {
+voters.add(new ReplicaState(entry.getKey(), 
entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Hmm, `OptionalLong.empty()` seems wrong here. Don't you need to pass the last 
fetch time and the last caught-up time?



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeQuorum", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+Intege

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r888913198


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
  */
 UpdateFeaturesResult updateFeatures(Map 
featureUpdates, UpdateFeaturesOptions options);
 
+/**
+ * Describes the state of the metadata quorum.
+ * 
+ * This is a convenience method for {@link 
#describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.

Review Comment:
   nit: There is an extra space before `with`.



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link 
Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+private final KafkaFuture quorumInfo;
+
+DescribeMetadataQuorumResult(KafkaFuture quorumInfo) {
+this.quorumInfo = quorumInfo;
+}
+
+/**
+ * Returns a future QuorumInfo

Review Comment:
   nit: Should we say `Returns a future containing the quorum info.`?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumOptions.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link 
ConfluentAdmin#describeQuorum(DescribeMetadataQuorumOptions)}.
+ *

Review Comment:
   nit: We could remove this empty line.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(0);
+Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+response.getVoterInfo(topicName, partition).forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+OptionalLong.of(v.lastFetchTimestamp()),
+OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+response.getObserverInfo(topicName, partition).forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+OptionalLong.of(o.lastFetchTimestamp()),
+  

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890984307


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
 }
 }
 
+@Test
+public void testDescribeMetadataQuorumSuccess() throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   Ack. I was not aware of this bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890986175


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuoru
 final Call call = new Call(
 "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
 
-private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
-Integer partition = 0;
-String topicName = response.getTopicNameByIndex(0);
-Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
 List voters = new ArrayList<>();
 List observers = new ArrayList<>();
-response.getVoterInfo(topicName, partition).forEach(v -> {
+partition.currentVoters().forEach(v -> {
 voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
 v.logEndOffset(),
 OptionalLong.of(v.lastFetchTimestamp()),
 OptionalLong.of(v.lastCaughtUpTimestamp())));

Review Comment:
   When `lastFetchTimestamp` or `lastCaughtUpTimestamp` is not provided 
(equal to -1), don't we want to return an empty optional instead of returning an 
optional containing -1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-08 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r892495891


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -778,4 +778,35 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format
+  cluster.startup
+  for (i <- 0 to 3) {
+TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
+  "Broker Never started up")
+  }
+  val props = cluster.clientProperties()
+  props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+  val admin = Admin.create(props)
+  try {
+val quorumState = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
+val quorumInfo = quorumState.quorumInfo().get()
+
+assertEquals(0, quorumInfo.leaderId())

Review Comment:
   Should we check all the data in quorum info in this integration test? Just 
checking the leader and the number of voters/observers seems weak to me.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+partition.currentVoters().forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+v.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+partition.observers().forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+o.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Could we extract this block into a helper method and share it with the 
voters part?



##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-val request = new DescribeQuorumRequest.Builder(
-  singletonRequest(KafkaRaftServer.MetadataPartition)
-).build()
-
-val response = connectAndReceive[DescribeQuorumResponse](request)
-
-assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-assertEquals(1, response.data.topics.size)
-
-val topicData = response.data.topics.get(0)
-assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-assertEquals(1, topicData.partitions.size)
-
-val partitionData = topicData.partitions.get(0)
-assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
-assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-assertTrue(partitionData.leaderEpoch > 0)
-
-val leaderId = partitionData.leaderId
-assertTrue(leaderId > 0)
-
-val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == 
leaderId)
-  .getOrElse(throw new AssertionError("Failed to find leader among current 
voter states"))
-assertTrue(leaderState.logEndOffset > 0)
+for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+  val request = new DescribeQuorumRequest.Builder(
+singletonRequest(KafkaRaftServer.MetadataPartition)
+  ).build(version.toShort)
+  val response = connectAndReceive[DescribeQuorumResponse](request)
+
+  assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+  assertEquals(1, response.data.topics.size)
+
+  val topicData = response.data.topics.get(0)
+  assertEquals(KafkaRaftServer.Met

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-09 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r893659548


##
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##
@@ -93,4 +93,5 @@ public static DescribeQuorumResponseData 
singletonResponse(TopicPartition topicP
 public static DescribeQuorumResponse parse(ByteBuffer buffer, short 
version) {
 return new DescribeQuorumResponse(new DescribeQuorumResponseData(new 
ByteBufferAccessor(buffer), version));
 }
+

Review Comment:
   nit: This empty line could be removed.



##
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##
@@ -17,7 +17,8 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in 
ReplicaState (KIP-836)

Review Comment:
   nit: Can we add a dot at the end of the sentence?



##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   This comment may have been missed.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+partition.currentVoters().forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+v.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+partition.observers().forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+o.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+});
+QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, 
observers);
+return info;
+}
+
+@Override
+DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+return new Builder(DescribeQuorumRequest.singletonRequest(
+new TopicPartition(METADATA_TOPIC_NAME, 
METADATA_TOPIC_PARTITION.partition())));
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+throw 
Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   Great, thanks.



##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * c

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-13 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r895790891


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to describe the state of the quorum received in 
DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+private final Integer leaderId;
+private final List voters;
+private final List observers;
+
+QuorumInfo(Integer leaderId, List voters, List 
observers) {
+this.leaderId = leaderId;
+this.voters = voters;
+this.observers = observers;
+}
+
+public Integer leaderId() {
+return leaderId;
+}
+
+public List voters() {
+return voters;
+}
+
+public List observers() {
+return observers;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+QuorumInfo that = (QuorumInfo) o;
+return leaderId.equals(that.leaderId)
+&& voters.equals(that.voters)
+&& observers.equals(that.observers);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(leaderId, voters, observers);
+}
+
+@Override
+public String toString() {
+return "QuorumInfo(" +
+"leaderId=" + leaderId +
+", voters=" + voters.toString() +
+", observers=" + observers.toString() +
+')';
+}
+
+public static class ReplicaState {
+private final int replicaId;
+private final long logEndOffset;
+private final OptionalLong lastFetchTimeMs;
+private final OptionalLong lastCaughtUpTimeMs;
+
+ReplicaState() {
+this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+}
+
+ReplicaState(
+int replicaId,
+long logEndOffset,
+OptionalLong lastFetchTimeMs,
+OptionalLong lastCaughtUpTimeMs

Review Comment:
   nit: We usually use 4-space indentation here.



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -778,4 +778,44 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format
+  cluster.startup
+  for (i <- 0 to 3) {
+TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
+  "Broker Never started up")
+  }
+  val props = cluster.clientProperties()
+  props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+  val admin = Admin.create(props)
+  try {
+val quorumState = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
+val quorumInfo = quorumState.quorumInfo.get()
+
+assertEquals(0, quorumInfo.observers.size())
+assertEquals(3, quorumInfo.voters.size())

Review Comment:
   nit: We could omit the parentheses after `size`. The same for the previous line.



##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-val request = new DescribeQuorumRequest.Builder(
-  singletonRequest(KafkaRaftServer.MetadataPartition)
-).build()
-
-val response = connectAndReceive[DescribeQuorumResponse](request)
-
-assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-assertEquals(1, response.data.topics.size)
-
-val topicData = response.data.topics.get(0)
-assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-assertEquals(1, topicData.partitions.size)
-
-val partitionData = topicD