This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 1ee20e3aa0f KAFKA-16342 fix getOffsetByMaxTimestamp for compressed records (#15474)
1ee20e3aa0f is described below

commit 1ee20e3aa0fbe541ed7629388ce84e4a7b9c33b3
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Mar 15 06:09:45 2024 +0800

    KAFKA-16342 fix getOffsetByMaxTimestamp for compressed records (#15474)
    
    Fix getOffsetByMaxTimestamp for compressed records.
    
    This PR makes the following changes:

    1) For the inPlaceAssignment case, compute the correct offset for
    maxTimestamp while traversing the batch records, and set it on the
    ValidationResult at the end, instead of always using the last offset.

    2) For the non-inPlaceAssignment case, set the offsetOfMaxTimestamp for
    the log create-time case, as in the non-compressed and inPlaceAssignment
    cases, instead of always using the last offset.

    3) Add tests to verify the fix.
    
    Reviewers: Jun Rao <jun...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>
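
For illustration, the sketch below (not part of this commit) shows the behavior the fix targets from the Admin client's point of view: after the fix, OffsetSpec.maxTimestamp() returns the offset of the record carrying the max timestamp even when the records were produced as a single compressed batch. The class name, the localhost:9092 bootstrap address, and topic "foo" with timestamps 100/999/200 are assumptions mirroring the new integration test below.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsOptions;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class MaxTimestampOffsetExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed broker address; replace with a real bootstrap server.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Topic "foo", partition 0 mirrors the topic used by ListOffsetsIntegrationTest.
                TopicPartition tp = new TopicPartition("foo", 0);
                ListOffsetsResult.ListOffsetsResultInfo info = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()),
                                 new ListOffsetsOptions())
                    .all().get().get(tp);
                // If records with timestamps 100, 999, 200 were produced into one compressed batch,
                // the expected answer after this fix is offset 1 (the record with timestamp 999).
                System.out.println("offset=" + info.offset() + ", timestamp=" + info.timestamp());
            }
        }
    }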
---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  29 +--
 .../common/record/MemoryRecordsBuilderTest.java    |  23 ++-
 .../kafka/common/record/MemoryRecordsTest.java     |   5 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   | 202 ++++++++++++++++++---
 .../kafka/server/QuorumTestHarness.scala           |   2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  29 +--
 .../kafka/storage/internals/log/LogValidator.java  |  13 +-
 .../org/apache/kafka/tools/GetOffsetShellTest.java |  11 +-
 8 files changed, 233 insertions(+), 81 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054d72ee822..88991328bb7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -242,34 +242,23 @@ public class MemoryRecordsBuilder implements AutoCloseable {
 
     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+     * Note: if multiple records have the same max timestamp, the offset returned is the first offset with that timestamp.
      *
-     * If the log append time is used, the offset will be the last offset unless no compression is used and
-     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * If the log append time is used, the offset will be the first offset of the batch.
      *
-     * If create time is used, the offset will be the last offset unless no compression is used and the message
-     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+     * If create time is used, the offset will always be the offset of the record with the max timestamp.
+     *
+     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since there is no timestamp info in the records.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = baseOffset;
-            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+            return new RecordsInfo(logAppendTime, baseOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 5616fb23f7d..eb28ba47ef5 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -377,11 +377,8 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
-
-        if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1)
-            assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
-        else
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
+        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
 
         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -415,10 +412,11 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2L, info.maxTimestamp);
         }
 
-        if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1)
-            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+        if (magic == MAGIC_VALUE_V0)
+            // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
+            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
         else
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
 
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -495,12 +493,13 @@ public class MemoryRecordsBuilderTest {
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        if (magic == MAGIC_VALUE_V0)
+        if (magic == MAGIC_VALUE_V0) {
             assertEquals(-1, info.maxTimestamp);
-        else
+            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+        } else {
             assertEquals(2L, info.maxTimestamp);
-
-        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        }
 
         long i = 0L;
         for (RecordBatch batch : records.batches()) {
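
The assertions above reduce to the following standalone sketch against the internal MemoryRecordsBuilder API; the class name, buffer size, and record keys/values are illustrative choices, not values from the test.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class RecordsInfoSketch {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
            builder.append(0L, "a".getBytes(), "1".getBytes());
            builder.append(2L, "b".getBytes(), "2".getBytes()); // max timestamp lives at offset 1
            builder.append(1L, "c".getBytes(), "3".getBytes());
            builder.build();

            MemoryRecordsBuilder.RecordsInfo info = builder.info();
            // With CREATE_TIME the reported offset now points at the record holding the max
            // timestamp even though the batch is compressed: expect maxTimestamp 2 at offset 1.
            System.out.println(info.maxTimestamp + " @ " + info.shallowOffsetOfMaxTimestamp);
        }
    }

With LOG_APPEND_TIME instead of CREATE_TIME, the same builder would report the batch's base offset (0), matching the first test change above.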
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 3f0195bf5d1..50821af841c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -893,10 +893,7 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
-                assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
-            else
-                assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
+            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
         }
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index 333bd980a60..e5e22e9dff9 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
+  }
+
+  // The message conversion test only runs in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = {
+    createOldMessageFormatBrokers()
+    produceMessagesInOneBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  // The message conversion test only runs in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
+    createOldMessageFormatBrokers()
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this separate batch test, it'll be the last offset 2
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
+    verifyListOffsets(topic = topicNameWithCustomConfigs)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    verifyListOffsets(topic = topicNameWithCustomConfigs)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testMaxTimestampOffset(quorum: String): Unit = {
-    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
-    assertEquals(1, maxTimestampOffset.offset())
+  def testThreeCompressedRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this separate batch test, it'll be the last offset 2
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
+  }
+
+  private def createOldMessageFormatBrokers(): Unit = {
+    setOldMessageFormat = true
+    recreateBrokers(reconfigure = true, startup = true)
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
+    ).asJava)
+  }
+
+  private def createTopicWithConfig(topic: String, props: Properties): Unit = {
+    createTopic(topic, 1, 1.toShort, topicConfig = props)
+  }
+
+  private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+    assertEquals(0, earliestOffset.offset())
+
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+    assertEquals(3, latestOffset.offset())
+
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
+    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
   }
 
   private def runFetchOffsets(adminClient: Admin,
-                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
-    val tp = new TopicPartition(topicName, 0)
+                              offsetSpec: OffsetSpec,
+                              topic: String): ListOffsetsResult.ListOffsetsResultInfo = {
+    val tp = new TopicPartition(topic, 0)
     adminClient.listOffsets(Map(
       tp -> offsetSpec
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  private def produceMessagesInOneBatch(compressionType: String = "none", topic: String = topicName): Unit = {
     val records = Seq(
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
-        null, new Array[Byte](10000)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
-        null, new Array[Byte](10000)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
-        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
+        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
+        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
+        null, new Array[Byte](10)),
     )
-    TestUtils.produceMessages(brokers, records, -1)
+    // create a producer with a large linger.ms and a big enough batch.size (the default is enough for three 10-byte records),
+    // so that all records are accumulated in the producer until we flush them into one batch.
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      deliveryTimeoutMs = Int.MaxValue,
+      lingerMs = Int.MaxValue,
+      compressionType = compressionType)
+
+    try {
+      val futures = records.map(producer.send)
+      producer.flush()
+      futures.foreach(_.get)
+    } finally {
+      producer.close()
+    }
   }
 
-  def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps)
+  private def produceMessagesInSeparateBatch(compressionType: String = "none", topic: String = topicName): Unit = {
+    val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
+        null, new Array[Byte](10)))
+    val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
+      null, new Array[Byte](10)))
+    val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
+      null, new Array[Byte](10)))
+
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      compressionType = compressionType)
+    try {
+      val futures = records.map(producer.send)
+      futures.foreach(_.get)
+      // advance the server time after each record is sent to make sure the time changes when appendTime is used
+      mockTime.sleep(100)
+      val futures2 = records2.map(producer.send)
+      futures2.foreach(_.get)
+      mockTime.sleep(100)
+      val futures3 = records3.map(producer.send)
+      futures3.foreach(_.get)
+    } finally {
+      producer.close()
+    }
+  }
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props =>
+      if (setOldMessageFormat) {
+        props.setProperty("log.message.format.version", "0.10.0")
+        props.setProperty("inter.broker.protocol.version", "0.10.0")
+      }
+      props
+    }.map(KafkaConfig.fromProps)
+  }
 }
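
As a companion to produceMessagesInOneBatch above, here is a rough standalone Java equivalent of the same "single batch" trick: a very large linger.ms (with a matching delivery.timeout.ms) holds all records in the accumulator until an explicit flush(), so they land in one compressed batch. The bootstrap address, topic name, and class name are placeholders.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class OneBatchProducerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // A huge linger.ms keeps records in the accumulator until flush(), so all three
            // land in a single compressed batch; delivery.timeout.ms must be at least as large.
            props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.MAX_VALUE);
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

            long[] timestamps = {100L, 999L, 200L};
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                List<Future<RecordMetadata>> futures = new ArrayList<>();
                for (long ts : timestamps)
                    futures.add(producer.send(new ProducerRecord<>("foo", 0, ts, null, new byte[10])));
                producer.flush(); // send the single accumulated batch
                for (Future<RecordMetadata> f : futures)
                    f.get();
            }
        }
    }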
 
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 1a83809c0ad..4d2ff87483f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -124,7 +124,7 @@ class KRaftQuorumImplementation(
       util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR))
     val sharedServer = new SharedServer(config,
       metaPropertiesEnsemble,
-      Time.SYSTEM,
+      time,
       new Metrics(),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 0b5d17cbcc2..ac6152b9b15 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -219,8 +219,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
 
@@ -271,8 +271,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
 
@@ -341,6 +341,7 @@ class LogValidatorTest {
 
   private def checkNonCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -431,6 +432,7 @@ class LogValidatorTest {
 
   private def checkRecompression(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -484,8 +486,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs,
-      "Offset of max timestamp should be 2")
+    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+      "Offset of max timestamp should be 1")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
 
@@ -536,8 +538,8 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be -1")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@@ -583,8 +585,8 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be 0 when multiple records have the same max timestamp.")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@@ -598,6 +600,7 @@ class LogValidatorTest {
 
   private def checkCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -654,11 +657,9 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
 
-    // All versions have an outer batch when compressed, so the shallow offset
-    // of max timestamp is always the offset of the last record in the batch.
-    val expectedShallowOffsetOfMaxTimestamp = recordList.size - 1
+    val expectedShallowOffsetOfMaxTimestamp = 1
     assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+      s"Offset of max timestamp should be 1")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 378247fa270..0cf9cd1c60f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -330,6 +330,8 @@ public class LogValidator {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         LongRef expectedInnerOffset = PrimitiveRef.ofLong(0);
         List<Record> validatedRecords = new ArrayList<>();
+        long offsetOfMaxTimestamp = -1;
+        long initialOffset = offsetCounter.value;
 
         int uncompressedSizeInBytes = 0;
 
@@ -379,8 +381,11 @@ public class LogValidator {
                             && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                             && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-                        if (record.timestamp() > maxTimestamp)
+                        if (record.timestamp() > maxTimestamp) {
                             maxTimestamp = record.timestamp();
+                            // The offset is only increased when it is a valid record
+                            offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
+                        }
 
                         // Some older clients do not implement the V1 internal offsets correctly.
                         // Historically the broker handled this by rewriting the batches rather
@@ -417,8 +422,10 @@ public class LogValidator {
             long lastOffset = offsetCounter.value - 1;
             firstBatch.setLastOffset(lastOffset);
 
-            if (timestampType == TimestampType.LOG_APPEND_TIME)
+            if (timestampType == TimestampType.LOG_APPEND_TIME) {
                 maxTimestamp = now;
+                offsetOfMaxTimestamp = initialOffset;
+            }
 
             if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
                 firstBatch.setMaxTimestamp(timestampType, maxTimestamp);
@@ -431,7 +438,7 @@ public class LogValidator {
                 now,
                 records,
                 maxTimestamp,
-                lastOffset,
+                offsetOfMaxTimestamp,
                 false,
                 recordValidationStats);
         }
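
The core of the LogValidator change above is to remember which assigned offset carried the max timestamp while the records are being validated, instead of reporting the last offset. A simplified, self-contained sketch of that bookkeeping (the class and method names are illustrative, not Kafka's internals):

    import java.util.List;

    public class MaxTimestampTracker {
        // Returns the assigned offset of the first record carrying the max timestamp,
        // or -1 when no record has a timestamp (the MAGIC_VALUE_V0 case).
        static long offsetOfMaxTimestamp(List<Long> timestamps, long initialOffset) {
            long maxTimestamp = -1L; // RecordBatch.NO_TIMESTAMP
            long offsetOfMaxTimestamp = -1L;
            int validated = 0;
            for (long timestamp : timestamps) {
                if (timestamp > maxTimestamp) {
                    maxTimestamp = timestamp;
                    // offsets are assigned sequentially from initialOffset as records are validated
                    offsetOfMaxTimestamp = initialOffset + validated;
                }
                validated++;
            }
            return offsetOfMaxTimestamp;
        }

        public static void main(String[] args) {
            // timestamps 100, 999, 200 starting at offset 0 -> prints 1, matching the tests above
            System.out.println(offsetOfMaxTimestamp(List.of(100L, 999L, 200L), 0L));
        }
    }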
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 417bbe71168..ce9c1718ffb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before() {
         cluster.config().serverProperties().put("auto.create.topics.enable", false);
         cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
         cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+    }
 
+    private void setUp() {
         try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
             List<NewTopic> topics = new ArrayList<>();
 
@@ -333,7 +338,7 @@ public class GetOffsetShellTest {
     }
 
     private List<Row> expectedOffsetsWithInternal() {
-        List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1)
+        List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount)
                 .mapToObj(i -> new Row("__consumer_offsets", i, 0L))
                 .collect(Collectors.toList());
 
