urbandan commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r563667979



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                           .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                           .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+                           .defaultsTo("")

Review comment:
       Not at all, thank you for catching this
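
   For reference, the help text above implies specs like these (illustrative values only):

       test.*              all partitions of every authorized topic matching the regex test.*
       topic1:0            partition 0 of topic1
       topic1:0-5          partitions 0 to 4 of topic1 (the upper bound is exclusive)
       :3-                 partitions 3 and above of every authorized topic
       test.*:-4,topic2    partitions below 4 of topics matching test.*, plus all partitions of topic2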

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                           .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                           .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+                           .defaultsTo("")
+    val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.")

Review comment:
       Thanks, fixed it
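
   If the catch was the s"...$topicPartitionOpt..." interpolation above, which renders the OptionSpec's toString rather than the flag name, a plausible shape of the fix (not necessarily the merged line) is to spell the flag out literally:

       val topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if --topic-partitions is present.")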

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -51,28 +64,30 @@ object GetOffsetShell {
                            .describedAs("timestamp/-1(latest)/-2(earliest)")
                            .ofType(classOf[java.lang.Long])
                            .defaultsTo(-1L)
-    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets 
returned")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of 
time each fetch request waits.")
+    val commandConfigOpt = parser.accepts("command-config", s"Consumer config 
properties file.")
                            .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1000)
+                           .describedAs("config file")
+                           .ofType(classOf[String])
+    val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", 
s"By default, internal topics are included. If specified, internal topics are 
excluded.")
 
-   if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for 
getting topic offsets.")
+    if (args.length == 0)

Review comment:
       I agree, good catch

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))

Review comment:
       Thanks for the catch

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt

Review comment:
       You are right, should be handled, fixed it
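
   A minimal sketch of that kind of guard, assuming a hypothetical helper name (the merged fix may differ): route spec.toInt and the range bounds above through a parse that converts the NumberFormatException into the same IllegalArgumentException the other branches throw.

       private def parsePartitionInt(spec: String): Int =
         try spec.toInt
         catch {
           // A bare NumberFormatException gives the user no hint about which option was bad.
           case _: NumberFormatException =>
             throw new IllegalArgumentException(s"Invalid partition specification: $spec")
         }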

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }
 
+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }

Review comment:
       I was mostly focusing on avoiding unnecessary filtering, but you are right, it shouldn't be an issue; your code is much cleaner.
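
   For the record, a hedged sketch of the cleaner shape discussed, assuming the caller can always accept a filter (this may not match the merged code exactly):

       private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): PartitionInfo => Boolean = {
         // Treat a missing topic as the match-everything regex; the internal-topic and
         // partition checks then compose uniformly instead of branching per case.
         val topicsFilter = IncludeList(topicOpt.getOrElse(".*"))
         t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) &&
           (partitionIds.isEmpty || partitionIds.contains(t.partition))
       }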

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0

Review comment:
       Removed unnecessary sorting, no duplication
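
   For anyone reading along: if an explicit ordering were still needed here, the sortWith comparator collapses to a tuple sortBy, a minimal equivalent sketch:

       partitionOffsets.toSeq.sortBy { case (tp, _) => (tp.topic, tp.partition) }.foreach { case (tp, offset) =>
         println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
       }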

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -89,33 +104,40 @@ object GetOffsetShell {
     }
     val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
 
-    val config = new Properties
+    val topicPartitionFilter = if (options.has(topicPartitionOpt)) {
+      Some(createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionOpt), excludeInternalTopics))
+    } else {
+      createTopicPartitionFilterWithTopicAndPartitionPattern(
+        if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
+        excludeInternalTopics,
+        partitionIdsRequested
+      )
+    }
+
+    val config = if (options.has(commandConfigOpt))
+      Utils.loadProps(options.valueOf(commandConfigOpt))
+    else
+      new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
 
-    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
-      case None =>
-        System.err.println(s"Topic $topic does not exist")
-        Exit.exit(1)
-      case Some(p) if p.isEmpty =>
-        if (partitionIdsRequested.isEmpty)
-          System.err.println(s"Topic $topic has 0 partitions")
-        else
-          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
-        Exit.exit(1)
-      case Some(p) => p
-    }
+    val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
 
-    if (partitionIdsRequested.nonEmpty) {
-      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
-        System.err.println(s"Error: partition $partitionId does not exist")
-      }
+    if (partitionInfos.isEmpty) {
+      System.err.println(s"Could not match any topic-partitions with the specified filters")
+      Exit.exit(1)
     }
 
-    val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+    val topicPartitions = partitionInfos.sortWith((tp1, tp2) => {

Review comment:
       No point, good catch

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }
 
+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }
+  }
+
+  /**
+   * Return the partition infos. Filter them with topicPartitionFilter if specified.
+   */
+  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: Option[PartitionInfo => Boolean]): Seq[PartitionInfo] = {
+    val topicListUnfiltered = consumer.listTopics.asScala.values.flatMap { tp => tp.asScala }

Review comment:
       I agree, fixed it
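
   A rough sketch of where this lands conceptually, assuming the filter is made non-optional as discussed (asScala comes from the collection converters the file already imports; the merged code may differ):

       private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
         // listTopics returns a Map[String, java.util.List[PartitionInfo]]; flatten and filter in one pass.
         consumer.listTopics.asScala.values.flatMap(_.asScala).filter(topicPartitionFilter).toSeq
       }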

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
       Completely agree, fixed it

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))
+    assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition))
+  }
+
+  @Test
+  def testInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--exclude-internal-topics"))
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
       I wasn't sure whether I should define any expectations on the end offsets of the internal topic, which is why I was testing it separately. But I think you are right, so I refactored the code to handle all topics uniformly.

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")

Review comment:
       Was used by the dummy consumer, removed

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {

Review comment:
       Agree, added coverage for the arguments and the filter parsing as well
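
   A hypothetical shape for that coverage (the test name and spec here are illustrative, not the actual added tests; fail would come from org.junit.Assert). Since the filter is built before the consumer is created, a malformed spec should be rejected without touching the brokers:

       @Test
       def testInvalidPartitionRangeFailsFast(): Unit = {
         try {
           GetOffsetShell.main(Array("--bootstrap-server", brokerList, "--topic-partitions", "topic:2-1"))
           fail("Expected an IllegalArgumentException for an inverted partition range")
         } catch {
           case _: IllegalArgumentException => // expected: lower bound greater than upper bound
         }
       }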




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

