dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r558158660



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                            .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                            .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+                           .defaultsTo("")

Review comment:
       Is having an empty string as the default necessary? It shows as `(default: )` in the description, which looks weird.
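
   For example, dropping `defaultsTo("")` and resolving the value at the call site keeps the help output clean (a sketch; `topicPartitionsArg` is an illustrative name, and `main` already guards with `options.has(topicPartitionOpt)`):
   ```
   val topicPartitionsArg =
     if (options.has(topicPartitionOpt)) Some(options.valueOf(topicPartitionOpt)) else None
   ```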

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                            .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                            .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+                           .defaultsTo("")
+    val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.")

Review comment:
       We should replace `$topicPartitionOpt` with `--topic-partitions` in order to remain consistent with the other descriptions. `$topicPartitionOpt` generates `[topic-partitions]` in the description.
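
   For example, with the literal flag name (a sketch of the reworded description):
   ```
   val topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression." +
     " If not present, all authorized topics are queried. Ignored if --topic-partitions is present.")
   ```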

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -51,28 +64,30 @@ object GetOffsetShell {
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(-1L)
-    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
+    val commandConfigOpt = parser.accepts("command-config", s"Consumer config properties file.")

Review comment:
       How about `Property file containing configs to be passed to Consumer Client.` to remain in line with the description of that field used by other command line tools?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {

Review comment:
       nit: We usually structure blocks as follows:
   ```
   ruleSpecs.map { ruleSpec => 
     ...
   }
   ```
   We can avoid the parentheses.

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                            .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                            .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+                           .defaultsTo("")
+    val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.")
                           .withRequiredArg
                           .describedAs("topic")
                           .ofType(classOf[String])
-    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
+    val partitionOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Ignored if $topicPartitionOpt is present.")

Review comment:
       ditto

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -51,28 +64,30 @@ object GetOffsetShell {
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(-1L)
-    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
+    val commandConfigOpt = parser.accepts("command-config", s"Consumer config properties file.")
                           .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1000)
+                           .describedAs("config file")
+                           .ofType(classOf[String])
+    val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.")
 
-   if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
+    if (args.length == 0)

Review comment:
       Should we also verify combinations of arguments that are not valid? I am thinking about `--topic-partitions` vs `--topic` and `--partitions`. That would be better than silently ignoring them.
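
   A sketch of what I have in mind, assuming `kafka.utils.CommandLineUtils.checkInvalidArgs` is applicable here:
   ```
   // Fail fast instead of silently ignoring --topic/--partitions when --topic-partitions is given.
   CommandLineUtils.checkInvalidArgs(parser, options, topicPartitionOpt, Set(topicOpt, partitionOpt))
   ```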

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt

Review comment:
       Should we handle the case where it is not a number and provide a meaningful error to the user?
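
   Something along these lines, for example (a sketch):
   ```
   val number = try spec.toInt catch {
     case _: NumberFormatException =>
       // Surface a clear message instead of the raw NumberFormatException.
       throw new IllegalArgumentException(s"Invalid partition: $spec is not a number")
   }
   ```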

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))

Review comment:
       Let's try to remain in line with https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase. We should name the variable `includeList`.
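
   i.e.:
   ```
   val includeList = IncludeList(parts(0))
   tp: PartitionInfo => includeList.isTopicAllowed(tp.topic, excludeInternalTopics)
   ```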

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {

Review comment:
       nit: A space is missing after `if`. There are a couple of similar cases below.

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0

Review comment:
       This block of code is identical to the one used at L132. Could we extract the predicate into a function and use it in both cases?
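
   For example (a sketch; the helper name is illustrative, and the other call site would need a small adapter since it sorts `PartitionInfo`s):
   ```
   private def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = {
     val topicComp = a.topic.compareTo(b.topic)
     if (topicComp == 0) a.partition < b.partition
     else topicComp < 0
   }

   // At this call site:
   partitionOffsets.toSeq.sortWith((a, b) => compareTopicPartitions(a._1, b._1))
   ```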

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -89,33 +104,40 @@ object GetOffsetShell {
     }
     val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
 
-    val config = new Properties
+    val topicPartitionFilter = if (options.has(topicPartitionOpt)) {
+      Some(createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionOpt), excludeInternalTopics))
+    } else {
+      createTopicPartitionFilterWithTopicAndPartitionPattern(
+        if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
+        excludeInternalTopics,
+        partitionIdsRequested
+      )
+    }
+
+    val config = if (options.has(commandConfigOpt))
+      Utils.loadProps(options.valueOf(commandConfigOpt))
+    else
+      new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
 
-    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
-      case None =>
-        System.err.println(s"Topic $topic does not exist")
-        Exit.exit(1)
-      case Some(p) if p.isEmpty =>
-        if (partitionIdsRequested.isEmpty)
-          System.err.println(s"Topic $topic has 0 partitions")
-        else
-          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
-        Exit.exit(1)
-      case Some(p) => p
-    }
+    val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
 
-    if (partitionIdsRequested.nonEmpty) {
-      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
-        System.err.println(s"Error: partition $partitionId does not exist")
-      }
+    if (partitionInfos.isEmpty) {
+      System.err.println(s"Could not match any topic-partitions with the specified filters")
+      Exit.exit(1)
     }
 
-    val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+    val topicPartitions = partitionInfos.sortWith((tp1, tp2) => {

Review comment:
       Do we really need to sort the partitions here? What are the benefits?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }
 
+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }

Review comment:
       Could we simplify this block to something like the following?
   ```
   val topicFilter = IncludeList(topicOpt.getOrElse(".*"))
   t => topicFilter.isTopicAllowed(t.topic, excludeInternalTopics) && (partitionIds.isEmpty || partitionIds.contains(t.partition))
   ```

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
        }
    }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }
 
+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }
+  }
+
+  /**
+   * Return the partition infos. Filter them with topicPartitionFilter if specified.
+   */
+  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: Option[PartitionInfo => Boolean]): Seq[PartitionInfo] = {
+    val topicListUnfiltered = consumer.listTopics.asScala.values.flatMap { tp => tp.asScala }

Review comment:
       It might be worth doing the filtering in the `flatMap` directly. It would avoid having to construct the full list and filter it afterwards. What do you think?
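
   For example (a sketch):
   ```
   consumer.listTopics.asScala.values.flatMap { partitionInfos =>
     // Apply the filter while flattening; Option.forall passes everything when no filter is set.
     partitionInfos.asScala.filter(tp => topicPartitionFilter.forall(_.apply(tp)))
   }.toBuffer
   ```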

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
       It would be better to use `assertEquals` as well, since it outputs the values when the test fails. The same applies to all the tests, it seems.
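
   For example (a sketch; converting both sides to `Seq` so contents, not array references, are compared):
   ```
   assertEquals(expectedOffsets().toSeq, offsets.filter(r => !isConsumerOffsetTopicPartition(r)).toSeq)
   ```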

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))
+    assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition))
+  }
+
+  @Test
+  def testInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--exclude-internal-topics"))
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
       I am not sure I understand why you filter the `offsets` with `!isConsumerOffsetTopicPartition`. In this case, we expect `--exclude-internal-topics` to already do this, no?
   
   In general, if `--exclude-internal-topics` is not set, we should verify that the internal topics are there as well. I wouldn't filter them out of the result set.

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {

Review comment:
       It would be great if we could add a few tests to verify the mutually exclusive arguments. We should also add unit tests to verify the parsing of the specs. We could make the relevant methods package-private so we can access them from here.
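
   A sketch of such a test, assuming the tool calls `Exit.exit` on an invalid combination once the validation is added (the test name and spec values are illustrative):
   ```
   @Test
   def testTopicPartitionsNotAllowedWithTopicOrPartitions(): Unit = {
     var exited = false
     // kafka.utils.Exit lets tests intercept Exit.exit; the procedure must not return.
     Exit.setExitProcedure { (_, _) => exited = true; throw new IllegalArgumentException("exit") }
     try {
       try GetOffsetShell.main(Array("--bootstrap-server", brokerList, "--topic-partitions", "topic1:0", "--topic", "topic1"))
       catch { case _: IllegalArgumentException => }
       assertTrue(exited)
     } finally Exit.resetExitProcedure()
   }
   ```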

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))
+    assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition))
+  }
+
+  @Test
+  def testInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--exclude-internal-topics"))
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))
+    assertEquals(0, offsets.count(isConsumerOffsetTopicPartition))
+  }
+
+  @Test
+  def testTopicNameArg(): Unit = {
+    Range(1, topicCount + 1).foreach(i => {
+      val offsets = executeAndParse(Array("--topic", topicName(i)))
+      assertTrue("Offset output did not match for " + topicName(i), expectedOffsetsForTopic(i) sameElements offsets)
+    })
+  }
+
+  @Test
+  def testTopicPatternArg(): Unit = {
+    val offsets = executeAndParse(Array("--topic", "topic.*"))
+    assertTrue(expectedOffsets() sameElements offsets)
+  }
+
+  @Test
+  def testPartitionsArg(): Unit = {
+    val offsets = executeAndParse(Array("--partitions", "0,1"))
+    assertTrue(expectedOffsets().filter(r => r._2 <= 1) sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
       nit: `r => r._2 <= 1` is quite hard to read. It might be better to deconstruct the record so we can name the variable `partition`.
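
   For example (a sketch, assuming the parsed rows are `(topic, partition, offset)` tuples):
   ```
   expectedOffsets().filter { case (_, partition, _) => partition <= 1 }
   ```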

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")

Review comment:
       Is `test.*` correct? It seems that topics are named `topic...`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

