This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new bf3f088c944 KAFKA-16341 fix the LogValidator for non-compressed type (#15476)
bf3f088c944 is described below

commit bf3f088c944763d8418764064f593d0bf06fbcc3
Author:     Johnny Hsu <44309740+johnnych...@users.noreply.github.com>
AuthorDate: Tue Mar 19 23:00:30 2024 +0800

    KAFKA-16341 fix the LogValidator for non-compressed type (#15476)

    - Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of the first record.
      Else, we choose the record with the maxTimeStamp.
    - rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp

    Reviewers: Jun Rao <jun...@gmail.com>, Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../apache/kafka/common/record/MemoryRecords.java  | 22 +++++------
 .../kafka/common/record/MemoryRecordsBuilder.java  |  6 +--
 .../common/record/MemoryRecordsBuilderTest.java    | 10 ++---
 .../kafka/common/record/MemoryRecordsTest.java     |  6 +--
 core/src/main/scala/kafka/log/LocalLog.scala       |  4 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  2 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   | 45 +++++++++++++++-------
 .../test/scala/unit/kafka/log/LocalLogTest.scala   |  2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  4 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 30 ++++++---------
 .../kafka/storage/internals/log/LogSegment.java    | 10 ++---
 .../kafka/storage/internals/log/LogValidator.java  | 16 +++-----
 13 files changed, 84 insertions(+), 75 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 3aa233c34e9..3ba60b09b30 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -209,7 +209,7 @@ public class MemoryRecords extends AbstractRecords {
                         partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);

                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+                filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.offsetOfMaxTimestamp,
                     maxOffset, retainedRecords.size(), filteredBatchSize);
             }
         }
@@ -399,7 +399,7 @@ public class MemoryRecords extends AbstractRecords {
         private int bytesRetained = 0;
         private long maxOffset = -1L;
         private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        private long shallowOffsetOfMaxTimestamp = -1L;
+        private long offsetOfMaxTimestamp = -1L;

         private FilterResult(ByteBuffer outputBuffer) {
             this.outputBuffer = outputBuffer;
@@ -411,21 +411,21 @@ public class MemoryRecords extends AbstractRecords {
                     retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
         }

-        private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
-                                                 int messagesRetained, int bytesRetained) {
-            validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+        private void updateRetainedBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset,
+                                                 int messagesRetained, int bytesRetained) {
+            validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, maxOffset);
             if (maxTimestamp > this.maxTimestamp) {
                 this.maxTimestamp = maxTimestamp;
-                this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+                this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
             }
             this.maxOffset = Math.max(maxOffset, this.maxOffset);
             this.messagesRetained += messagesRetained;
             this.bytesRetained += bytesRetained;
         }

-        private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
-            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
-                throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+        private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) {
+            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0)
+                throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp);
             if (maxOffset < 0)
                 throw new IllegalArgumentException("maxOffset undefined");
         }
@@ -458,8 +458,8 @@ public class MemoryRecords extends AbstractRecords {
             return maxTimestamp;
         }

-        public long shallowOffsetOfMaxTimestamp() {
-            return shallowOffsetOfMaxTimestamp;
+        public long offsetOfMaxTimestamp() {
+            return offsetOfMaxTimestamp;
         }
     }

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 3663c5ea7e3..6b53ee41595 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -851,12 +851,12 @@ public class MemoryRecordsBuilder implements AutoCloseable {

     public static class RecordsInfo {
         public final long maxTimestamp;
-        public final long shallowOffsetOfMaxTimestamp;
+        public final long offsetOfMaxTimestamp;

         public RecordsInfo(long maxTimestamp,
-                           long shallowOffsetOfMaxTimestamp) {
+                           long offsetOfMaxTimestamp) {
             this.maxTimestamp = maxTimestamp;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
         }
     }

diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 81aa162feee..eaaa95ff673 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -378,7 +378,7 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
         // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        assertEquals(0L, info.offsetOfMaxTimestamp);

         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -414,9 +414,9 @@ public class MemoryRecordsBuilderTest {

         if (magic == MAGIC_VALUE_V0)
             // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         else
-            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(1L, info.offsetOfMaxTimestamp);

         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -495,10 +495,10 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         if (magic == MAGIC_VALUE_V0) {
             assertEquals(-1, info.maxTimestamp);
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         } else {
             assertEquals(2L, info.maxTimestamp);
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(2L, info.offsetOfMaxTimestamp);
         }

         long i = 0L;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 50821af841c..9e688fc3ab6 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -352,7 +352,7 @@ public class MemoryRecordsTest {
             assertEquals(0, filterResult.messagesRetained());
             assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
             assertEquals(12, filterResult.maxTimestamp());
-            assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());
+            assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp());

             // Verify filtered records
             filtered.flip();
@@ -413,7 +413,7 @@ public class MemoryRecordsTest {
         assertEquals(0, filterResult.messagesRetained());
         assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
         assertEquals(timestamp, filterResult.maxTimestamp());
-        assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+        assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp());
         assertTrue(filterResult.outputBuffer().position() > 0);

         // Verify filtered records
@@ -893,7 +893,7 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+            assertEquals(4L, result.offsetOfMaxTimestamp());
         }

         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index b2121f5312d..a91bfae739e 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File,
     }
   }

-  private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
-    segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
+  private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
+    segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records)
     updateLogEndOffset(lastOffset + 1)
   }

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b653f40b287..35dadc8672f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int,
       val retained = MemoryRecords.readableRecords(outputBuffer)
       // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
       // after `Log.replaceSegments` (which acquires the lock) is called
-      dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp, retained)
+      dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained)
       throttler.maybeThrottle(outputBuffer.limit())
     }

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f6198ce9d21..0fdc236e720 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,

         validRecords = validateAndOffsetAssignResult.validatedRecords
         appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-        appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
+        appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
         appendInfo.setLastOffset(offset.value - 1)
         appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
         if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index e5e22e9dff9..5362a1d5e35 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -68,9 +68,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -79,9 +77,30 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record
+    // thus, the maxTimestampOffset should be the first record of the batch.
+    // So in this one batch test, it'll be the first offset which is 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeNonCompressedRecordsInSeparateBatch(quorum: String): Unit = {
     produceMessagesInSeparateBatch()
     verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamp is different, it should be the last one
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }

   // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
@@ -93,9 +112,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -111,9 +128,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this separate batch test, it'll be the last offset 2
@@ -147,15 +162,19 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this separate batch test, it'll be the last offset 2
     verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }

+  private def setUpForLogAppendTimeCase(): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+  }
+
   private def createOldMessageFormatBrokers(): Unit = {
     setOldMessageFormat = true
     recreateBrokers(reconfigure = true, startup = true)
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 29b5fd34f90..bffd41156b3 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -100,7 +100,7 @@ class LocalLogTest {
                             initialOffset: Long = 0L): Unit = {
     log.append(lastOffset = initialOffset + records.size - 1,
       largestTimestamp = records.head.timestamp,
-      shallowOffsetOfMaxTimestamp = initialOffset,
+      offsetOfMaxTimestamp = initialOffset,
       records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*))
   }

diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index b4e278c3802..bb0c85a8583 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -86,10 +86,10 @@ class LogSegmentTest {
   def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
     val seg = createSegment(baseOffset)
     val currentTime = Time.SYSTEM.milliseconds()
-    val shallowOffsetOfMaxTimestamp = largestOffset
+    val offsetOfMaxTimestamp = largestOffset
     val memoryRecords = records(0, "hello")
     assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
-      seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
+      seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, memoryRecords)
     })
   }

diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index ac6152b9b15..53b385c62e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -173,9 +173,9 @@ class LogValidatorTest {
     assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")

-    // we index from last offset in version 2 instead of base offset
-    val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
-    assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs,
+    // If it's LOG_APPEND_TIME, the offset will be the offset of the first record
+    val expectedMaxTimestampOffset = 0
+    assertEquals(expectedMaxTimestampOffset, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be $expectedMaxTimestampOffset")
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = false)
@@ -219,7 +219,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
@@ -271,7 +271,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
@@ -404,14 +404,8 @@ class LogValidatorTest {

     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) {
-      // v2 records are always batched, even when not compressed.
-      // the shallow offset of max timestamp is the last offset of the batch
-      recordList.size - 1
-    } else {
-      1
-    }
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
     assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -486,7 +480,7 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(1, validatingResults.offsetOfMaxTimestampMs,
       "Offset of max timestamp should be 1")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -538,7 +532,7 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(-1, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be -1")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -585,7 +579,7 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -657,8 +651,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    val expectedShallowOffsetOfMaxTimestamp = 1
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index d42bf112456..464858b4cd7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -232,17 +232,17 @@ public class LogSegment implements Closeable {
     *
     * @param largestOffset The last offset in the message set
     * @param largestTimestampMs The largest timestamp in the message set.
-    * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+    * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
     * @param records The log entries to append.
     * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
     */
    public void append(long largestOffset,
                       long largestTimestampMs,
-                      long shallowOffsetOfMaxTimestamp,
+                      long offsetOfMaxTimestamp,
                       MemoryRecords records) throws IOException {
        if (records.sizeInBytes() > 0) {
-            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at shallow offset {}",
-                    records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
+            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
+                    records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, offsetOfMaxTimestamp);
            int physicalPosition = log.sizeInBytes();
            if (physicalPosition == 0)
                rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
@@ -254,7 +254,7 @@ public class LogSegment implements Closeable {
            LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
            // Update the in memory max timestamp and corresponding offset.
            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
+                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp);
            }
            // append an entry to the index (if needed)
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 0cf9cd1c60f..9aa1e06633b 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -68,17 +68,17 @@ public class LogValidator {
        public final long logAppendTimeMs;
        public final MemoryRecords validatedRecords;
        public final long maxTimestampMs;
-        public final long shallowOffsetOfMaxTimestampMs;
+        public final long offsetOfMaxTimestampMs;
        public final boolean messageSizeMaybeChanged;
        public final RecordValidationStats recordValidationStats;

        public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
-                                long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
+                                long offsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
                                RecordValidationStats recordValidationStats) {
            this.logAppendTimeMs = logAppendTimeMs;
            this.validatedRecords = validatedRecords;
            this.maxTimestampMs = maxTimestampMs;
-            this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs;
+            this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs;
            this.messageSizeMaybeChanged = messageSizeMaybeChanged;
            this.recordValidationStats = recordValidationStats;
        }
@@ -149,7 +149,7 @@ public class LogValidator {
     * avoid expensive re-compression.
     *
     * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
-    * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
+    * of the message with the max timestamp and a boolean indicating whether the message sizes may have changed.
     */
    public ValidationResult validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter,
                                                             MetricsRecorder metricsRecorder,
@@ -232,7 +232,7 @@ public class LogValidator {
            now,
            convertedRecords,
            info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
            true,
            recordValidationStats);
    }
@@ -296,10 +296,6 @@ public class LogValidator {
            offsetOfMaxTimestamp = initialOffset;
        }

-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
-        }
-
        return new ValidationResult(
            now,
            records,
@@ -480,7 +476,7 @@ public class LogValidator {
            logAppendTime,
            records,
            info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
            true,
            recordValidationStats);
    }
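
For readers following the patch without the surrounding sources, the stand-alone Java sketch below illustrates the selection rule this change enforces for non-compressed batches. It is not Kafka code: the class OffsetOfMaxTimestampSketch, the record type SimpleRecord, the enum TsType and the method pickOffsetOfMaxTimestamp are hypothetical names introduced only for illustration. The rule itself comes from the commit message and from the removed MAGIC_VALUE_V2 branch in LogValidator.java: with LOG_APPEND_TIME the reported offset is that of the first record in the batch; otherwise it is the offset of the record carrying the maximum timestamp, the earliest such record winning on ties.

    import java.util.List;

    // Hypothetical, self-contained illustration of the rule fixed by KAFKA-16341;
    // none of these names exist in Kafka.
    public class OffsetOfMaxTimestampSketch {

        // Minimal stand-in for a record: (offset, timestampMs).
        record SimpleRecord(long offset, long timestampMs) { }

        enum TsType { CREATE_TIME, LOG_APPEND_TIME }

        static long pickOffsetOfMaxTimestamp(List<SimpleRecord> batch, TsType type) {
            if (batch.isEmpty())
                return -1L;                    // mirrors the "-1 = undefined" convention used in the patch
            if (type == TsType.LOG_APPEND_TIME)
                return batch.get(0).offset();  // all records share the broker's append time, so take the first offset
            long maxTs = Long.MIN_VALUE;
            long offsetOfMax = -1L;
            for (SimpleRecord r : batch) {
                if (r.timestampMs() > maxTs) { // strict '>' keeps the earliest offset when timestamps tie
                    maxTs = r.timestampMs();
                    offsetOfMax = r.offset();
                }
            }
            return offsetOfMax;                // no longer overridden to the batch's last offset for v2 magic
        }

        public static void main(String[] args) {
            List<SimpleRecord> batch = List.of(
                    new SimpleRecord(0, 100L),
                    new SimpleRecord(1, 300L),  // max timestamp lives at offset 1
                    new SimpleRecord(2, 200L));
            System.out.println(pickOffsetOfMaxTimestamp(batch, TsType.CREATE_TIME));     // prints 1
            System.out.println(pickOffsetOfMaxTimestamp(batch, TsType.LOG_APPEND_TIME)); // prints 0
        }
    }

This is the behaviour the new ListOffsetsIntegrationTest cases assert: offset 0 for the LogAppendTime one-batch cases, where every record receives the same append timestamp, and offset 2 for the separate-batch cases, where each later batch receives a later append timestamp.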