urbandan commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r563667979
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
       .withRequiredArg
-      .describedAs("hostname:port,...,hostname:port")
+      .describedAs("HOST1:PORT1,...,HOST3:PORT3")
       .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+      .requiredUnless("broker-list")
+      .withRequiredArg
+      .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+      .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics."
+      + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+      .withRequiredArg
+      .describedAs("topic:partition,...,topic:partition")
+      .ofType(classOf[String])
+      .defaultsTo("")

Review comment:
   Not at all, thank you for catching this

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,15 +35,26 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
       .withRequiredArg
-      .describedAs("hostname:port,...,hostname:port")
+      .describedAs("HOST1:PORT1,...,HOST3:PORT3")
       .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+      .requiredUnless("broker-list")
+      .withRequiredArg
+      .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+      .ofType(classOf[String])
+    val topicPartitionOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics."
+      + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+      .withRequiredArg
+      .describedAs("topic:partition,...,topic:partition")
+      .ofType(classOf[String])
+      .defaultsTo("")
+    val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.")

Review comment:
   Thanks, fixed it
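As a reading aid for the spec grammar described in the help text above, here is a small self-contained sketch of the matching semantics. The object and method names are invented for illustration; this is not the PR's parsing code, which also validates malformed specs:

```scala
object TopicPartitionSpecDemo {
  // Simplified matcher for one "topic:partition" spec. Real parsing in
  // GetOffsetShell rejects malformed rules; this only shows accepted shapes.
  def matches(spec: String, topic: String, partition: Int): Boolean = {
    val parts = spec.split(":", -1)
    val topicOk = parts(0).isEmpty || topic.matches(parts(0))
    val partitionOk = parts.length < 2 || parts(1).isEmpty || {
      val p = parts(1)
      if (p.contains("-")) {
        val Array(lo, hi) = p.split("-", -1) // assumes exactly one '-'
        (lo.isEmpty || partition >= lo.toInt) && (hi.isEmpty || partition < hi.toInt)
      } else {
        partition == p.toInt
      }
    }
    topicOk && partitionOk
  }

  def main(args: Array[String]): Unit = {
    println(matches("test.*:0-3", "test1", 2)) // true: regex topic, range [0, 3)
    println(matches(":4-", "any-topic", 5))    // true: topic omitted, inclusive lower bound
    println(matches("test1", "test1", 7))      // true: partition part omitted
  }
}
```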
+ + " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.") + .withRequiredArg + .describedAs("topic:partition,...,topic:partition") + .ofType(classOf[String]) + .defaultsTo("") + val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Ignored if $topicPartitionOpt is present.") Review comment: Thanks, fixed it ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -51,28 +64,30 @@ object GetOffsetShell { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) - parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") + val commandConfigOpt = parser.accepts("command-config", s"Consumer config properties file.") .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .describedAs("config file") + .ofType(classOf[String]) + val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.") - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.") + if (args.length == 0) Review comment: I agree, good catch ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
       }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt

Review comment:
   You are right, should be handled, fixed it
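The catch discussed here is that a bare `spec.toInt` escapes as a `NumberFormatException` that never mentions the offending spec. A sketch of the kind of defensive parse the fix implies; the names are hypothetical, not the PR's final code:

```scala
import scala.util.{Failure, Success, Try}

object PartitionIdParsing {
  // Wraps toInt so malformed input fails with a message naming the spec,
  // rather than a bare NumberFormatException from deep inside the parser.
  def toPartitionId(spec: String): Int =
    Try(spec.trim.toInt) match {
      case Success(id) if id >= 0 => id
      case Success(id) => throw new IllegalArgumentException(s"Partition id cannot be negative: $id")
      case Failure(_)  => throw new IllegalArgumentException(s"Invalid partition id in spec: '$spec'")
    }

  def main(args: Array[String]): Unit = {
    println(toPartitionId(" 7 "))    // 7
    println(Try(toPartitionId("x"))) // Failure(java.lang.IllegalArgumentException: ...)
  }
}
```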
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
       }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: $spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt
+      partition: Int => partition == number
+    }
   }

+  /**
+   * Creates a topic-partition filter based on a topic pattern and a set of partition ids.
+   */
+  private def createTopicPartitionFilterWithTopicAndPartitionPattern(topicOpt: Option[String], excludeInternalTopics: Boolean, partitionIds: Set[Int]): Option[PartitionInfo => Boolean] = {
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = IncludeList(topic)
+        if(partitionIds.isEmpty)
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics))
+        else
+          Some(t => topicsFilter.isTopicAllowed(t.topic, excludeInternalTopics) && partitionIds.contains(t.partition))
+      case None =>
+        if(excludeInternalTopics) {
+          if(partitionIds.isEmpty)
+            Some(t => !Topic.isInternal(t.topic))
+          else
+            Some(t => !Topic.isInternal(t.topic) && partitionIds.contains(t.partition))
+        } else {
+          if(partitionIds.isEmpty)
+            None
+          else
+            Some(t => partitionIds.contains(t.partition))
+        }
+    }

Review comment:
   I was mostly focusing on avoiding unnecessary filtering, but you are right, it shouldn't be an issue; your code is much cleaner
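The "much cleaner" variant referred to here presumably collapses the case enumeration by composing two optional predicates in one place. A rough, self-contained sketch of that idea, with a tuple standing in for PartitionInfo:

```scala
object UniformFilter {
  // Optional topic predicate and optional partition set, ANDed uniformly.
  // Option.forall means "no topic filter" accepts every topic.
  def combined(topicPredicate: Option[String => Boolean], partitionIds: Set[Int]): ((String, Int)) => Boolean = {
    case (topic, partition) =>
      topicPredicate.forall(p => p(topic)) &&
        (partitionIds.isEmpty || partitionIds.contains(partition))
  }

  def main(args: Array[String]): Unit = {
    val filter = combined(Some(_.startsWith("test")), Set(0, 1))
    println(filter(("test1", 0))) // true
    println(filter(("test1", 5))) // false: partition 5 was not requested
    println(filter(("other", 0))) // false: topic predicate rejects it
  }
}
```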
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
       }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0

Review comment:
   Removed unnecessary sorting, no duplication

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -89,33 +104,40 @@ object GetOffsetShell {
     }
     val listOffsetsTimestamp = options.valueOf(timeOpt).longValue

-    val config = new Properties
+    val topicPartitionFilter = if (options.has(topicPartitionOpt)) {
+      Some(createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionOpt), excludeInternalTopics))
+    } else {
+      createTopicPartitionFilterWithTopicAndPartitionPattern(
+        if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
+        excludeInternalTopics,
+        partitionIdsRequested
+      )
+    }
+
+    val config = if (options.has(commandConfigOpt))
+      Utils.loadProps(options.valueOf(commandConfigOpt))
+    else
+      new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)

-    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
-      case None =>
-        System.err.println(s"Topic $topic does not exist")
-        Exit.exit(1)
-      case Some(p) if p.isEmpty =>
-        if (partitionIdsRequested.isEmpty)
-          System.err.println(s"Topic $topic has 0 partitions")
-        else
-          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
-        Exit.exit(1)
-      case Some(p) => p
-    }
+    val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)

-    if (partitionIdsRequested.nonEmpty) {
-      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
-        System.err.println(s"Error: partition $partitionId does not exist")
-      }
+    if (partitionInfos.isEmpty) {
+      System.err.println(s"Could not match any topic-partitions with the specified filters")
+      Exit.exit(1)
     }

-    val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+    val topicPartitions = partitionInfos.sortWith((tp1, tp2) => {

Review comment:
   No point, good catch
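On the sorting point: the hand-written comparator above can be replaced by the implicit tuple Ordering, which already compares by topic first and partition second. A tiny sketch, not the PR's final code:

```scala
object OutputOrdering {
  def main(args: Array[String]): Unit = {
    val tps = Seq(("topicB", 0), ("topicA", 1), ("topicA", 0))
    // The implicit Ordering[(String, Int)] sorts by topic, then partition,
    // so no manual sortWith comparator is needed.
    println(tps.sortBy { case (topic, partition) => (topic, partition) })
    // equivalently: tps.sorted
    // prints: List((topicA,0), (topicA,1), (topicB,0))
  }
}
```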
$topic does not exist") - Exit.exit(1) - case Some(p) if p.isEmpty => - if (partitionIdsRequested.isEmpty) - System.err.println(s"Topic $topic has 0 partitions") - else - System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}") - Exit.exit(1) - case Some(p) => p - } + val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) - if (partitionIdsRequested.nonEmpty) { - (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId => - System.err.println(s"Error: partition $partitionId does not exist") - } + if (partitionInfos.isEmpty) { + System.err.println(s"Could not match any topic-partitions with the specified filters") + Exit.exit(1) } - val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p => + val topicPartitions = partitionInfos.sortWith((tp1, tp2) => { Review comment: No point, good catch ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -132,23 +154,119 @@ object GetOffsetShell { } } - partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") + partitionOffsets.toSeq.sortWith((tp1, tp2) => { + val topicComp = tp1._1.topic.compareTo(tp2._1.topic) + if (topicComp == 0) + tp1._1.partition < tp2._1.partition + else + topicComp < 0 + }).foreach { case (tp, offset) => + println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") } } /** - * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { - val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer - if (partitionInfos.isEmpty) - None - else if (partitionIds.isEmpty) - Some(partitionInfos) - else - Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) + private def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = { + val ruleSpecs = topicPartitions.split(",") + val rules = ruleSpecs.map(ruleSpec => { + val parts = ruleSpec.split(":") + if (parts.length == 1) { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) + } else if (parts.length == 2) { + val partitionFilter = createPartitionFilter(parts(1)) + + if (parts(0).trim().isEmpty) { + tp: PartitionInfo => partitionFilter.apply(tp.partition) + } else { + val whitelist = IncludeList(parts(0)) + tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter.apply(tp.partition) + } + } else { + throw new IllegalArgumentException(s"Invalid topic-partition rule: $ruleSpec") + } + }) + + tp => rules.exists(rule => rule.apply(tp)) + } + + /** + * Creates a partition filter based on a single id or a range. + * Expected format: + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? 
##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
   Completely agree, fixed it
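The assertion style being fixed here matters for diagnostics: `assertTrue(a sameElements b)` fails with nothing but "false", while `assertEquals` on two Seqs prints both sides. A minimal JUnit 4 illustration with stand-in data, not the PR's test:

```scala
import org.junit.Assert.assertEquals
import org.junit.Test

class AssertionStyleTest {
  @Test
  def equalityBeatsSameElements(): Unit = {
    val expected = Seq("test1:0:1", "test2:0:2")
    val actual   = Seq("test1:0:1", "test2:0:2") // stand-in for parsed shell output
    // On failure this prints both sequences;
    // assertTrue(expected sameElements actual) would only report "false".
    assertEquals(expected, actual)
  }
}
```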
##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(p => {
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, offsetTopicPartitionCount)
+      p
+    }).map(KafkaConfig.fromProps)
+
+  @Before
+  def createTopicAndConsume(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "GetOffsetShellTest")
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    // Consume so consumer offsets topic is created
+    val consumer = new KafkaConsumer[String, String](props)
+    consumer.subscribe(topicPattern)
+    consumer.poll(Duration.ofMillis(1000))
+    consumer.commitSync()
+    consumer.close()
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))
+    assertEquals(offsetTopicPartitionCount, offsets.count(isConsumerOffsetTopicPartition))
+  }
+
+  @Test
+  def testInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--exclude-internal-topics"))
+    assertTrue(expectedOffsets() sameElements offsets.filter(r => !isConsumerOffsetTopicPartition(r)))

Review comment:
   I wasn't sure if I should define any expectations on the end offsets of the internal topic; that's why I was testing it separately. But I think you are right, so I refactored the code to handle all topics uniformly.
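One plausible reading of "handle all topics uniformly": drive internal and test topics through the same expectation helper and drop internal ones with the same predicate the tool uses, instead of a separate assertion path. A hypothetical sketch, not the PR's test code:

```scala
object UniformExpectations {
  // Stand-in for Topic.isInternal; internal Kafka topics start with "__".
  private def isInternal(topic: String): Boolean = topic.startsWith("__")

  // All topics flow through one expectation list; internal ones are removed
  // by the same predicate, so no special-cased assertion is needed.
  def expected(all: Seq[(String, Int)], excludeInternal: Boolean): Seq[(String, Int)] =
    if (excludeInternal) all.filterNot { case (topic, _) => isInternal(topic) } else all

  def main(args: Array[String]): Unit = {
    val all = Seq(("test1", 0), ("__consumer_offsets", 0))
    println(expected(all, excludeInternal = true)) // List((test1,0))
  }
}
```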
##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+  private val topicPattern = Pattern.compile("test.*")

Review comment:
   Was used by the dummy consumer, removed

##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,194 @@
+package kafka.tools
+
+import java.time.Duration
+import java.util.Properties
+import java.util.regex.Pattern
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {

Review comment:
   Agree, added coverage for the arguments and the filter parsing as well
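For the added filter-parsing coverage mentioned above, a unit test can exercise the spec-to-predicate conversion directly. Since the PR's helper is private, `parsePartitionFilter` below is a hypothetical stand-in that only shows the test shape:

```scala
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.Test

class PartitionFilterParsingTest {
  // Hypothetical stand-in for the tool's private parser: single number only,
  // just enough to demonstrate how the predicate can be asserted on.
  private def parsePartitionFilter(spec: String): Int => Boolean = {
    val n = spec.toInt
    partition => partition == n
  }

  @Test
  def testSingleNumberSpec(): Unit = {
    val filter = parsePartitionFilter("3")
    assertTrue(filter(3))
    assertFalse(filter(4))
  }
}
```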
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org