chia7712 commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r567655022



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
##########
@@ -17,24 +17,36 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Uuid;
+
 /**
  * A listing of a topic in the cluster.
  */
 public class TopicListing {
     private final String name;
+    private final Uuid topicId;
     private final boolean internal;
 
     /**
      * Create an instance with the specified parameters.
      *
      * @param name The topic name
+     * @param topicId The topic id.
      * @param internal Whether the topic is internal to Kafka
      */
-    public TopicListing(String name, boolean internal) {
+    public TopicListing(String name, Uuid topicId, boolean internal) {
+        this.topicId = topicId;
         this.name = name;
         this.internal = internal;
     }
 
+    /**
+     * The id of the topic.
+     */
+    public Uuid topicId() {

Review comment:
       Could you add the topic id to `toString`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1191,8 +1192,35 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val useTopicId = topicIds.nonEmpty
+
+    val (supportedVersionTopicIds, unsupportedVersionTopicIds) = if 
(config.interBrokerProtocolVersion >= KAFKA_2_8_IV1)
+      (topicIds, Set.empty[Uuid])
+    else
+      (Set.empty[Uuid], topicIds)
+
+    val unsupportedVersionTopicMetadata = if 
(unsupportedVersionTopicIds.isEmpty)
+      Seq.empty[MetadataResponseTopic]
+    else
+      unsupportedVersionTopicIds.map(topicId =>
+        metadataResponseTopic(Errors.UNSUPPORTED_VERSION, null, topicId, 
false, util.Collections.emptyList())).toSeq
+
+    // Only get topicIds and topicNames when supporting topicId
+    val unknownTopicIds = 
supportedVersionTopicIds.filter(metadataCache.getTopicName(_).isEmpty)
+    val knownTopicNames = 
supportedVersionTopicIds.flatMap(metadataCache.getTopicName)
+
+    val unknownTopicIdsTopicMetadata = if (unknownTopicIds.isEmpty)

Review comment:
       ```scala
       val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
           metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false, util.Collections.emptyList())).toSeq
       ```
   
   Would the above code be simpler?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1191,8 +1192,35 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val useTopicId = topicIds.nonEmpty
+
+    val (supportedVersionTopicIds, unsupportedVersionTopicIds) = if 
(config.interBrokerProtocolVersion >= KAFKA_2_8_IV1)
+      (topicIds, Set.empty[Uuid])
+    else
+      (Set.empty[Uuid], topicIds)
+
+    val unsupportedVersionTopicMetadata = if 
(unsupportedVersionTopicIds.isEmpty)

Review comment:
       ```scala
       val unsupportedVersionTopicMetadata = unsupportedVersionTopicIds.map(topicId =>
           metadataResponseTopic(Errors.UNSUPPORTED_VERSION, null, topicId, false, util.Collections.emptyList())).toSeq
       ```
   
   Would the above code be simpler?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1191,8 +1192,35 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val useTopicId = topicIds.nonEmpty
+
+    val (supportedVersionTopicIds, unsupportedVersionTopicIds) = if 
(config.interBrokerProtocolVersion >= KAFKA_2_8_IV1)
+      (topicIds, Set.empty[Uuid])
+    else
+      (Set.empty[Uuid], topicIds)
+
+    val unsupportedVersionTopicMetadata = if 
(unsupportedVersionTopicIds.isEmpty)
+      Seq.empty[MetadataResponseTopic]
+    else
+      unsupportedVersionTopicIds.map(topicId =>
+        metadataResponseTopic(Errors.UNSUPPORTED_VERSION, null, topicId, 
false, util.Collections.emptyList())).toSeq
+
+    // Only get topicIds and topicNames when supporting topicId
+    val unknownTopicIds = 
supportedVersionTopicIds.filter(metadataCache.getTopicName(_).isEmpty)
+    val knownTopicNames = 
supportedVersionTopicIds.flatMap(metadataCache.getTopicName)
+
+    val unknownTopicIdsTopicMetadata = if (unknownTopicIds.isEmpty)
+      Seq.empty[MetadataResponseTopic]
+    else
+      unknownTopicIds.map(topicId =>
+        metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false, 
util.Collections.emptyList())).toSeq
+
     val topics = if (metadataRequest.isAllTopics)
       metadataCache.getAllTopics()
+    else if (useTopicId)

Review comment:
       Should we reject requests that carry both a topic id and a topic name?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to