cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814353



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new 
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = 
adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, 
topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, 
firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, 
getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = 
false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val allConfigs = adminClient.describeConfigs(topics.map(new 
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+        val liveBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id())
+        val topicDescriptions = 
adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+        val topicPartitions = topicDescriptions
+          .flatMap(td => td.partitions.iterator().asScala.map(p => new 
TopicPartition(td.name(), p.partition())))
+          .toSet.asJava
+        val reassignments = listAllReassignments(topicPartitions)
+
+        for (td <- topicDescriptions) {
+          val topicName = td.name
+          val config = allConfigs.get(new ConfigResource(Type.TOPIC, 
topicName)).get()
+          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+          if (describeOptions.describeConfigs) {
+            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+            if (!opts.reportOverriddenConfigs || hasNonDefault) {
+              val numPartitions = td.partitions().size
+              val firstPartition = td.partitions.iterator.next()
+              val reassignment = reassignments.get(new TopicPartition(td.name, 
firstPartition.partition))
+              val topicDesc = TopicDescription(topicName, numPartitions, 
getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = 
false)
+              topicDesc.printDescription()
+            }
           }
-        }
 
-        if (describeOptions.describePartitions) {
-          for (partition <- sortedPartitions) {
-            val reassignment = reassignments.get(new TopicPartition(td.name, 
partition.partition))
-            val partitionDesc = PartitionDescription(topicName, partition, 
Some(config), markedForDeletion = false, reassignment)
-            describeOptions.maybePrintPartitionDescription(partitionDesc)
+          if (describeOptions.describePartitions) {
+            for (partition <- sortedPartitions) {
+              val reassignment = reassignments.get(new TopicPartition(td.name, 
partition.partition))
+              val partitionDesc = PartitionDescription(topicName, partition, 
Some(config), markedForDeletion = false, reassignment)
+              describeOptions.maybePrintPartitionDescription(partitionDesc)
+            }
           }
         }
       }
     }
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      adminClient.deleteTopics(topics.asJavaCollection).all().get()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      if (topics.nonEmpty)

Review comment:
       It's not necessary to check this.  AdminClient handles being passed 
empty sets of topics to delete (by doing nothing)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to