junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before() {
         cluster.config().serverProperties().put("auto.create.topics.enable", false);
         cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
         cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+    }
 
+    public void setUp() {

Review Comment:
   Why do we need to split the logic between `before` and `setUp`?
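   
   A minimal sketch of the consolidation this suggests (assuming nothing else calls `setUp()`, so its body can move into the single `@BeforeEach` method; the merged body below is illustrative, not the final code):
   ```
   @BeforeEach
   public void setUp() {
       cluster.config().serverProperties().put("auto.create.topics.enable", false);
       cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
       cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
       // ... whatever the current setUp() does would then follow here
   }
   ```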



##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {

Review Comment:
   Could this method be private? Ditto for `produceMessagesInSeparateBatch`.



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -263,13 +262,8 @@ public RecordsInfo info() {
         } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
             return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   This has the same issue. The semantics of MAX_TIMESTAMP is the first offset with the max timestamp. So, if the timestamp type is `TimestampType.LOG_APPEND_TIME`, we need to use the `baseOffset` instead of the `lastOffset`.
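   
   A minimal sketch of that branch (assuming the builder's existing `timestampType`, `logAppendTime`, and `baseOffset` fields; illustrative only, not the final code):
   ```
   if (timestampType == TimestampType.LOG_APPEND_TIME) {
       // Every record in the batch carries the broker's append time, so the
       // first offset with the max timestamp is the batch's base offset.
       return new RecordsInfo(logAppendTime, baseOffset);
   } else {
       // For create time, offsetOfMaxTimestamp preserves the exact time -> offset mapping.
       return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
   }
   ```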
   
   Also, could we remove the `shallow` part in `RecordsInfo.shallowOffsetOfMaxTimestamp`?



##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {
     val records = Seq(
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
-        null, new Array[Byte](10000)),
+        null, new Array[Byte](10)),
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
-        null, new Array[Byte](10000)),
+        null, new Array[Byte](10)),
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
-        null, new Array[Byte](10000)),
+        null, new Array[Byte](10)),
     )
-    TestUtils.produceMessages(brokers, records, -1)
+    // Create a producer with a large linger.ms and a sufficient batch.size (the default is enough for three 10-byte records),
+    // so that we can confirm all records are accumulated in the producer until we flush them into one batch.
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      deliveryTimeoutMs = Int.MaxValue,
+      lingerMs = Int.MaxValue,
+      compressionType = compressionType)
+
+    try {
+      val futures = records.map(producer.send)
+      producer.flush()
+      futures.foreach(_.get)
+    } finally {
+      producer.close()
+    }
   }
 
-  def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps)
+  def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = {
+    val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
+        null, new Array[Byte](10)))
+    val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
+      null, new Array[Byte](10)))
+    val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
+      null, new Array[Byte](10)))
+
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      compressionType = compressionType)
+    try {
+      val futures = records.map(producer.send)
+      futures.foreach(_.get)
+      val futures2 = records2.map(producer.send)
+      futures2.foreach(_.get)
+      val futures3 = records3.map(producer.send)
+      futures3.foreach(_.get)
+    } finally {
+      producer.close()
+    }
+  }
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(props => {
+      if (setOldMessageFormat) {
+        props.setProperty("log.message.format.version", "0.10.0")
+        props.setProperty("inter.broker.protocol.version", "0.10.0")
+      }
+      props
+    }).map(KafkaConfig.fromProps)

Review Comment:
   It's a bit simpler to do the following.
   ```
       TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props => 
         if (setOldMessageFormat) {
   ...
       }.map(KafkaConfig.fromProps)
   ```


