niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r892884885
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { + List<QuorumInfo.ReplicaState> voters = new ArrayList<>(); + List<QuorumInfo.ReplicaState> observers = new ArrayList<>(); + partition.currentVoters().forEach(v -> { + voters.add(new QuorumInfo.ReplicaState(v.replicaId(), + v.logEndOffset(), + v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()), + v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp()))); + }); + partition.observers().forEach(o -> { + observers.add(new QuorumInfo.ReplicaState(o.replicaId(), + o.logEndOffset(), + o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()), + o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp()))); Review Comment: Do we really think this is worth another method.? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { + List<QuorumInfo.ReplicaState> voters = new ArrayList<>(); + List<QuorumInfo.ReplicaState> observers = new ArrayList<>(); + partition.currentVoters().forEach(v -> { + voters.add(new QuorumInfo.ReplicaState(v.replicaId(), + v.logEndOffset(), + v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()), + v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp()))); + }); + partition.observers().forEach(o -> { + observers.add(new QuorumInfo.ReplicaState(o.replicaId(), + o.logEndOffset(), + o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()), + o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp()))); + }); + QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); + return info; + } + + @Override + DescribeQuorumRequest.Builder createRequest(int timeoutMs) { + return new Builder(DescribeQuorumRequest.singletonRequest( + new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition()))); + } + + @Override + void handleResponse(AbstractResponse response) { + final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; + if (quorumResponse.data().errorCode() != Errors.NONE.code()) { + throw Errors.forCode(quorumResponse.data().errorCode()).exception(); Review Comment: That is correct understanding. The intent (based on some discussion with Jason) is to allow the default handler to manage exceptions returned by the server. The `UnknownServerException` that we raise here will end up calling `handleFailure` and complete exceptionally. ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -778,4 +778,35 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + val admin = Admin.create(props) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo().get() + + assertEquals(0, quorumInfo.leaderId()) Review Comment: Done! ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.toShort) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala + if (version == 0) { Review Comment: I think I missed replying to this comment earlier. Apologies about that. I am not sure if we can check anything for versions greater than zero at the moment. As of this PR we are not setting the fields in the response and so we cannot verify that. Was there something you had in mind that we should verify for versions > 0? ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.toShort) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala + if (version == 0) { + voterData.foreach { state => + assertTrue(0 < state.replicaId) Review Comment: yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org