dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r664306577



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -116,7 +116,9 @@ object ApiVersion {
     // Introduce AllocateProducerIds (KIP-730)
     KAFKA_3_0_IV0,
     // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
-    KAFKA_3_0_IV1
+    KAFKA_3_0_IV1,
+    // Introduced topic IDs to MetadataRequest

Review comment:
       You are right: we no longer need to check for UNSUPPORTED_VERSION_ERROR, so bumping the IBP (inter.broker.protocol.version) is unnecessary.

##########
File path: clients/src/main/resources/common/message/MetadataResponse.json
##########
@@ -65,7 +66,7 @@
       "about": "Each topic in the response.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The topic error, or 0 if there was no error." },
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "10+",

Review comment:
       Yes, `nullableVersions` should be `12+`.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##########
@@ -65,6 +65,20 @@ public Builder(List<String> topics, boolean allowAutoTopicCreation) {
             this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(),  ApiKeys.METADATA.latestVersion());
         }
 
+        public Builder(List<Uuid> topicIds) {
+            super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
+            MetadataRequestData data = new MetadataRequestData();
+            if (topicIds == null)
+                data.setTopics(null);
+            else {
+                topicIds.forEach(topicId -> data.topics().add(new MetadataRequestTopic().setTopicId(topicId).setName("")));

Review comment:
       This is leftover code from when null names were not yet supported. Now that they are, we can remove it.
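
A minimal sketch of the builder logic once the placeholder name is dropped. The classes `TopicEntrySketch` and `TopicIdRequestSketch` are hypothetical stand-ins written for illustration (using `java.util.UUID` in place of Kafka's `Uuid`); they are not the generated `MetadataRequestTopic` or the real `MetadataRequest.Builder`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for the generated MetadataRequestTopic; not Kafka's real class.
final class TopicEntrySketch {
    final UUID topicId;
    final String name; // stays null when the topic is addressed by ID only

    TopicEntrySketch(UUID topicId, String name) {
        this.topicId = topicId;
        this.name = name;
    }
}

final class TopicIdRequestSketch {
    // A null input means "all topics", mirroring data.setTopics(null) in the diff.
    static List<TopicEntrySketch> fromTopicIds(List<UUID> topicIds) {
        if (topicIds == null)
            return null;
        List<TopicEntrySketch> topics = new ArrayList<>();
        for (UUID topicId : topicIds)
            topics.add(new TopicEntrySketch(topicId, null)); // no placeholder "" name
        return topics;
    }

    public static void main(String[] args) {
        List<TopicEntrySketch> topics =
            fromTopicIds(Arrays.asList(UUID.randomUUID(), UUID.randomUUID()));
        for (TopicEntrySketch topic : topics)
            System.out.println(topic.topicId + " name=" + topic.name);
    }
}
```

The point of the sketch is only that each entry carries a topic ID and leaves the name unset, which relies on the name field being nullable in the request version in use.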

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
##########
@@ -32,28 +34,87 @@
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsResult {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
+    private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
+    private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
 
-    protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this.futures = futures;
+    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
+        if (topicIdFutures == null && nameFutures == null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
+    }
+
+    static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures) {
+        return new DescribeTopicsResult(topicIdFutures, null);
+    }
+
+    static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        return new DescribeTopicsResult(null, nameFutures);
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection
+     *
+     * @return a map from topic IDs to futures which can be used to check the status of
+     *         individual topics if the request used topic IDs, otherwise return null.
+     */
+    public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
+        return topicIdFutures;
     }
 
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection
+     *
+     * @return a map from topic names to futures which can be used to check the status of
+     *         individual topics if the request used topic names, otherwise return null.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
+        return nameFutures;
+    }
+
+
     /**
      * Return a map from topic names to futures which can be used to check the status of
      * individual topics.
      */
+    @Deprecated
     public Map<String, KafkaFuture<TopicDescription>> values() {
-        return futures;
+        return nameFutures;
     }
 
     /**
      * Return a future which succeeds only if all the topic descriptions succeed.
      */
     public KafkaFuture<Map<String, TopicDescription>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+        return all(nameFutures);

Review comment:
       Done. I also changed all usages to use `allTopicNames()`.
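
To illustrate the shape this result class takes, here is a rough, self-contained sketch of the "exactly one of two maps" pattern from the diff, with an `allTopicNames()` aggregate. Everything in it is a stand-in chosen for illustration: `DescribeResultSketch` is a hypothetical class, `java.util.UUID` replaces Kafka's `Uuid`, `CompletableFuture` replaces `KafkaFuture`, and a plain `String` replaces `TopicDescription`. Throwing `IllegalStateException` when the request was not made by name is an assumption of this sketch, not behavior confirmed by the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for DescribeTopicsResult; not the real Kafka class.
final class DescribeResultSketch {
    private final Map<UUID, CompletableFuture<String>> topicIdFutures;
    private final Map<String, CompletableFuture<String>> nameFutures;

    private DescribeResultSketch(Map<UUID, CompletableFuture<String>> topicIdFutures,
                                 Map<String, CompletableFuture<String>> nameFutures) {
        // Exactly one of the two maps must be set, as in the diff.
        if (topicIdFutures != null && nameFutures != null)
            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
        if (topicIdFutures == null && nameFutures == null)
            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
        this.topicIdFutures = topicIdFutures;
        this.nameFutures = nameFutures;
    }

    static DescribeResultSketch ofTopicIds(Map<UUID, CompletableFuture<String>> futures) {
        return new DescribeResultSketch(futures, null);
    }

    static DescribeResultSketch ofTopicNames(Map<String, CompletableFuture<String>> futures) {
        return new DescribeResultSketch(null, futures);
    }

    // Null when the request used topic names.
    Map<UUID, CompletableFuture<String>> topicIdValues() {
        return topicIdFutures;
    }

    // Null when the request used topic IDs.
    Map<String, CompletableFuture<String>> topicNameValues() {
        return nameFutures;
    }

    // Completes once every per-topic future completes; fails if any of them fails.
    CompletableFuture<Map<String, String>> allTopicNames() {
        if (nameFutures == null) // assumption of this sketch, see lead-in
            throw new IllegalStateException("The request was not made with topic names.");
        CompletableFuture<Void> done =
            CompletableFuture.allOf(nameFutures.values().toArray(new CompletableFuture[0]));
        return done.thenApply(ignored -> {
            Map<String, String> descriptions = new HashMap<>(nameFutures.size());
            // join() does not block here because allOf has already completed.
            nameFutures.forEach((name, future) -> descriptions.put(name, future.join()));
            return descriptions;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>();
        futures.put("orders", CompletableFuture.completedFuture("description of orders"));
        DescribeResultSketch result = ofTopicNames(futures);
        System.out.println(result.allTopicNames().join());
    }
}
```

Callers that previously used `all()` would, under this shape, pick the accessor matching how the request was made, so a name-based caller reads `allTopicNames()`.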




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.