This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab8a7ff  KAFKA-8670; Fix exception for kafka-topics.sh --describe 
without --topic mentioned (#7094)
ab8a7ff is described below

commit ab8a7ff36326742e1c6d4fc0a0aff818c7e6d313
Author: Tirtha Chatterjee <ti...@amazon.com>
AuthorDate: Thu Jul 18 13:25:25 2019 -0700

    KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic 
mentioned (#7094)
    
    If there are **no topics** in a cluster, kafka-topics.sh --describe without 
a --topic option should return an empty list rather than throw an exception.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 26 ++++++++++++----------
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |  5 +++++
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 4f52996..ae3cde8 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -208,7 +208,7 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics)
+      ensureTopicExists(topics, opts.topic)
       val topicsInfo = 
adminClient.describeTopics(topics.asJavaCollection).values()
       adminClient.createPartitions(topics.map {topicName =>
         if (topic.hasReplicaAssignment) {
@@ -267,7 +267,7 @@ object TopicCommand extends Logging {
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics)
+      ensureTopicExists(topics, opts.topic)
       adminClient.deleteTopics(topics.asJavaCollection).all().get()
     }
 
@@ -317,7 +317,7 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
       val tp = new CommandTopicPartition(opts)
-      ensureTopicExists(topics, opts.ifExists)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
       val adminZkClient = new AdminZkClient(zkClient)
       topics.foreach { topic =>
         val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@@ -354,8 +354,7 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val topicOptWithExits = opts.topic.isDefined && opts.ifExists
-      ensureTopicExists(topics, topicOptWithExits)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
       val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
       val describeOptions = new DescribeOptions(opts, liveBrokers)
       val adminZkClient = new AdminZkClient(zkClient)
@@ -401,7 +400,7 @@ object TopicCommand extends Logging {
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.ifExists)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
       topics.foreach { topic =>
         try {
           if (Topic.isInternal(topic)) {
@@ -433,14 +432,17 @@ object TopicCommand extends Logging {
   /**
     * ensures topic existence and throws exception if topic doesn't exist
     *
-    * @param opts
-    * @param topics
-    * @param topicOptWithExists
+    * @param foundTopics Topics that were found to match the requested topic 
name.
+    * @param requestedTopic Name of the topic that was requested.
+    * @param requireTopicExists Indicates if the topic needs to exist for the 
operation to be successful.
+    *                           If set to true, the command will throw an 
exception if the topic with the
+    *                           requested name does not exist.
     */
-  private def ensureTopicExists(topics: Seq[String], topicOptWithExists: 
Boolean = false) = {
-    if (topics.isEmpty && !topicOptWithExists) {
+  private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: 
Option[String], requireTopicExists: Boolean = true) = {
+    // If no topic name was mentioned, do not need to throw exception.
+    if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty) 
{
       // If given topic doesn't exist then throw exception
-      throw new IllegalArgumentException(s"Topics in [${topics.mkString(",")}] 
does not exist")
+      throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does 
not exist as expected")
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 407c2f3..35502f5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -416,6 +416,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with 
Logging with RackAwareT
       topicService.describeTopic(describeOpts)
     }
 
+    // describe all topics
+    val describeOptsAllTopics = new TopicCommandOptions(Array())
+    // should not throw any error
+    topicService.describeTopic(describeOptsAllTopics)
+
     // describe topic that does not exist with --if-exists
     val describeOptsWithExists = new TopicCommandOptions(Array("--topic", 
testTopicName, "--if-exists"))
     // should not throw any error

Reply via email to