artemlivshits commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1446639923
########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + private def getPartitionMetadataForDescribeTopicResponse( + image: MetadataImage, + topicName: String, + listenerName: ListenerName + ): Option[List[DescribeTopicPartitionsResponsePartition]] = { + Option(image.topics().getTopic(topicName)) match { + case None => None + case Some(topic) => { + val partitions = Some(topic.partitions().entrySet().asScala.map { entry => + val partitionId = entry.getKey + val partition = entry.getValue + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, + listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, + false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { + case None => + val error = if (!image.cluster().brokers.containsKey(partition.leader)) { + debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available") + Errors.LEADER_NOT_AVAILABLE + } else { + debug(s"Error while fetching metadata for $topicName-$partitionId: listener $listenerName " + + s"not found on leader ${partition.leader}") + Errors.LISTENER_NOT_FOUND + } + new DescribeTopicPartitionsResponsePartition() + .setErrorCode(error.code) + .setPartitionIndex(partitionId) + .setLeaderId(MetadataResponse.NO_LEADER_ID) + .setLeaderEpoch(partition.leaderEpoch) + .setReplicaNodes(filteredReplicas) + .setIsrNodes(filteredIsr) + .setOfflineReplicas(offlineReplicas) + case Some(leader) => + val error = if (filteredReplicas.size < partition.replicas.length) { + debug(s"Error while fetching metadata for $topicName-$partitionId: replica information not available for " + + s"following brokers ${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}") 
+ Errors.REPLICA_NOT_AVAILABLE Review Comment: Not quite sure why this is an error. We should just describe the state -- assigned replicas, active replicas, replicas in ISR, replicas in ELR, etc. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { + metadataCache match { + case _: ZkMetadataCache => + throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => + } + val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + + val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() + var topics = scala.collection.mutable.Set[String]() + describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name())) + + val cursor = describeTopicPartitionsRequest.cursor() + val fetchAllTopics = topics.isEmpty + if (fetchAllTopics) { + metadataCache.getAllTopics().foreach(topic => topics.add(topic)) Review Comment: Looks like we copy topics multiple times. I think we can (a) filter out topics that are below the cursor before copying, (b) use a sorted set so that we can build the desired data structure once. 
########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + private def getPartitionMetadataForDescribeTopicResponse( + image: MetadataImage, + topicName: String, + listenerName: ListenerName + ): Option[List[DescribeTopicPartitionsResponsePartition]] = { + Option(image.topics().getTopic(topicName)) match { + case None => None + case Some(topic) => { + val partitions = Some(topic.partitions().entrySet().asScala.map { entry => + val partitionId = entry.getKey + val partition = entry.getValue + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, + listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, + false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { + case None => + val error = if (!image.cluster().brokers.containsKey(partition.leader)) { Review Comment: I'm not sure this should be an error for the describe topics results -- the describe succeeded; not having a leader is one of the states that the partition can be in. 
########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + private def getPartitionMetadataForDescribeTopicResponse( + image: MetadataImage, + topicName: String, + listenerName: ListenerName + ): Option[List[DescribeTopicPartitionsResponsePartition]] = { + Option(image.topics().getTopic(topicName)) match { + case None => None + case Some(topic) => { + val partitions = Some(topic.partitions().entrySet().asScala.map { entry => + val partitionId = entry.getKey + val partition = entry.getValue + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, + listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, + false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { + case None => + val error = if (!image.cluster().brokers.containsKey(partition.leader)) { + debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available") + Errors.LEADER_NOT_AVAILABLE + } else { + debug(s"Error while fetching metadata for $topicName-$partitionId: listener $listenerName " + + s"not found on leader ${partition.leader}") + Errors.LISTENER_NOT_FOUND + } + new DescribeTopicPartitionsResponsePartition() + .setErrorCode(error.code) + .setPartitionIndex(partitionId) + .setLeaderId(MetadataResponse.NO_LEADER_ID) + .setLeaderEpoch(partition.leaderEpoch) + .setReplicaNodes(filteredReplicas) + .setIsrNodes(filteredIsr) + .setOfflineReplicas(offlineReplicas) + case Some(leader) => + val error = if (filteredReplicas.size < partition.replicas.length) { + debug(s"Error while fetching metadata for $topicName-$partitionId: replica information not available for " + + s"following brokers ${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}") 
+ Errors.REPLICA_NOT_AVAILABLE + } else if (filteredIsr.size < partition.isr.length) { + debug(s"Error while fetching metadata for $topicName-$partitionId: in sync replica information not available for " + + s"following brokers ${partition.isr.filterNot(filteredIsr.contains).mkString(",")}") + Errors.REPLICA_NOT_AVAILABLE + } else { + Errors.NONE + } + + new DescribeTopicPartitionsResponsePartition() + .setErrorCode(error.code) + .setPartitionIndex(partitionId) + .setLeaderId(leader.id()) + .setLeaderEpoch(partition.leaderEpoch) + .setReplicaNodes(filteredReplicas) + .setIsrNodes(filteredIsr) + .setOfflineReplicas(offlineReplicas) + .setEligibleLeaderReplicas(Replicas.toList(partition.elr)) + .setLastKnownELR(Replicas.toList(partition.lastKnownElr)) + } + }.toList) + partitions Review Comment: Do we need `val partitions`? ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + private def getPartitionMetadataForDescribeTopicResponse( + image: MetadataImage, + topicName: String, + listenerName: ListenerName Review Comment: Should we pass startIndex, endIndex here to avoid unnecessary copies? Then we can use topic.partitions.get(i) to avoid iterating over all partitions. ########## clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json: ########## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 75, + "type": "response", + "name": "DescribeTopicPartitionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+", + "about": "Each topic in the response.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The topic error, or 0 if there was no error." }, + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+", + "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." }, + { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, + "about": "True if the topic is internal." }, + { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+", + "about": "Each partition in the topic.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error, or 0 if there was no error." }, + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the leader broker." 
}, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true, + "about": "The leader epoch of this partition." }, + { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of all nodes that host this partition." }, + { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of nodes that are in sync with the leader for this partition." }, + { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The new eligible leader replicas otherwise." }, + { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", Review Comment: What's the proper casing of acronyms? We have Isr in IsrNodes, but ELR in LastKnownELR. ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Get the topic metadata for the given topics. + * + * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first + * partition can't be returned due the limit. + * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response. + * + * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData + * will also be sorted in alphabetical order. + * + * @param topics The set of topics and their corresponding first partition id to fetch. + * @param listenerName The listener name. + * @param firstTopicPartitionStartIndex The start partition index for the first topic + * @param maximumNumberOfPartitions The max number of partitions to return. 
+ */ + def getTopicMetadataForDescribeTopicResponse( + topics: Seq[String], + listenerName: ListenerName, + firstTopicPartitionStartIndex: Int, + maximumNumberOfPartitions: Int + ): DescribeTopicPartitionsResponseData = { + val image = _currentImage + var remaining = maximumNumberOfPartitions + var startIndex = firstTopicPartitionStartIndex + val result = new DescribeTopicPartitionsResponseData() + topics.foreach { topicName => + if (remaining > 0) { + val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName) + partitionResponse.map( partitions => { Review Comment: Looks like we can use foreach instead of map -- we don't transform the result. ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Get the topic metadata for the given topics. + * + * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first + * partition can't be returned due the limit. + * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response. + * + * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData + * will also be sorted in alphabetical order. + * + * @param topics The set of topics and their corresponding first partition id to fetch. + * @param listenerName The listener name. + * @param firstTopicPartitionStartIndex The start partition index for the first topic + * @param maximumNumberOfPartitions The max number of partitions to return. 
+ */ + def getTopicMetadataForDescribeTopicResponse( + topics: Seq[String], + listenerName: ListenerName, + firstTopicPartitionStartIndex: Int, + maximumNumberOfPartitions: Int + ): DescribeTopicPartitionsResponseData = { + val image = _currentImage + var remaining = maximumNumberOfPartitions + var startIndex = firstTopicPartitionStartIndex + val result = new DescribeTopicPartitionsResponseData() + topics.foreach { topicName => Review Comment: Looks like this would continue iterating over the topics even after we've reached the limit on the result. ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Get the topic metadata for the given topics. + * + * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first + * partition can't be returned due the limit. + * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response. + * + * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData + * will also be sorted in alphabetical order. + * + * @param topics The set of topics and their corresponding first partition id to fetch. + * @param listenerName The listener name. + * @param firstTopicPartitionStartIndex The start partition index for the first topic + * @param maximumNumberOfPartitions The max number of partitions to return. 
+ */ + def getTopicMetadataForDescribeTopicResponse( + topics: Seq[String], + listenerName: ListenerName, + firstTopicPartitionStartIndex: Int, + maximumNumberOfPartitions: Int + ): DescribeTopicPartitionsResponseData = { + val image = _currentImage + var remaining = maximumNumberOfPartitions + var startIndex = firstTopicPartitionStartIndex + val result = new DescribeTopicPartitionsResponseData() + topics.foreach { topicName => + if (remaining > 0) { + val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName) + partitionResponse.map( partitions => { + val upperIndex = startIndex + remaining + val response = new DescribeTopicPartitionsResponseTopic() + .setErrorCode(Errors.NONE.code) + .setName(topicName) + .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID)) + .setIsInternal(Topic.isInternal(topicName)) + .setPartitions(partitions.filter(partition => { + partition.partitionIndex() >= startIndex && partition.partitionIndex() < upperIndex + }).asJava) + remaining -= response.partitions().size() + result.topics().add(response) + + if (upperIndex < partitions.size) { + result.setNextCursor(new Cursor() + .setTopicName(topicName) + .setPartitionIndex(upperIndex) + ) + remaining = -1 + } + }) + + // start index only applies to the first topic. Reset it here. + startIndex = 0 + + if (!partitionResponse.isDefined) { + val error = try { + Topic.validate(topicName) + Errors.UNKNOWN_TOPIC_OR_PARTITION Review Comment: For the case when we just describe all topics we can get an unexpected error because a topic that was in a cursor got deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org