[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-01-27 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r565088240



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -33,49 +34,65 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
 val parser = new OptionParser(false)
-val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of 
hostname and port of the server to connect to.")
+val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
-   .describedAs("hostname:port,...,hostname:port")
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
-val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset 
from.")
+val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+   .requiredUnless("broker-list")
+   .withRequiredArg
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+   .ofType(classOf[String])
+val topicPartitionsOpt = parser.accepts("topic-partitions", "Comma 
separated list of topic-partition specifications to get the offsets for, with 
the format of topic:partition. The 'topic' part can be a regex or may be 
omitted to only specify the partitions, and query all authorized topics." +
+" The 'partition' part can be: a 
number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper 
exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive 
upper bound in the format of '-NUMBER' or may be omitted to accept all 
partitions of the specified topic.")
+   .withRequiredArg
+   .describedAs("topic:partition,...,topic:partition")
+   .ofType(classOf[String])
+val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. 
It also accepts a regular expression. If not present, all authorized topics are 
queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
-val partitionOpt = parser.accepts("partitions", "comma separated list of 
partition ids. If not specified, it will find offsets for all partitions")
+val partitionsOpt = parser.accepts("partitions", s"Comma separated list of 
partition ids to get the offsets for. If not present, all partitions of the 
authorized topics are queried. Cannot be used if --topic-partitions is 
present.")
.withRequiredArg
.describedAs("partition ids")
.ofType(classOf[String])
-   .defaultsTo("")
-val timeOpt = parser.accepts("time", "timestamp of the offsets before 
that. [Note: No offset is returned, if the timestamp greater than recently 
commited record timestamp is given.]")
+val timeOpt = parser.accepts("time", "timestamp of the offsets before 
that. [Note: No offset is returned, if the timestamp greater than recently 
committed record timestamp is given.]")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
-parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets 
returned")
-   .withRequiredArg
-   .describedAs("count")
-   .ofType(classOf[java.lang.Integer])
-   .defaultsTo(1)
-parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of 
time each fetch request waits.")
+val commandConfigOpt = parser.accepts("command-config", s"Property file 
containing configs to be passed to Consumer Client.")
.withRequiredArg
-   .describedAs("ms")
-   .ofType(classOf[java.lang.Integer])
-   .defaultsTo(1000)
+   .describedAs("config file")
+   .ofType(classOf[String])
+val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", 
s"By default, internal topics are included. If specified, internal topics are 
excluded.")
 
-   if (args.length == 0)
-  CommandLineUtils.printUsageAndDie(parser, "An interactive shell for 
getting topic offsets.")
+if (args.length == 0)
+  CommandLineUtils.printUsageAndDie(p

[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-02-08 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r572047945



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -20,102 +20,131 @@ package kafka.tools
 
 import java.util.Properties
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import java.util.regex.Pattern
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
+import scala.math.Ordering.Implicits.infixOrderingOps
 
 object GetOffsetShell {
+  private val topicPartitionPattern = Pattern.compile( 
"([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?")
 
   def main(args: Array[String]): Unit = {
+try {
+  fetchOffsets(args)
+} catch {
+  case e: Exception => Exit.exit(1, Some(e.getMessage))

Review comment:
   It seems that we need to print out something here for the user. The 
default exit procedure doesn't do anything with the provided message. Take a 
look at the other commands to see how we handle this.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -132,23 +161,85 @@ object GetOffsetShell {
 }
 }
 
-partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
-  println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+partitionOffsets.toSeq.sortWith((tp1, tp2) => 
compareTopicPartitions(tp1._1, tp2._1)).foreach {
+  case (tp, offset) => 
println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
 }
+  }
 
+  def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = {
+(a.topic(), a.partition()) < (b.topic(), b.partition())
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, 
`None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | 
:PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, 
partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k 
== topic }.values.flatMap(_.asScala).toBuffer
-if (partitionInfos.isEmpty)
-  None
-else if (partitionIds.isEmpty)
-  Some(partitionInfos)
-else
-  Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  def createTopicPartitionFilterWithPatternList(topicPartitions: String, 
excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+val ruleSpecs = topicPartitions.split(",")
+val rules = ruleSpecs.map { ruleSpec => parseRuleSpec(ruleSpec, 
excludeInternalTopics) }
+tp => rules.exists(rule => rule.apply(tp))
   }
 
+  def parseRuleSpec(ruleSpec: String, excludeInternalTopics: Boolean): 
PartitionInfo => Boolean = {
+def wrapNullOrEmpty(s: String): Option[String] = {
+  if (s == null || s.isEmpty)
+None
+  else
+Some(s)
+}
+
+val matcher = topicPartitionPattern.matcher(ruleSpec)
+if (!matcher.matches() || matcher.groupCount() == 0)

Review comment:
   I don't think that `matcher.groupCount() == 0` is necessary. My 
understanding is that it will never be `0` as we have groups defined in the 
regex.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -20,102 +20,131 @@ package kafka.tools
 
 import java.util.Properties
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import java.util.regex.Pattern
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
+import scala.math.Ordering.Implicits.infixOrderingOps
 
 object GetOffsetShell {
+  private val topicPartitionPattern = Pattern.compile( 
"([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?")

Review comment:
   nit: We usually capitalize the first letter of constants in Scala.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -132,23 +161,85 @@ object GetOffsetShell {
 }
  

[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-02-09 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r573192881



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -20,102 +20,120 @@ package kafka.tools
 
 import java.util.Properties
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import java.util.regex.Pattern
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
+import scala.math.Ordering.Implicits.infixOrderingOps
 
 object GetOffsetShell {
+  private val TopicPartitionPattern = Pattern.compile( 
"([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?")
 
   def main(args: Array[String]): Unit = {
+try {
+  fetchOffsets(args)
+} catch {
+  case e: Exception =>
+System.err.println(s"Error occurred: $e.getMessage")

Review comment:
   We could use `println` here to be consistent with the other command line 
tools. Moreover, you need to use curly braces around `e.getMessage`, i.e. 
`s"Error: ${e.getMessage}"`; without them only `$e` is interpolated and 
`.getMessage` is printed literally.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -132,23 +150,79 @@ object GetOffsetShell {
 }
 }
 
-partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
-  println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+partitionOffsets.toSeq.sortWith((tp1, tp2) => 
compareTopicPartitions(tp1._1, tp2._1)).foreach {
+  case (tp, offset) => 
println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
 }
+  }
 
+  def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = {
+(a.topic(), a.partition()) < (b.topic(), b.partition())
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, 
`None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | 
:PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, 
partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k 
== topic }.values.flatMap(_.asScala).toBuffer
-if (partitionInfos.isEmpty)
-  None
-else if (partitionIds.isEmpty)
-  Some(partitionInfos)
+  def createTopicPartitionFilterWithPatternList(topicPartitions: String, 
excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+val ruleSpecs = topicPartitions.split(",")
+val rules = ruleSpecs.map { ruleSpec => parseRuleSpec(ruleSpec, 
excludeInternalTopics) }

Review comment:
   `.map { ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics) }` => 
`.map(ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics))` - We usually 
don't use curly braces when the lambda is on one line.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -20,102 +20,120 @@ package kafka.tools
 
 import java.util.Properties
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import java.util.regex.Pattern
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
+import scala.math.Ordering.Implicits.infixOrderingOps
 
 object GetOffsetShell {
+  private val TopicPartitionPattern = Pattern.compile( 
"([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?")

Review comment:
   nit: The space before `"` could be removed.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -132,23 +161,85 @@ object GetOffsetShell {
 }
 }
 
-partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
-  println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+partitionOffsets.toSeq.sortWith((tp1, tp2) => 
compareTopicPartitions(tp1._1, tp2._1)).foreach {

Review comment:
   No, that's fine as it is then. I missed this.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##

[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-02-09 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r573201470



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -20,102 +20,131 @@ package kafka.tools
 
 import java.util.Properties
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.utils.Utils
 
+import java.util.regex.Pattern
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
+import scala.math.Ordering.Implicits.infixOrderingOps
 
 object GetOffsetShell {
+  private val topicPartitionPattern = Pattern.compile( 
"([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?")
 
   def main(args: Array[String]): Unit = {
+try {
+  fetchOffsets(args)
+} catch {
+  case e: Exception => Exit.exit(1, Some(e.getMessage))
+}
+  }
+
+  private def fetchOffsets(args: Array[String]): Unit = {
 val parser = new OptionParser(false)
-val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of 
hostname and port of the server to connect to.")
+val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
-   .describedAs("hostname:port,...,hostname:port")
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
-val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset 
from.")
+val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+   .requiredUnless("broker-list")
+   .withRequiredArg
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+   .ofType(classOf[String])
+val topicPartitionsOpt = parser.accepts("topic-partitions", s"Comma 
separated list of topic-partition patterns to get the offsets for, with the 
format of '$topicPartitionPattern'." +
+" The first group is an optional 
regex for the topic name, if omitted, it matches any topic name." +
+" The section after ':' describes 
a 'partition' pattern, which can be: a number, a range in the format of 
'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in 
the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or 
may be omitted to accept all partitions.")
+   .withRequiredArg
+   
.describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
+   .ofType(classOf[String])
+val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. 
It also accepts a regular expression. If not present, all authorized topics are 
queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
-val partitionOpt = parser.accepts("partitions", "comma separated list of 
partition ids. If not specified, it will find offsets for all partitions")
+val partitionsOpt = parser.accepts("partitions", s"Comma separated list of 
partition ids to get the offsets for. If not present, all partitions of the 
authorized topics are queried. Cannot be used if --topic-partitions is 
present.")
.withRequiredArg
.describedAs("partition ids")
.ofType(classOf[String])
-   .defaultsTo("")
-val timeOpt = parser.accepts("time", "timestamp of the offsets before 
that. [Note: No offset is returned, if the timestamp greater than recently 
commited record timestamp is given.]")
+val timeOpt = parser.accepts("time", "timestamp of the offsets before 
that. [Note: No offset is returned, if the timestamp greater than recently 
committed record timestamp is given.]")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
-parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets 
returned")
-   .withRequiredArg
-   .describedAs("count")
- 

[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-01-15 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r558158660



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
 val parser = new OptionParser(false)
-val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of 
hostname and port of the server to connect to.")
+val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
-   .describedAs("hostname:port,...,hostname:port")
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
-val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset 
from.")
+val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+   .requiredUnless("broker-list")
+   .withRequiredArg
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+   .ofType(classOf[String])
+val topicPartitionOpt = parser.accepts("topic-partitions", "Comma 
separated list of topic-partition specifications to get the offsets for, with 
the format of topic:partition. The 'topic' part can be a regex or may be 
omitted to only specify the partitions, and query all authorized topics." +
+" The 'partition' part can be: a 
number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper 
exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive 
upper bound in the format of '-NUMBER' or may be omitted to accept all 
partitions of the specified topic.")
+   .withRequiredArg
+   .describedAs("topic:partition,...,topic:partition")
+   .ofType(classOf[String])
+   .defaultsTo("")

Review comment:
   Is having an empty string as the default necessary? It shows up as 
`(default: )` in the description, which looks weird.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
 val parser = new OptionParser(false)
-val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of 
hostname and port of the server to connect to.")
+val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
-   .describedAs("hostname:port,...,hostname:port")
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
-val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset 
from.")
+val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The 
server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+   .requiredUnless("broker-list")
+   .withRequiredArg
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+   .ofType(classOf[String])
+val topicPartitionOpt = parser.accepts("topic-partitions", "Comma 
separated list of topic-partition specifications to get the offsets for, with 
the format of topic:partition. The 'topic' part can be a regex or may be 
omitted to only specify the partitions, and query all authorized topics." +
+" The 'partition' part can be: a 
number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper 
exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive 
upper bound in the format of '-NUMBER' or may be omitted to accept all 
partitions of the specified topic.")
+   .withRequiredArg
+   .describedAs("topic:partition,...,topic:partition")
+   .ofType(classOf[String])
+   .defaultsTo("")
+val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. 
It also accepts a regular expression. If not present, all authorized topics are 
queried. Ignored if $topicPartitionOpt is present.")

Review comment:
   We should replace `$topicPartitionOpt` by `--topic-partitions` in order 
to remain consistent with the other descriptions. `$topicPartitionOpt` 
generates `[topic-partitions]` in the description.

##
File path: core/src/main/scala/kaf