rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562233235



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
##########
@@ -65,6 +65,19 @@ public TopicDescription(String name, boolean internal, 
List<TopicPartitionInfo>
         this(name, internal, partitions, Collections.emptySet());
     }
 
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param name The topic name
+     * @param internal Whether the topic is internal to Kafka
+     * @param partitions A list of partitions where the index represents the 
partition id and the element contains
+     *                   leadership and replica information for that partition.
+     * @param topicId the topic id
+     */
+    public TopicDescription(String name, boolean internal, 
List<TopicPartitionInfo> partitions, Uuid topicId) {

Review comment:
       Do we really need this constructor in the public class? We could just 
use the one below?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
         return new DescribeTopicsResult(new HashMap<>(topicFutures));
     }
 
+    @Override
+    public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> 
topicIds, DescribeTopicsOptions options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                        .setAllowAutoTopicCreation(false)
+                        
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.cluster();
+                Map<Uuid, Errors> errors = response.errorsByTopicId();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry 
: topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = 
entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new 
UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicId);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    boolean isInternal = 
cluster.internalTopics().contains(topicName);
+                    List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(
+                                partitionInfo.partition(), 
leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                                Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.add(topicPartitionInfo);
+                    }
+                    
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                    TopicDescription topicDescription = new 
TopicDescription(topicName, isInternal, partitions,
+                            
validAclOperations(response.topicAuthorizedOperations(topicName).get()), 
topicId);
+                    future.complete(topicDescription);
+                }
+            }
+
+            private Node leader(PartitionInfo partitionInfo) {

Review comment:
       This could be shared with `describeTopics` as well?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= 
KAFKA_2_8_IV1) topicIds else Set.empty[Uuid]

Review comment:
       @jolshan Shouldn't we use `KAFKA_2_8_IV1` for `KafkaConfig.usesTopicId` 
