dengziming commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r819404982
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging { ) } + @ParameterizedTest + @ValueSource(strings = Array("-1", "latest")) + def testGetLatestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(1)), + ("topic2", 0, Some(2)), + ("topic3", 0, Some(3)), + ("topic4", 0, Some(4)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-2", "earliest")) + def testGetEarliestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(0)), + ("topic2", 0, Some(0)), + ("topic3", 0, Some(0)), + ("topic4", 0, Some(0)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-3", "max-timestamp")) + def testGetOffsetsByMaxTimestamp(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time)) + offsets.foreach{ offset => + // We can't know the exact offsets with max timestamp + assertTrue(offset._3.get >= 0 && offset._3.get <= offset._1.replace("topic", "").toInt) + } + } + @Test def testTopicPartitionsArgWithInternalExcluded(): Unit = { Review comment: Good catch, I added a new test since `GetOffsetShellParsingTest` no longer includes this case. 
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -103,59 +104,76 @@ object GetOffsetShell { throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") } - val listOffsetsTimestamp = options.valueOf(timeOpt).longValue + val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics) + createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) } else { - val partitionIdsRequested = createPartitionSet(options.valueOf(partitionsOpt)) - createTopicPartitionFilterWithTopicAndPartitionPattern( if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, - excludeInternalTopics, - partitionIdsRequested + options.valueOf(partitionsOpt) ) } val config = if (options.has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) + val client = Admin.create(config) try { - val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter) + val partitionInfos = listPartitionInfos(client, topicPartitionFilter, excludeInternalTopics) if (partitionInfos.isEmpty) { throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") } - val topicPartitions = partitionInfos.flatMap { p => - if (p.leader == null) { - System.err.println(s"Error: topic-partition ${p.topic}:${p.partition} does not have a leader. 
Skip getting offsets") - None - } else - Some(new TopicPartition(p.topic, p.partition)) - } + val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - /* Note that the value of the map can be null */ - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { - case ListOffsetsRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala - case ListOffsetsRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala - case _ => - val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => - if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + val listOffsetsResult = client.listOffsets(timestampsToSearch) + val partitionOffsets = partitionInfos.flatMap { tp => + try { + val partitionInfo = listOffsetsResult.partitionResult(tp).get + Some((tp, partitionInfo.offset)) + } catch { + case e: ExecutionException => + e.getCause match { + case _: LeaderNotAvailableException => + System.err.println(s"Error: topic-partition ${tp.topic}:${tp.partition} does not have a leader. Skip getting offsets") + case _ => + System.err.println(s"Error while getting end offsets for topic-partition ${tp.topic}:${tp.partition}") Review comment: The `KafkaConsumer` will be blocked until we get offsets for all requested partitions or get some fatal error, so `KafkaConsumer.offsetsForTimes` will fail even if only one partition fails, and the result will contain all TopicPartitions. However, the AdminClient enables us to get some offsets on partial failure, so we can improve this a little, but we'd better print some information to indicate we ignored some failed TopicPartitions. WDYT? 
########## File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala ########## @@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging { ) } + @ParameterizedTest + @ValueSource(strings = Array("-1", "latest")) + def testGetLatestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(1)), + ("topic2", 0, Some(2)), + ("topic3", 0, Some(3)), + ("topic4", 0, Some(4)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-2", "earliest")) + def testGetEarliestOffsets(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) + assertEquals( + List( + ("topic1", 0, Some(0)), + ("topic2", 0, Some(0)), + ("topic3", 0, Some(0)), + ("topic4", 0, Some(0)) + ), + offsets + ) + } + + @ParameterizedTest + @ValueSource(strings = Array("-3", "max-timestamp")) + def testGetOffsetsByMaxTimestamp(time: String): Unit = { + val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time)) + offsets.foreach{ offset => Review comment: Good suggestion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org