This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf3f088c944 KAFKA-16341 fix the LogValidator for non-compressed type 
(#15476)
bf3f088c944 is described below

commit bf3f088c944763d8418764064f593d0bf06fbcc3
Author: Johnny Hsu <44309740+johnnych...@users.noreply.github.com>
AuthorDate: Tue Mar 19 23:00:30 2024 +0800

    KAFKA-16341 fix the LogValidator for non-compressed type (#15476)
    
    - Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of 
the first record. Else, we choose the record with the maxTimeStamp.
    - rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp
    
    Reviewers: Jun Rao <jun...@gmail.com>, Luke Chen <show...@gmail.com>, 
Chia-Ping Tsai <chia7...@gmail.com>
---
 .../apache/kafka/common/record/MemoryRecords.java  | 22 +++++------
 .../kafka/common/record/MemoryRecordsBuilder.java  |  6 +--
 .../common/record/MemoryRecordsBuilderTest.java    | 10 ++---
 .../kafka/common/record/MemoryRecordsTest.java     |  6 +--
 core/src/main/scala/kafka/log/LocalLog.scala       |  4 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  2 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   | 45 +++++++++++++++-------
 .../test/scala/unit/kafka/log/LocalLogTest.scala   |  2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  4 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 30 ++++++---------
 .../kafka/storage/internals/log/LogSegment.java    | 10 ++---
 .../kafka/storage/internals/log/LogValidator.java  | 16 +++-----
 13 files changed, 84 insertions(+), 75 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 3aa233c34e9..3ba60b09b30 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -209,7 +209,7 @@ public class MemoryRecords extends AbstractRecords {
                                 partition, batch.lastOffset(), 
maxRecordBatchSize, filteredBatchSize);
 
                         MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                        
filterResult.updateRetainedBatchMetadata(info.maxTimestamp, 
info.shallowOffsetOfMaxTimestamp,
+                        
filterResult.updateRetainedBatchMetadata(info.maxTimestamp, 
info.offsetOfMaxTimestamp,
                             maxOffset, retainedRecords.size(), 
filteredBatchSize);
                     }
                 }
@@ -399,7 +399,7 @@ public class MemoryRecords extends AbstractRecords {
         private int bytesRetained = 0;
         private long maxOffset = -1L;
         private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        private long shallowOffsetOfMaxTimestamp = -1L;
+        private long offsetOfMaxTimestamp = -1L;
 
         private FilterResult(ByteBuffer outputBuffer) {
             this.outputBuffer = outputBuffer;
@@ -411,21 +411,21 @@ public class MemoryRecords extends AbstractRecords {
                     retainedBatch.lastOffset(), numMessagesInBatch, 
bytesRetained);
         }
 
-        private void updateRetainedBatchMetadata(long maxTimestamp, long 
shallowOffsetOfMaxTimestamp, long maxOffset,
-                                                int messagesRetained, int 
bytesRetained) {
-            validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, 
maxOffset);
+        private void updateRetainedBatchMetadata(long maxTimestamp, long 
offsetOfMaxTimestamp, long maxOffset,
+                                                 int messagesRetained, int 
bytesRetained) {
+            validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, 
maxOffset);
             if (maxTimestamp > this.maxTimestamp) {
                 this.maxTimestamp = maxTimestamp;
-                this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+                this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
             }
             this.maxOffset = Math.max(maxOffset, this.maxOffset);
             this.messagesRetained += messagesRetained;
             this.bytesRetained += bytesRetained;
         }
 
-        private void validateBatchMetadata(long maxTimestamp, long 
shallowOffsetOfMaxTimestamp, long maxOffset) {
-            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && 
shallowOffsetOfMaxTimestamp < 0)
-                throw new IllegalArgumentException("shallowOffset undefined 
for maximum timestamp " + maxTimestamp);
+        private void validateBatchMetadata(long maxTimestamp, long 
offsetOfMaxTimestamp, long maxOffset) {
+            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && 
offsetOfMaxTimestamp < 0)
+                throw new IllegalArgumentException("offset undefined for 
maximum timestamp " + maxTimestamp);
             if (maxOffset < 0)
                 throw new IllegalArgumentException("maxOffset undefined");
         }
@@ -458,8 +458,8 @@ public class MemoryRecords extends AbstractRecords {
             return maxTimestamp;
         }
 
-        public long shallowOffsetOfMaxTimestamp() {
-            return shallowOffsetOfMaxTimestamp;
+        public long offsetOfMaxTimestamp() {
+            return offsetOfMaxTimestamp;
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 3663c5ea7e3..6b53ee41595 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -851,12 +851,12 @@ public class MemoryRecordsBuilder implements 
AutoCloseable {
 
     public static class RecordsInfo {
         public final long maxTimestamp;
-        public final long shallowOffsetOfMaxTimestamp;
+        public final long offsetOfMaxTimestamp;
 
         public RecordsInfo(long maxTimestamp,
-                           long shallowOffsetOfMaxTimestamp) {
+                           long offsetOfMaxTimestamp) {
             this.maxTimestamp = maxTimestamp;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 81aa162feee..eaaa95ff673 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -378,7 +378,7 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
         // When logAppendTime is used, the first offset of the batch will be 
the offset of maxTimestamp
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        assertEquals(0L, info.offsetOfMaxTimestamp);
 
         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -414,9 +414,9 @@ public class MemoryRecordsBuilderTest {
 
         if (magic == MAGIC_VALUE_V0)
             // in MAGIC_VALUE_V0's case, we don't have timestamp info in 
records, so always return -1.
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         else
-            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(1L, info.offsetOfMaxTimestamp);
 
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -495,10 +495,10 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         if (magic == MAGIC_VALUE_V0) {
             assertEquals(-1, info.maxTimestamp);
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         } else {
             assertEquals(2L, info.maxTimestamp);
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(2L, info.offsetOfMaxTimestamp);
         }
 
         long i = 0L;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 50821af841c..9e688fc3ab6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -352,7 +352,7 @@ public class MemoryRecordsTest {
                     assertEquals(0, filterResult.messagesRetained());
                     assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
                     assertEquals(12, filterResult.maxTimestamp());
-                    assertEquals(baseOffset + 1, 
filterResult.shallowOffsetOfMaxTimestamp());
+                    assertEquals(baseOffset + 1, 
filterResult.offsetOfMaxTimestamp());
 
                     // Verify filtered records
                     filtered.flip();
@@ -413,7 +413,7 @@ public class MemoryRecordsTest {
         assertEquals(0, filterResult.messagesRetained());
         assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
         assertEquals(timestamp, filterResult.maxTimestamp());
-        assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+        assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp());
         assertTrue(filterResult.outputBuffer().position() > 0);
 
         // Verify filtered records
@@ -893,7 +893,7 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+            assertEquals(4L, result.offsetOfMaxTimestamp());
         }
 
         MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala 
b/core/src/main/scala/kafka/log/LocalLog.scala
index b2121f5312d..a91bfae739e 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File,
     }
   }
 
-  private[log] def append(lastOffset: Long, largestTimestamp: Long, 
shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
-    segments.activeSegment.append(lastOffset, largestTimestamp, 
shallowOffsetOfMaxTimestamp, records)
+  private[log] def append(lastOffset: Long, largestTimestamp: Long, 
offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
+    segments.activeSegment.append(lastOffset, largestTimestamp, 
offsetOfMaxTimestamp, records)
     updateLogEndOffset(lastOffset + 1)
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index b653f40b287..35dadc8672f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int,
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this 
segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
-        dest.append(result.maxOffset, result.maxTimestamp, 
result.shallowOffsetOfMaxTimestamp, retained)
+        dest.append(result.maxOffset, result.maxTimestamp, 
result.offsetOfMaxTimestamp, retained)
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f6198ce9d21..0fdc236e720 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
             validRecords = validateAndOffsetAssignResult.validatedRecords
             
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-            
appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
+            
appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
             appendInfo.setLastOffset(offset.value - 1)
             
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
             if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
diff --git 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index e5e22e9dff9..5362a1d5e35 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -68,9 +68,7 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
     verifyListOffsets()
 
     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -79,9 +77,30 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamps are the same, we choose the 
offset of the first record
+    // thus, the maxTimestampOffset should be the first record of the batch.
+    // So in this one batch test, it'll be the first offset which is 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeNonCompressedRecordsInSeparateBatch(quorum: String): Unit = {
     produceMessagesInSeparateBatch()
     verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamp is different, it should be 
the last one
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }
 
   // The message conversion test only run in ZK mode because KRaft mode 
doesn't support "inter.broker.protocol.version" < 3.0
@@ -93,9 +112,7 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
     verifyListOffsets()
 
     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -111,9 +128,7 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
     verifyListOffsets()
 
     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
     // So in this separate batch test, it'll be the last offset 2
@@ -147,15 +162,19 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
     verifyListOffsets()
 
     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
     // So in this separate batch test, it'll be the last offset 2
     verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }
 
+  private def setUpForLogAppendTimeCase(): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+  }
+
   private def createOldMessageFormatBrokers(): Unit = {
     setOldMessageFormat = true
     recreateBrokers(reconfigure = true, startup = true)
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala 
b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 29b5fd34f90..bffd41156b3 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -100,7 +100,7 @@ class LocalLogTest {
                             initialOffset: Long = 0L): Unit = {
     log.append(lastOffset = initialOffset + records.size - 1,
       largestTimestamp = records.head.timestamp,
-      shallowOffsetOfMaxTimestamp = initialOffset,
+      offsetOfMaxTimestamp = initialOffset,
       records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
0, records.toList : _*))
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index b4e278c3802..bb0c85a8583 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -86,10 +86,10 @@ class LogSegmentTest {
   def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, 
largestOffset: Long): Unit = {
     val seg = createSegment(baseOffset)
     val currentTime = Time.SYSTEM.milliseconds()
-    val shallowOffsetOfMaxTimestamp = largestOffset
+    val offsetOfMaxTimestamp = largestOffset
     val memoryRecords = records(0, "hello")
     assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
-      seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, 
memoryRecords)
+      seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, 
memoryRecords)
     })
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index ac6152b9b15..53b385c62e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -173,9 +173,9 @@ class LogValidatorTest {
     assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should 
be $now")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should 
not have been changed")
 
-    // we index from last offset in version 2 instead of base offset
-    val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 
2 else 0
-    assertEquals(expectedMaxTimestampOffset, 
validatedResults.shallowOffsetOfMaxTimestampMs,
+    // If it's LOG_APPEND_TIME, the offset will be the offset of the first 
record
+    val expectedMaxTimestampOffset = 0
+    assertEquals(expectedMaxTimestampOffset, 
validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be $expectedMaxTimestampOffset")
     verifyRecordValidationStats(validatedResults.recordValidationStats, 
numConvertedRecords = 0, records,
       compressed = false)
@@ -219,7 +219,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
@@ -271,7 +271,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
@@ -404,14 +404,8 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
 
-    val expectedShallowOffsetOfMaxTimestamp = if (magic >= 
RecordVersion.V2.value) {
-      // v2 records are always batched, even when not compressed.
-      // the shallow offset of max timestamp is the last offset of the batch
-      recordList.size - 1
-    } else {
-      1
-    }
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, 
validatingResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, 
validatingResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
 
     assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -486,7 +480,7 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(1, validatingResults.offsetOfMaxTimestampMs,
       "Offset of max timestamp should be 1")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -538,7 +532,7 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(-1, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be -1")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should 
have been changed")
 
@@ -585,7 +579,7 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 0 when multiple records having the 
same max timestamp.")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should 
have been changed")
 
@@ -657,8 +651,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp 
should be ${now + 1}")
 
-    val expectedShallowOffsetOfMaxTimestamp = 1
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, 
validatedResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, 
validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should 
not have been changed")
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index d42bf112456..464858b4cd7 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -232,17 +232,17 @@ public class LogSegment implements Closeable {
      *
      * @param largestOffset The last offset in the message set
      * @param largestTimestampMs The largest timestamp in the message set.
-     * @param shallowOffsetOfMaxTimestamp The offset of the message that has 
the largest timestamp in the messages to append.
+     * @param offsetOfMaxTimestamp The offset of the message that has the 
largest timestamp in the messages to append.
      * @param records The log entries to append.
      * @throws LogSegmentOffsetOverflowException if the largest offset causes 
index offset overflow
      */
     public void append(long largestOffset,
                        long largestTimestampMs,
-                       long shallowOffsetOfMaxTimestamp,
+                       long offsetOfMaxTimestamp,
                        MemoryRecords records) throws IOException {
         if (records.sizeInBytes() > 0) {
-            LOGGER.trace("Inserting {} bytes at end offset {} at position {} 
with largest timestamp {} at shallow offset {}",
-                records.sizeInBytes(), largestOffset, log.sizeInBytes(), 
largestTimestampMs, shallowOffsetOfMaxTimestamp);
+            LOGGER.trace("Inserting {} bytes at end offset {} at position {} 
with largest timestamp {} at offset {}",
+                records.sizeInBytes(), largestOffset, log.sizeInBytes(), 
largestTimestampMs, offsetOfMaxTimestamp);
             int physicalPosition = log.sizeInBytes();
             if (physicalPosition == 0)
                 rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
@@ -254,7 +254,7 @@ public class LogSegment implements Closeable {
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, 
log.file(), largestOffset);
             // Update the in memory max timestamp and corresponding offset.
             if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new 
TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
+                maxTimestampAndOffsetSoFar = new 
TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp);
             }
             // append an entry to the index (if needed)
             if (bytesSinceLastIndexEntry > indexIntervalBytes) {
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 0cf9cd1c60f..9aa1e06633b 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -68,17 +68,17 @@ public class LogValidator {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long shallowOffsetOfMaxTimestampMs;
+        public final long offsetOfMaxTimestampMs;
         public final boolean messageSizeMaybeChanged;
         public final RecordValidationStats recordValidationStats;
 
         public ValidationResult(long logAppendTimeMs, MemoryRecords 
validatedRecords, long maxTimestampMs,
-                                long shallowOffsetOfMaxTimestampMs, boolean 
messageSizeMaybeChanged,
+                                long offsetOfMaxTimestampMs, boolean 
messageSizeMaybeChanged,
                                 RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
-            this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs;
+            this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
             this.recordValidationStats = recordValidationStats;
         }
@@ -149,7 +149,7 @@ public class LogValidator {
      * avoid expensive re-compression.
      *
      * Returns a ValidationAndOffsetAssignResult containing the validated 
message set, maximum timestamp, the offset
-     * of the shallow message with the max timestamp and a boolean indicating 
whether the message sizes may have changed.
+     * of the message with the max timestamp and a boolean indicating whether 
the message sizes may have changed.
      */
     public ValidationResult 
validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter,
                                                              MetricsRecorder 
metricsRecorder,
@@ -232,7 +232,7 @@ public class LogValidator {
             now,
             convertedRecords,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
@@ -296,10 +296,6 @@ public class LogValidator {
             offsetOfMaxTimestamp = initialOffset;
         }
 
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
-        }
-
         return new ValidationResult(
             now,
             records,
@@ -480,7 +476,7 @@ public class LogValidator {
             logAppendTime,
             records,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }

Reply via email to