dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r558158660
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -33,15 +35,26 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") + val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) + val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") Review comment: Is having empty string as default necessary? It shows as `(default: )` in the description which looks weird. 
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -33,15 +35,26 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") + val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) + val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") + val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. 
Ignored if $topicPartitionOpt is present.") Review comment: We should replace `$topicPartitionOpt` by `--topic-partitions` in order to remain consistent with the other descriptions. `$topicPartitionOpt` generates `[topic-partitions]` in the description. ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -51,28 +64,30 @@ object GetOffsetShell { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) - parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") + val commandConfigOpt = parser.accepts("command-config", s"Consumer config properties file.") Review comment: How about `Property file containing configs to be passed to Consumer Client.` to remain in line with the description of that field used by other command line tools? ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)?
| :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { Review comment: nit: We usually structure blocks as follows: ``` ruleSpecs.map { ruleSpec => ... } ``` That way we can avoid the parentheses. ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -33,15 +35,26 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") + val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED.
The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) + val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") + val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") + val partitionOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. 
Ignored if $topicPartitionOpt is present.") Review comment: ditto ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -51,28 +64,30 @@ object GetOffsetShell { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) - parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") + val commandConfigOpt = parser.accepts("command-config", s"Consumer config properties file.") .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .describedAs("config file") + .ofType(classOf[String]) + val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.") - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.") + if (args.length == 0) Review comment: Should we also verify combination of arguments that are not valid? I am thinking about `--topic-partitions` vs `--topic` and `--partitions`. That would be better than silently ignoring. ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. 
If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) + } else if (parts.length == 2) { + val partitionFilter = createPartitionFilter(parts(1)) + + if (parts(0).trim().isEmpty) { + tp: PartitionInfo => partitionFilter.apply(tp.partition) + } else { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition) + } + } else { + throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec") + } + }) + + tp => rules.exists(rule => rule.apply(tp)) + } + + /** + * Creates a partition filter based on a single id or a range. + * Expected format: + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER + */ + private def createPartitionFilter(spec: String): Int => Boolean = { + if (spec.indexOf('-') != -1) { + val rangeParts = spec.split("-", -1) + if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) { + throw new IllegalArgumentException(s"Invalid range specification: $spec") + } + + if(rangeParts(0).isEmpty) { + val max = rangeParts(1).toInt + partition: Int => partition < max + } else if(rangeParts(1).isEmpty) { + val min = rangeParts(0).toInt + partition: Int => partition >= min + } else { + val min = rangeParts(0).toInt + val max = rangeParts(1).toInt + + if (min > max) { + throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec") + } + + partition: Int => partition >= min && partition < max + } + } else { + val number = spec.toInt Review comment: Should we handle the case where it is not a number and provide a meaningful error to the user? ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)?
| -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) Review comment: Let's try to remain inline with https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase. We should name the variable `includeList`. ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) + } else if (parts.length == 2) { + val partitionFilter = createPartitionFilter(parts(1)) + + if (parts(0).trim().isEmpty) { + tp: PartitionInfo => partitionFilter.apply(tp.partition) + } else { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition) + } + } else { + throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec") + } + }) + + tp => rules.exists(rule => rule.apply(tp)) + } + + /** + * Creates a partition filter based on a single id or a range. + * Expected format: + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER + */ + private def createPartitionFilter(spec: String): Int => Boolean = { + if (spec.indexOf('-') != -1) { + val rangeParts = spec.split("-", -1) + if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) { Review comment: nit: A space is missing after `if`. There are couple of similar cases below. 
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 Review comment: This block of code is identical to the one used at L132. Could we extract the predicate into a function and use it in both cases? ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -89,33 +104,40 @@ object GetOffsetShell { } val listOffsetsTimestamp = options.valueOf(timeOpt).longValue - val config = new Properties + val topicPartitionFilter = if (options.has(topicPartitionOpt)) { + Some(createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionOpt), excludeInternalTopics)) + } else { + createTopicPartitionFilterWithTopicAndPartitionPattern( + if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, + excludeInternalTopics, + partitionIdsRequested + ) + } + + val config = if (options.has(commandConfigOpt)) + Utils.loadProps(options.valueOf(commandConfigOpt)) + else + new Properties config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) - val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match { - case None => - System.err.println(s"Topic $topic does not exist") - Exit.exit(1) - case Some(p) if p.isEmpty => - if (partitionIdsRequested.isEmpty) - System.err.println(s"Topic $topic has 0 partitions") - else - System.err.println(s"Topic $topic does not have any of the requested partitions
${partitionIdsRequested.mkString(",")}") - Exit.exit(1) - case Some(p) => p - } + val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) - if (partitionIdsRequested.nonEmpty) { - (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId => - System.err.println(s"Error: partition $partitionId does not exist") - } + if (partitionInfos.isEmpty) { + System.err.println(s"Could not match any topic-partitions with the specified filters") + Exit.exit(1) } - val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p => + val topicPartitions = partitionInfos.sortWith((tp1, tp2) => { Review comment: Do we really need to sort the partitions here? What are the benefits? ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) + } else if (parts.length == 2) { + val partitionFilter = createPartitionFilter(parts(1)) + + if (parts(0).trim().isEmpty) { + tp: PartitionInfo => partitionFilter.apply(tp.partition) + } else { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition) + } + } else { + throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec") + } + }) + + tp => rules.exists(rule => rule.apply(tp)) + } + + /** + * Creates a partition filter based on a single id or a range. + * Expected format: + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER + */ + private def createPartitionFilter(spec: String): Int => Boolean = { + if (spec.indexOf('-') != -1) { + val rangeParts = spec.split("-", -1) + if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) { + throw new IllegalArgumentException(s"Invalid range specification: $spec") + } + + if(rangeParts(0).isEmpty) { + val max = rangeParts(1).toInt + partition: Int => partition < max + } else if(rangeParts(1).isEmpty) { + val min = rangeParts(0).toInt + partition: Int => partition >= min + } else { + val min = rangeParts(0).toInt + val max = rangeParts(1).toInt + + if (min > max) { + throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec") + } + + partition: Int => partition >= min && partition < max + } + } else { + val number = spec.toInt + partition: Int => partition == number + } } + /** + * Creates a topic-partition filter based on a topic pattern and a set of partition ids. + */ + private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = { + topicOpt match { + case Some(topic) => + val topicsFilter = IncludeList(topic) + if(partitionIds.isEmpty) + Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics)) + else + Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition)) + case None => + if(excludeInternalTopics) { + if(partitionIds.isEmpty) + Some(t => !Topic.isInternal(t.topic)) + else + Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition)) + } else { + if(partitionIds.isEmpty) + None + else + Some(t => partitionIds.contains(t.partition)) + } + } Review comment: Could we simplify this block to something like the following? 
``` val topicFilter = IncludeList(topicOpt.getOrElse(".*")) t => topicFilter.isTopicAllowed(t.topic, excludeInternalTopics) && (partitionIds.isEmpty || partitionIds.contains(t.partition)) ``` ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)?
| -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) + } else if (parts.length == 2) { + val partitionFilter = createPartitionFilter(parts(1)) + + if (parts(0).trim().isEmpty) { + tp: PartitionInfo => partitionFilter.apply(tp.partition) + } else { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition) + } + } else { + throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec") + } + }) + + tp => rules.exists(rule => rule.apply(tp)) + } + + /** + * Creates a partition filter based on a single id or a range. + * Expected format: + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER + */ + private def createPartitionFilter(spec: String): Int => Boolean = { + if (spec.indexOf('-') != -1) { + val rangeParts = spec.split("-", -1) + if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) { + throw new IllegalArgumentException(s"Invalid range specification: $spec") + } + + if(rangeParts(0).isEmpty) { + val max = rangeParts(1).toInt + partition: Int => partition < max + } else if(rangeParts(1).isEmpty) { + val min = rangeParts(0).toInt + partition: Int => partition >= min + } else { + val min = rangeParts(0).toInt + val max = rangeParts(1).toInt + + if (min > max) { + throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec") + } + + partition: Int => partition >= min && partition < max + } + } else { + val number = spec.toInt + partition: Int => partition == number + } } + /** + * Creates a topic-partition filter based on a topic pattern and a set of partition ids. + */ + private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = { + topicOpt match { + case Some(topic) => + val topicsFilter = IncludeList(topic) + if(partitionIds.isEmpty) + Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics)) + else + Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition)) + case None => + if(excludeInternalTopics) { + if(partitionIds.isEmpty) + Some(t => !Topic.isInternal(t.topic)) + else + Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition)) + } else { + if(partitionIds.isEmpty) + None + else + Some(t => partitionIds.contains(t.partition)) + } + } + } + + /** + * Return the partition infos. Filter them with topicPartitionFilter if specified. 
+ */ + private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: Option[PartitionInfo => Boolean]): Seq[PartitionInfo] = { + val topicListUnfiltered = consumer.listTopics.asScala.values.flatMap { tp => tp.asScala } Review comment: It might be worth doing the filtering in the `flatMap` directly. It would avoid having to construct the full list and then filter it afterwards. What do you think? ########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { + private val topicCount = 4 + private val offsetTopicPartitionCount = 4 + private val topicPattern = Pattern.compile("test.*") + + override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(p => { Review comment: nit: `map(p => {` => `map { p =>` ########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig,
KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { + private val topicCount = 4 + private val offsetTopicPartitionCount = 4 + private val topicPattern = Pattern.compile("test.*") + + override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(p => { + p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount) + p + }).map(KafkaConfig.fromProps) + + @Before + def createTopicAndConsume(): Unit = { + Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i)) + + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest") + + // Send X messages to each partition of topicX + val producer = new KafkaProducer[String, String](props) + Range(1, topicCount + 1).foreach(i => Range(0, i*i) + .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount)))) + producer.close() + + // Consume so consumer offsets topic is created + val consumer = new KafkaConsumer[String, String](props) + consumer.subscribe(topicPattern) + consumer.poll(Duration.ofMillis(1000)) + consumer.commitSync() + consumer.close() + } + + @Test + def testNoFilterOptions(): Unit = { + val offsets = executeAndParse(Array()) + 
+ assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) Review comment: It would be better to use `assertEquals` here as well, as it outputs the values when the test fails. The same seems to apply to all the tests. ########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { + private val topicCount = 4 + private val offsetTopicPartitionCount = 4 + private val topicPattern = Pattern.compile("test.*") + + override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(p => { + p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount) + p + }).map(KafkaConfig.fromProps) + + @Before + def createTopicAndConsume(): Unit = { + Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i)) + + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + 
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest") + + // Send X messages to each partition of topicX + val producer = new KafkaProducer[String, String](props) + Range(1, topicCount + 1).foreach(i => Range(0, i*i) + .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount)))) + producer.close() + + // Consume so consumer offsets topic is created + val consumer = new KafkaConsumer[String, String](props) + consumer.subscribe(topicPattern) + consumer.poll(Duration.ofMillis(1000)) + consumer.commitSync() + consumer.close() + } + + @Test + def testNoFilterOptions(): Unit = { + val offsets = executeAndParse(Array()) + assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) + assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition)) + } + + @Test + def testInternalExcluded(): Unit = { + val offsets = executeAndParse(Array("--exclude-internal-topics")) + assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) Review comment: I am not sure I understand why you filter the `offsets` with `!isConsumerOffsetTopicPartition`. In this case, we expect `--exclude-internal-topics` to already do this, no? In general, if `--exclude-internal-topics` is not set, we should verify that internal topics are there as well. I wouldn't filter them out from the result set. 
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { Review comment: It would be great if we could add a few tests to verify the mutually exclusive arguments. We should also add unit tests to verify the parsing of the specs. We could make the relevant methods package-private so we can access them from here. 
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { + private val topicCount = 4 + private val offsetTopicPartitionCount = 4 + private val topicPattern = Pattern.compile("test.*") + + override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) + .map(p => { + p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount) + p + }).map(KafkaConfig.fromProps) + + @Before + def createTopicAndConsume(): Unit = { + Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i)) + + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest") + + // Send X messages to each partition of topicX + val producer = new KafkaProducer[String, String](props) + Range(1, topicCount + 1).foreach(i => Range(0, i*i) + .foreach(msgCount => 
producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount)))) + producer.close() + + // Consume so consumer offsets topic is created + val consumer = new KafkaConsumer[String, String](props) + consumer.subscribe(topicPattern) + consumer.poll(Duration.ofMillis(1000)) + consumer.commitSync() + consumer.close() + } + + @Test + def testNoFilterOptions(): Unit = { + val offsets = executeAndParse(Array()) + assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) + assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition)) + } + + @Test + def testInternalExcluded(): Unit = { + val offsets = executeAndParse(Array("--exclude-internal-topics")) + assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) + assertEquals(0, offsets.count(isConsumerOffsetTopicPartition)) + } + + @Test + def testTopicNameArg(): Unit = { + Range(1, topicCount + 1).foreach(i => { + val offsets = executeAndParse(Array("--topic", topicName(i))) + assertTrue("Offset output did not match for " + topicName(i), expectedOffsetsForTopic(i) sameElements offsets) + }) + } + + @Test + def testTopicPatternArg(): Unit = { + val offsets = executeAndParse(Array("--topic", "topic.*")) + assertTrue(expectedOffsets() sameElements offsets) + } + + @Test + def testPartitionsArg(): Unit = { + val offsets = executeAndParse(Array("--partitions", "0,1")) + assertTrue(expectedOffsets().filter(r => r._2 <= 1) sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r))) Review comment: nit: `r => r._2 <= 1` is quite hard to read. It might be better to deconstruct the record so we can name the variable `partition`. 
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -0,0 +1,194 @@ +package kafka.tools + +import java.time.Duration +import java.util.Properties +import java.util.regex.Pattern + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Exit, Logging, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} + +class GetOffsetShellTest extends KafkaServerTestHarness with Logging { + private val topicCount = 4 + private val offsetTopicPartitionCount = 4 + private val topicPattern = Pattern.compile("test.*") Review comment: Is `test.*` correct? It seems that topics are named `topic...`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org