hachikuji commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r885096122
########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.asInstanceOf[Short]) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) Review Comment: nit: these are all misaligned ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.asInstanceOf[Short]) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala + if (version == 0) { + voterData.foreach( state => { Review Comment: nit: the idiomatic way to write this is ```scala voterData.foreach { state => ... } ``` ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -4846,6 +4878,35 @@ public void testDescribeFeaturesFailure() { } } + @Test + public void testDescribeMetadataQuorumSuccess() throws Exception { + try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id, + ApiKeys.DESCRIBE_QUORUM.oldestVersion(), + ApiKeys.DESCRIBE_QUORUM.latestVersion())); + env.kafkaClient().prepareResponse( + body -> body instanceof DescribeQuorumRequest, + prepareDescribeQuorumResponse(Errors.NONE)); + final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo(); + final QuorumInfo quorumInfo = future.get(); + assertEquals(defaultQuorumInfo(), quorumInfo); + } + } + + @Test + public void testDescribeMetadataQuorumFailure() { Review Comment: Could we have a test case with a partition-level error? ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -26,16 +25,19 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith +import org.slf4j.LoggerFactory import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag +@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(clusterType = Type.KRAFT) @Tag("integration") class DescribeQuorumRequestTest(cluster: ClusterInstance) { + val log = LoggerFactory.getLogger(classOf[DescribeQuorumRequestTest]) Review Comment: Seems unused? ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) { + val request = new DescribeQuorumRequest.Builder( + singletonRequest(KafkaRaftServer.MetadataPartition) + ).build(version.asInstanceOf[Short]) + val response = connectAndReceive[DescribeQuorumResponse](request) + + assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) + assertEquals(1, response.data.topics.size) + + val topicData = response.data.topics.get(0) + assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) + assertEquals(1, topicData.partitions.size) + + val partitionData = topicData.partitions.get(0) + assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) + assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) + assertTrue(partitionData.leaderEpoch > 0) + + val leaderId = partitionData.leaderId + assertTrue(leaderId > 0) + + val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) + .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) + assertTrue(leaderState.logEndOffset > 0) + + val voterData = partitionData.currentVoters().asScala + val observerData = partitionData.observers().asScala + if (version == 0) { + voterData.foreach( state => { + assertTrue(state.lastFetchTimestamp() == -1) Review Comment: nit: we should use `assertEquals`. The advantage is that we can see what the actual value was in the failure message, which is sometimes useful to understand the failure. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { + Integer partition = 0; + String topicName = response.getTopicNameByIndex(0); + Integer leaderId = response.getPartitionLeaderId(topicName, partition); + List<QuorumInfo.ReplicaState> voters = new ArrayList<>(); + List<QuorumInfo.ReplicaState> observers = new ArrayList<>(); + response.getVoterInfo(topicName, partition).forEach(v -> { + voters.add(new QuorumInfo.ReplicaState(v.replicaId(), + v.logEndOffset(), + OptionalLong.of(v.lastFetchTimestamp()), + OptionalLong.of(v.lastCaughtUpTimestamp()))); + }); + response.getObserverInfo(topicName, partition).forEach(o -> { + observers.add(new QuorumInfo.ReplicaState(o.replicaId(), + o.logEndOffset(), + OptionalLong.of(o.lastFetchTimestamp()), + OptionalLong.of(o.lastCaughtUpTimestamp()))); + }); + QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers); + return info; + } + + @Override + DescribeQuorumRequest.Builder createRequest(int timeoutMs) { Review Comment: nit: it doesn't look like `timeoutMs` is used? Also, could we use `DescribeQuorumRequest.singletonRequest`? ########## core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala: ########## @@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { @ClusterTest def testDescribeQuorum(): Unit = { - val request = new DescribeQuorumRequest.Builder( - singletonRequest(KafkaRaftServer.MetadataPartition) - ).build() - - val response = connectAndReceive[DescribeQuorumResponse](request) - - assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) - assertEquals(1, response.data.topics.size) - - val topicData = response.data.topics.get(0) - assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName) - assertEquals(1, topicData.partitions.size) - - val partitionData = topicData.partitions.get(0) - assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex) - assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode)) - assertTrue(partitionData.leaderEpoch > 0) - - val leaderId = partitionData.leaderId - assertTrue(leaderId > 0) - - val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) - .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) - assertTrue(leaderState.logEndOffset > 0) + for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) { Review Comment: How about this? ```scala for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) { ``` ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } + @Override + public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { + NodeProvider provider = new LeastLoadedNodeProvider(); + + final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { + Integer partition = 0; + String topicName = response.getTopicNameByIndex(partition); + Integer leaderId = response.getPartitionLeaderId(topicName, partition); Review Comment: @niket-goel any comment here? I think this part still reads awkward. Converting to an intermediate map is conventional. An alternative would be to do a quick validation of the response. We can structure the checks like this: 1. Check top-level error code 2. Verify only one topic in the response which matches metadata topic 3. Verify only one partition in the response with id 0 4. Check partition-level error code. This is similar to how we handle the request in `KafkaRaftClient.handleDescribeQuorumRequest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org