artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446639923


##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+            listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+            false)
+          val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
+                Errors.LEADER_NOT_AVAILABLE
+              } else {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
+                  s"not found on leader ${partition.leader}")
+                Errors.LISTENER_NOT_FOUND
+              }
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+            case Some(leader) =>
+              val error = if (filteredReplicas.size < 
partition.replicas.length) {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
+                  s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE

Review Comment:
   Not quite sure why this is an error.  We should just describe the state -- 
assigned replicas, active replicas, replicas in ISR, replicas in ELR, etc.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
       ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+      case _ =>
+    }
+    val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+    var topics = scala.collection.mutable.Set[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => 
topics.add(topic.name()))
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   Looks like we copy topics multiple times.  I think we can (a) filter out 
topics that are below the cursor before copying, (b) use a sorted set so that we 
can build the desired data structure once.



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+            listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+            false)
+          val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {

Review Comment:
   I'm not sure this should be an error for the describe topics results -- the 
describe succeeded; not having a leader is one of the states that the partition 
can be in.



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+            listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+            false)
+          val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
+                Errors.LEADER_NOT_AVAILABLE
+              } else {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
+                  s"not found on leader ${partition.leader}")
+                Errors.LISTENER_NOT_FOUND
+              }
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+            case Some(leader) =>
+              val error = if (filteredReplicas.size < 
partition.replicas.length) {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
+                  s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else if (filteredIsr.size < partition.isr.length) {
+                debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
+                  s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(leader.id())
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownELR(Replicas.toList(partition.lastKnownElr))
+          }
+        }.toList)
+        partitions

Review Comment:
   Do we need `val partitions`?



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName

Review Comment:
   Should we pass startIndex, endIndex here to avoid unnecessary copies?  Then 
we can use topic.partitions.get(i) to avoid iterating over all partitions.



##########
clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 75,
+  "type": "response",
+  "name": "DescribeTopicPartitionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", 
"versions": "0+",
+      "about": "Each topic in the response.", "fields": [
+      { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        "about": "The topic error, or 0 if there was no error." },
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+        "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+        "about": "True if the topic is internal." },
+      { "name": "Partitions", "type": 
"[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+        "about": "Each partition in the topic.", "fields": [
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The partition error, or 0 if there was no error." },
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+          "about": "The ID of the leader broker." },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+          "about": "The leader epoch of this partition." },
+        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+          "about": "The set of all nodes that host this partition." },
+        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+          "about": "The set of nodes that are in sync with the leader for this 
partition." },
+        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+          "versions": "0+", "nullableVersions": "0+",
+          "about": "The new eligible leader replicas otherwise." },
+        { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",

Review Comment:
   What's the proper casing of acronyms? We have Isr in IsrNodes, but ELR in 
LastKnownELR.



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The 
NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this 
topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in 
the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their 
corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the 
first topic
+   * @param maximumNumberOfPartitions     The max number of partitions to 
return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>
+      if (remaining > 0) {
+        val partitionResponse = 
getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+        partitionResponse.map( partitions => {

Review Comment:
   Looks like we can use foreach instead of map -- we don't transform the 
result.



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The 
NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this 
topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in 
the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their 
corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the 
first topic
+   * @param maximumNumberOfPartitions     The max number of partitions to 
return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>

Review Comment:
   Looks like this would continue iterating over the topics even after we've 
reached the limit on the result.



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The 
NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this 
topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in 
the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their 
corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the 
first topic
+   * @param maximumNumberOfPartitions     The max number of partitions to 
return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>
+      if (remaining > 0) {
+        val partitionResponse = 
getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+        partitionResponse.map( partitions => {
+          val upperIndex = startIndex + remaining
+          val response = new DescribeTopicPartitionsResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topicName)
+            
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topicName))
+            .setPartitions(partitions.filter(partition => {
+              partition.partitionIndex() >= startIndex && 
partition.partitionIndex() < upperIndex
+            }).asJava)
+          remaining -= response.partitions().size()
+          result.topics().add(response)
+
+          if (upperIndex < partitions.size) {
+            result.setNextCursor(new Cursor()
+              .setTopicName(topicName)
+              .setPartitionIndex(upperIndex)
+            )
+            remaining = -1
+          }
+        })
+
+        // start index only applies to the first topic. Reset it here.
+        startIndex = 0
+
+        if (!partitionResponse.isDefined) {
+          val error = try {
+            Topic.validate(topicName)
+            Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
   For the case when we just describe all topics, we can get an unexpected error 
because a topic that was in the cursor got deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to