junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664
########## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ########## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; + private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { - return "topic" + i; + return topicName + i; } - public void setUp() { + @BeforeEach + public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); + } + public void setUp() { Review Comment: Why do we need to split the logic between `before` and `setUp`? ########## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ########## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { Review Comment: Could this method be private? Ditto for `produceMessagesInSeparateBatch`. 
########## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ########## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { - long shallowOffsetOfMaxTimestamp; - // Use the last offset when dealing with record batches - if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) - shallowOffsetOfMaxTimestamp = lastOffset; - else - shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; - return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); + // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping + return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); Review Comment: This has the same issue. The semantics of MAX_TIMESTAMP is the first offset with the max timestamp. So, if the timestamp type is TimestampType.LOG_APPEND_TIME, we need to use the baseOffset instead of lastOffset. Also, could we remove the `shallow` prefix from RecordsInfo.shallowOffsetOfMaxTimestamp? 
########## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ########## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { val records = Seq( new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, - null, new Array[Byte](10000)), + null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, - null, new Array[Byte](10000)), + null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, - null, new Array[Byte](10000)), + null, new Array[Byte](10)), ) - TestUtils.produceMessages(brokers, records, -1) + // create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records), + // so that we can confirm all records will be accumulated in producer until we flush them into one batch. 
+ val producer = createProducer( + plaintextBootstrapServers(brokers), + deliveryTimeoutMs = Int.MaxValue, + lingerMs = Int.MaxValue, + compressionType = compressionType) + + try { + val futures = records.map(producer.send) + producer.flush() + futures.foreach(_.get) + } finally { + producer.close() + } } - def generateConfigs: Seq[KafkaConfig] = - TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps) + def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = { + val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, + null, new Array[Byte](10))) + val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, + null, new Array[Byte](10))) + val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, + null, new Array[Byte](10))) + + val producer = createProducer( + plaintextBootstrapServers(brokers), + compressionType = compressionType) + try { + val futures = records.map(producer.send) + futures.foreach(_.get) + val futures2 = records2.map(producer.send) + futures2.foreach(_.get) + val futures3 = records3.map(producer.send) + futures3.foreach(_.get) + } finally { + producer.close() + } + } + + def generateConfigs: Seq[KafkaConfig] = { + TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(props => { + if (setOldMessageFormat) { + props.setProperty("log.message.format.version", "0.10.0") + props.setProperty("inter.broker.protocol.version", "0.10.0") + } + props + }).map(KafkaConfig.fromProps) Review Comment: It's a bit simpler to do the following: ``` TestUtils.createBrokerConfigs(1, zkConnectOrNull).map { props => if (setOldMessageFormat) { ... } props }.map(KafkaConfig.fromProps) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org