rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r555730755



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
##########
@@ -62,25 +62,25 @@ public int hashCode() {
      *                   leadership and replica information for that partition.
      */
     public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions) {
-        this(name, internal, partitions, Collections.emptySet());
+        this(name, Uuid.ZERO_UUID, internal, partitions);
+    }
+
+    public TopicDescription(String name, Uuid topicId, boolean internal, List<TopicPartitionInfo> partitions) {
+        this(name, topicId, internal, partitions, Collections.emptySet());
     }
 
     /**
      * Create an instance with the specified parameters.
      *
      * @param name The topic name
+     * @param topicId the topic id
      * @param internal Whether the topic is internal to Kafka
      * @param partitions A list of partitions where the index represents the partition id and the element contains
      *                   leadership and replica information for that partition.
      * @param authorizedOperations authorized operations for this topic, or null if this is not known.
      */
-    public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
+    public TopicDescription(String name, Uuid topicId, boolean internal, List<TopicPartitionInfo> partitions,

Review comment:
       We seem to have removed a constructor?
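       If the intent is to keep the 2.7.0 surface intact, one option (a sketch, not this PR's code) is to retain the old four-argument signature and have it delegate to the new topic-id-aware constructor with `Uuid.ZERO_UUID`, mirroring what the two-argument constructor above already does:

```java
// Sketch only: keeps the pre-existing constructor compiling for old callers.
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
                        Set<AclOperation> authorizedOperations) {
    // Uuid.ZERO_UUID stands in for "topic id unknown", as in the two-argument delegation above.
    this(name, Uuid.ZERO_UUID, internal, partitions, authorizedOperations);
}
```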

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1816,7 +1823,103 @@ void handleFailure(Throwable throwable) {
         if (!topicNamesList.isEmpty()) {
             runnable.call(call, now);
         }
-        return new DescribeTopicsResult(new HashMap<>(topicFutures));
+        return new DescribeTopicsResult<>(new HashMap<>(topicFutures));
+    }
+
+    @Override
+    public DescribeTopicsResult<Uuid> describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            private boolean supportsDisablingTopicCreation = true;
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                if (supportsDisablingTopicCreation)
+                    return new MetadataRequest.Builder(new MetadataRequestData()
+                            .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                            .setAllowAutoTopicCreation(false)
+                            .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+                else
+                    return MetadataRequest.Builder.allTopics();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.cluster();
+                Map<String, Errors> errors = response.errors();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicName);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    boolean isInternal = cluster.internalTopics().contains(topicName);
+                    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                                partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                                Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.add(topicPartitionInfo);
+                    }
+                    partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                    TopicDescription topicDescription = new TopicDescription(topicName, topicId, isInternal, partitions,
+                            validAclOperations(response.topicAuthorizedOperations(topicName).get()));
+                    future.complete(topicDescription);
+                }
+            }
+
+            private Node leader(PartitionInfo partitionInfo) {
+                if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id())
+                    return null;
+                return partitionInfo.leader();
+            }
+
+            @Override
+            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                if (supportsDisablingTopicCreation) {
+                    supportsDisablingTopicCreation = false;

Review comment:
       Couldn't an unsupported version mean that topic ids are not supported (rather than `disablingTopicCreation`)? In any case, you can't create topics with topic ids, right?
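       If that reading is right, a minimal sketch of the alternative (not this PR's code) would stop downgrading to an all-topics request and let the call fail so the error reaches the caller:

```java
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
    // A broker that rejects this request version most likely does not understand
    // topic ids at all, so retrying with MetadataRequest.Builder.allTopics() cannot
    // help; returning false makes the call fail and handleFailure report the error.
    return false;
}
```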

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -91,7 +91,7 @@ public int throttleTimeMs() {
     public Map<String, Errors> errors() {
         Map<String, Errors> errors = new HashMap<>();
         for (MetadataResponseTopic metadata : data.topics()) {
-            if (metadata.errorCode() != Errors.NONE.code())
+            if (metadata.errorCode() != Errors.NONE.code() && metadata.name() != null)

Review comment:
       What happens if `name` is null - won't we quietly ignore the error? Would it be better to throw an exception, since we don't want anyone using this method with topic ids?
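       A sketch of the stricter behaviour (an assumption, not this PR's change) would be to fail fast when a response carries errors without topic names, rather than dropping them:

```java
public Map<String, Errors> errors() {
    Map<String, Errors> errors = new HashMap<>();
    for (MetadataResponseTopic metadata : data.topics()) {
        if (metadata.name() == null) {
            // A name-keyed error map cannot represent a topic-id-only response.
            throw new IllegalStateException("errors() cannot be used on a response without topic names");
        }
        if (metadata.errorCode() != Errors.NONE.code()) {
            errors.put(metadata.name(), Errors.forCode(metadata.errorCode()));
        }
    }
    return errors;
}
```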

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -313,42 +313,54 @@ object TopicCommand extends Logging {
     }
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      val topicId = opts.topicId.map(Uuid.fromString).filter(_ != Uuid.ZERO_UUID)
+      val topics = if (topicId.isEmpty)
+        getTopics(opts.topic, opts.excludeInternalTopics)
+      else
+        Seq()
 
-      if (topics.nonEmpty) {
-        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-        val topicPartitions = topicDescriptions
-          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
-          .toSet.asJava
-        val reassignments = listAllReassignments(topicPartitions)
-
-        for (td <- topicDescriptions) {
-          val topicName = td.name
-          val topicId = td.topicId()
-          val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-          if (describeOptions.describeConfigs) {
-            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-            if (!opts.reportOverriddenConfigs || hasNonDefault) {
-              val numPartitions = td.partitions().size
-              val firstPartition = td.partitions.iterator.next()
-              val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-              val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-              topicDesc.printDescription()
-            }
+      if (topicId.isEmpty)
+        ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       `topics` may also be empty?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
##########
@@ -31,29 +31,29 @@
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeTopicsResult {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
+public class DescribeTopicsResult<T> {

Review comment:
       We need to keep the Admin API backwards compatible. An application written against the 2.7.0 clients should not break if it is compiled with a 2.8.0 clients jar. You can always add an internal class with shared code to avoid duplication, but the public API itself needs to remain compatible.
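       A compact sketch of that shape (class names here are illustrative, not this PR's code): the public class keeps its 2.7.0 signature, and the generic keyed-futures plumbing moves into a package-private helper that an id-keyed result can reuse:

```java
// Internal helper shared by the name-keyed and (new) id-keyed result types.
class KeyedTopicDescriptions<T> {
    final Map<T, KafkaFuture<TopicDescription>> futures;

    KeyedTopicDescriptions(Map<T, KafkaFuture<TopicDescription>> futures) {
        this.futures = futures;
    }
}

@InterfaceStability.Evolving
public class DescribeTopicsResult {
    private final KeyedTopicDescriptions<String> byName;

    DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
        this.byName = new KeyedTopicDescriptions<>(futures);
    }

    /** Unchanged 2.7.0 signature, so applications compiled against 2.7.0 keep working. */
    public Map<String, KafkaFuture<TopicDescription>> values() {
        return byName.futures;
    }
}
```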

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -206,6 +209,8 @@ synchronized public void addTopic(boolean internal,
                 logDirs.add(brokerLogDirs.get(partition.leader().id()).get(0));
             }
         }
+        Uuid topicId = Uuid.randomUuid();
+        topicIds.put(name, topicId);

Review comment:
       Do we want the option to test with and without topic ids?
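       One possible shape for that (purely illustrative - the parameter list here is an assumption, not MockAdminClient's actual signature): an overload or flag that lets a test skip id assignment, so pre-topic-id behaviour stays coverable:

```java
// Hypothetical test hook; the existing addTopic(...) would delegate with true.
synchronized public void addTopic(boolean internal,
                                  String name,
                                  List<TopicPartitionInfo> partitions,
                                  Map<String, String> configs,
                                  boolean assignTopicId) {
    if (assignTopicId) {
        topicIds.put(name, Uuid.randomUuid());
    }
    // ... rest of the existing addTopic bookkeeping ...
}
```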

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -742,10 +760,16 @@ object TopicCommand extends Logging {
       if (has(bootstrapServerOpt) == has(zkConnectOpt))
         throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
 
+      if (has(topicIdOpt) && has(zkConnectOpt)) {
+        throw new IllegalArgumentException("--topic-id can used only with --bootstrap-server")
+      }
+
       if (!has(bootstrapServerOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-      if(has(describeOpt) && has(ifExistsOpt))
-        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)

Review comment:
       We have removed this check?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