and use that here?

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -635,6 +650,11 @@ object TopicCommand extends Logging {
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
+    private val topicIdOpt = parser.accepts("topic-id", "The topic-id to 
describe." +
+      "This is used only with --bootstrap-server option for manage topic.")

Review comment:
       for manage topic => `for describing topics`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -91,12 +91,31 @@ public int throttleTimeMs() {
     public Map<String, Errors> errors() {
         Map<String, Errors> errors = new HashMap<>();
         for (MetadataResponseTopic metadata : data.topics()) {
+            if (metadata.name() == null) {
+                throw new NullPointerException("Use errorsByTopicId() when 
manage topic using topic id");

Review comment:
       We should never get here through the public APIs right? We could use 
IllegalStateException.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -350,7 +351,8 @@
     POSITION_OUT_OF_RANGE(
         99,
         "Requested position is not greater than or equal to zero, and less 
than the size of the snapshot.",
-        PositionOutOfRangeException::new);
+        PositionOutOfRangeException::new),
+    UNKNOWN_TOPIC_ID(100, "The topic ID dose not exist", 
UnknownTopicIdException::new);

Review comment:
       typo: `does`

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
##########
@@ -79,6 +92,16 @@ public TopicDescription(String name, boolean internal, 
List<TopicPartitionInfo>
         this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID);
     }
 
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param name The topic name
+     * @param internal Whether the topic is internal to Kafka
+     * @param partitions A list of partitions where the index represents the 
partition id and the element contains
+     *                   leadership and replica information for that partition.
+     * @param authorizedOperations authorized operations for this topic, or 
null if this is not known.

Review comment:
       Looks like default is empty set rather than null?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -91,12 +91,31 @@ public int throttleTimeMs() {
     public Map<String, Errors> errors() {
         Map<String, Errors> errors = new HashMap<>();
         for (MetadataResponseTopic metadata : data.topics()) {
+            if (metadata.name() == null) {
+                throw new NullPointerException("Use errorsByTopicId() when 
manage topic using topic id");
+            }
             if (metadata.errorCode() != Errors.NONE.code())
                 errors.put(metadata.name(), 
Errors.forCode(metadata.errorCode()));
         }
         return errors;
     }
 
+    /**
+     * Get a map of the topicIds which had metadata errors
+     * @return the map
+     */
+    public Map<Uuid, Errors> errorsByTopicId() {
+        Map<Uuid, Errors> errors = new HashMap<>();
+        for (MetadataResponseTopic metadata : data.topics()) {
+            if (metadata.topicId() == Uuid.ZERO_UUID) {
+                throw new NullPointerException("Use errors() when manage topic 
using topic name");

Review comment:
       As before, IllegalStateException?

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -313,42 +313,57 @@ object TopicCommand extends Logging {
     }
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      val topicId = opts.topicId.map(Uuid.fromString).filter(_ != 
Uuid.ZERO_UUID)
+      // if topicId is provided and not zero, will use topicId regardless of 
topic name

Review comment:
       we would fail if both are provided?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnknownTopicIdException extends ApiException {
+    private static final long serialVersionUID = 1L;
+    public UnknownTopicIdException() {
+        super();

Review comment:
       `super()` is implied, so we can just leave the method empty?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
         return new DescribeTopicsResult(new HashMap<>(topicFutures));
     }
 
+    @Override
+    public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> 
topicIds, DescribeTopicsOptions options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                        .setAllowAutoTopicCreation(false)
+                        
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.cluster();
+                Map<Uuid, Errors> errors = response.errorsByTopicId();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry 
: topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = 
entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new 
UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicId);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    boolean isInternal = 
cluster.internalTopics().contains(topicName);
+                    List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(
+                                partitionInfo.partition(), 
leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                                Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.add(topicPartitionInfo);
+                    }
+                    
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                    TopicDescription topicDescription = new 
TopicDescription(topicName, isInternal, partitions,
+                            
validAclOperations(response.topicAuthorizedOperations(topicName).get()), 
topicId);
+                    future.complete(topicDescription);

Review comment:
       Could we move the code shared between `describeTopics` and 
`describeTopicsWithIds` to a common private method?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= 
KAFKA_2_8_IV1) topicIds else Set.empty[Uuid]
+
+    val unSupportedVersionTopicIds = topicIds.diff(supportedVersionTopicIds)

Review comment:
       `unSupported` => `unsupported` since it is one word (below as well)

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -313,42 +313,54 @@ object TopicCommand extends Logging {
     }
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      val topicId = opts.topicId.map(Uuid.fromString).filter(_ != 
Uuid.ZERO_UUID)
+      val topics = if (topicId.isEmpty)
+        getTopics(opts.topic, opts.excludeInternalTopics)
+      else
+        Seq()
 
-      if (topics.nonEmpty) {
-        val allConfigs = adminClient.describeConfigs(topics.map(new 
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-        val liveBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id())
-        val topicDescriptions = 
adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-        val topicPartitions = topicDescriptions
-          .flatMap(td => td.partitions.iterator().asScala.map(p => new 
TopicPartition(td.name(), p.partition())))
-          .toSet.asJava
-        val reassignments = listAllReassignments(topicPartitions)
-
-        for (td <- topicDescriptions) {
-          val topicName = td.name
-          val topicId = td.topicId()
-          val config = allConfigs.get(new ConfigResource(Type.TOPIC, 
topicName)).get()
-          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-          if (describeOptions.describeConfigs) {
-            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-            if (!opts.reportOverriddenConfigs || hasNonDefault) {
-              val numPartitions = td.partitions().size
-              val firstPartition = td.partitions.iterator.next()
-              val reassignment = reassignments.get(new TopicPartition(td.name, 
firstPartition.partition))
-              val topicDesc = TopicDescription(topicName, topicId, 
numPartitions, getReplicationFactor(firstPartition, reassignment), config, 
markedForDeletion = false)
-              topicDesc.printDescription()
-            }
+      if (topicId.isEmpty)
+        ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Since listTopics uses MetadataRequest, we just need to add topic ids 
from the MetadataResponse to TopicListing?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to