[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r565088240 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -33,49 +34,65 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) -val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") +val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) -val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") +val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) +val topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + +" The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) +val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. 
Cannot be used if --topic-partitions is present.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) -val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") +val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.") .withRequiredArg .describedAs("partition ids") .ofType(classOf[String]) - .defaultsTo("") -val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently commited record timestamp is given.]") +val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]") .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) -parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) -parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") +val commandConfigOpt = parser.accepts("command-config", s"Property file containing configs to be passed to Consumer Client.") .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .describedAs("config file") + .ofType(classOf[String]) +val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.") - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.") +if (args.length == 0) + CommandLineUtils.printUsageAndDie(p
[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r572047945 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -20,102 +20,131 @@ package kafka.tools import java.util.Properties import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.math.Ordering.Implicits.infixOrderingOps object GetOffsetShell { + private val topicPartitionPattern = Pattern.compile( "([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?") def main(args: Array[String]): Unit = { +try { + fetchOffsets(args) +} catch { + case e: Exception => Exit.exit(1, Some(e.getMessage)) Review comment: It seems that we need to print out something here for the user. The default exit procedure doesn't do anything with the provided message. Take a look at the other commands to see how we handle this. 
## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -132,23 +161,85 @@ object GetOffsetShell { } } -partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") +partitionOffsets.toSeq.sortWith((tp1, tp2) => compareTopicPartitions(tp1._1, tp2._1)).foreach { + case (tp, offset) => println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } + } + def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = { +(a.topic(), a.partition()) < (b.topic(), b.partition()) } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
| -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { -val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer -if (partitionInfos.isEmpty) - None -else if (partitionIds.isEmpty) - Some(partitionInfos) -else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { +val ruleSpecs = topicPartitions.split(",") +val rules = ruleSpecs.map { ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics) } +tp => rules.exists(rule => rule.apply(tp)) } + def parseRuleSpec(ruleSpec: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { +def wrapNullOrEmpty(s: String): Option[String] = { + if (s == null || s.isEmpty) +None + else +Some(s) +} + +val matcher = topicPartitionPattern.matcher(ruleSpec) +if (!matcher.matches() || matcher.groupCount() == 0) Review comment: I don't think that `matcher.groupCount() == 0` is necessary. My understanding is that it will never be `0` as we have groups defined in the regex. 
## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -20,102 +20,131 @@ package kafka.tools import java.util.Properties import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.math.Ordering.Implicits.infixOrderingOps object GetOffsetShell { + private val topicPartitionPattern = Pattern.compile( "([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?") Review comment: nit: We usually capitalize the first letter of constants in Scala. ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -132,23 +161,85 @@ object GetOffsetShell { }
[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r573192881 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -20,102 +20,120 @@ package kafka.tools import java.util.Properties import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.math.Ordering.Implicits.infixOrderingOps object GetOffsetShell { + private val TopicPartitionPattern = Pattern.compile( "([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?") def main(args: Array[String]): Unit = { +try { + fetchOffsets(args) +} catch { + case e: Exception => +System.err.println(s"Error occurred: $e.getMessage") Review comment: We could use `println` here to be consistent with the other command line tools. Moreover, you need to use curly braces around `e.getMessage`, e.g. s"Error: ${e.getMessage}". Without them, `$e.getMessage` interpolates only `e` (its `toString`) followed by the literal text `.getMessage`.
## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -132,23 +150,79 @@ object GetOffsetShell { } } -partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") +partitionOffsets.toSeq.sortWith((tp1, tp2) => compareTopicPartitions(tp1._1, tp2._1)).foreach { + case (tp, offset) => println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } + } + def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = { +(a.topic(), a.partition()) < (b.topic(), b.partition()) } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { -val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer -if (partitionInfos.isEmpty) - None -else if (partitionIds.isEmpty) - Some(partitionInfos) + def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { +val ruleSpecs = topicPartitions.split(",") +val rules = ruleSpecs.map { ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics) } Review comment: `.map { ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics) }` => `.map(ruleSpec => parseRuleSpec(ruleSpec, excludeInternalTopics))` - We usually don't use curly braces when the lambda is on one line. 
## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -20,102 +20,120 @@ package kafka.tools import java.util.Properties import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.math.Ordering.Implicits.infixOrderingOps object GetOffsetShell { + private val TopicPartitionPattern = Pattern.compile( "([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?") Review comment: nit: The space before `"` could be removed. ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -132,23 +161,85 @@ object GetOffsetShell { } } -partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") +partitionOffsets.toSeq.sortWith((tp1, tp2) => compareTopicPartitions(tp1._1, tp2._1)).foreach { Review comment: No, that's fine as it is then. I missed this. ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ##
[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r573201470 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -20,102 +20,131 @@ package kafka.tools import java.util.Properties import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.math.Ordering.Implicits.infixOrderingOps object GetOffsetShell { + private val topicPartitionPattern = Pattern.compile( "([^:,]*)(?::([0-9]*)(?:-([0-9]*))?)?") def main(args: Array[String]): Unit = { +try { + fetchOffsets(args) +} catch { + case e: Exception => Exit.exit(1, Some(e.getMessage)) +} + } + + private def fetchOffsets(args: Array[String]): Unit = { val parser = new OptionParser(false) -val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") +val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) -val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") +val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. 
The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) +val topicPartitionsOpt = parser.accepts("topic-partitions", s"Comma separated list of topic-partition patterns to get the offsets for, with the format of '$topicPartitionPattern'." + +" The first group is an optional regex for the topic name, if omitted, it matches any topic name." + +" The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.") + .withRequiredArg + .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3") + .ofType(classOf[String]) +val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) -val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") +val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.") .withRequiredArg .describedAs("partition ids") .ofType(classOf[String]) - .defaultsTo("") -val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently commited record timestamp is given.]") +val timeOpt = parser.accepts("time", "timestamp of the offsets before that. 
[Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]") .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) -parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") - .withRequiredArg - .describedAs("count") -
[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
dajac commented on a change in pull request #9430: URL: https://github.com/apache/kafka/pull/9430#discussion_r558158660 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -33,15 +35,26 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) -val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") +val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) -val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") +val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) +val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + +" The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") Review comment: Is having empty string as default necessary? It shows as `(default: )` in the description which looks weird. 
## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -33,15 +35,26 @@ object GetOffsetShell { def main(args: Array[String]): Unit = { val parser = new OptionParser(false) -val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") +val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg - .describedAs("hostname:port,...,hostname:port") + .describedAs("HOST1:PORT1,...,HOST3:PORT3") .ofType(classOf[String]) -val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") +val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(classOf[String]) +val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." + +" The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") +val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. 
Ignored if $topicPartitionOpt is present.") Review comment: We should replace `$topicPartitionOpt` with `--topic-partitions` to stay consistent with the other descriptions. `$topicPartitionOpt` is rendered as `[topic-partitions]` in the description. ## File path: core/src/main/scala/kaf