showuon commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r827678933
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -103,59 +104,77 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } - val listOffsetsTimestamp = options.valueOf(timeOpt).longValue + val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, - excludeInternalTopics, - partitionIdsRequested + options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) + val client = Admin.create(config) try { - val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) + val partitionInfos = listPartitionInfos(client, topicPartitionFilter, excludeInternalTopics) if (partitionInfos.isEmpty) { throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") } - val topicPartitions = partitionInfos.flatMap { p => - if (p.leader == null) { - System.err.println(s"Error: topic-partition ${p.topic}:${p.partition} does not have a leader. 
Skip getting offsets") - None - } else - Some(new TopicPartition(p.topic, p.partition)) - } + val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - /* Note that the value of the map can be null */ - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { - case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala - case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala - case _ => - val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => - if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + val listOffsetsResult = client.listOffsets(timestampsToSearch) + val partitionOffsets = partitionInfos.flatMap { tp => + try { + val partitionInfo = listOffsetsResult.partitionResult(tp).get + Some((tp, partitionInfo.offset)) Review comment: additional bracket? 
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -103,59 +104,77 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } - val listOffsetsTimestamp = options.valueOf(timeOpt).longValue + val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, - excludeInternalTopics, - partitionIdsRequested + options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) + val client = Admin.create(config) Review comment: Please rename to `adminClient` for better understanding. 
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging { ) } + @ParameterizedTest + @ValueSource(strings = Array("-1", "latest")) + def testGetLatestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(1)), + ("topic2", 0, Some(2)), + ("topic3", 0, Some(3)), + ("topic4", 0, Some(4)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-2", "earliest")) + def testGetEarliestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(0)), + ("topic2", 0, Some(0)), + ("topic3", 0, Some(0)), + ("topic4", 0, Some(0)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-3", "max-timestamp")) + def testGetOffsetsByMaxTimestamp(time: String): Unit = { Review comment: I didn't see a test for `--time timestamp`. Could we add one? ########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -224,9 +252,81 @@ object GetOffsetShell { /** * Return the partition infos. Filter them with topicPartitionFilter. 
*/ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = { - consumer.listTopics.asScala.values.flatMap { partitions => - partitions.asScala.filter(topicPartitionFilter) + private def listPartitionInfos( + client: Admin, + topicPartitionFilter: TopicPartitionFilter, + excludeInternalTopics: Boolean + ): Seq[TopicPartition] = { + val listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics) + val topics = client.listTopics(listTopicsOptions).names.get + val filteredTopics = topics.asScala.filter(topicPartitionFilter.isTopicAllowed) + + client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap { case (topic, description) => + description + .partitions + .asScala + .map(tp => new TopicPartition(topic, tp.partition)) + .filter(topicPartitionFilter.isTopicPartitionAllowed) }.toBuffer } } + +/** + * Used to filter partitions after describing them Review comment: Could we remove `after describing them` and have a more general description? Maybe you can put it like `Used to filter partitions by certain criteria` or something else. The same comment applies to the one below. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org