This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 4dd893ba212 KAFKA-806 Index may not always observe log.index.interval.bytes (#18842)
4dd893ba212 is described below
commit 4dd893ba212d05fb2201d884d962f0c0f81ebb10
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Mar 20 16:38:54 2025 +0800
KAFKA-806 Index may not always observe log.index.interval.bytes (#18842)
Currently, each log.append() adds at most one index entry, even when the
appended data is larger than log.index.interval.bytes. One potential
issue is that if a follower restarts after being down for a long time,
it may fetch data much larger than log.index.interval.bytes at a time.
This means that fewer index entries are created, which can increase
fetch times for consumers.
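For illustration only (not part of the patch): a minimal, self-contained sketch of the
per-batch indexing idea described above. The Batch, IndexEntry, and PerBatchIndexingSketch
names are hypothetical stand-ins, not Kafka's real classes; the actual change is the
LogSegment.append() rewrite in the diff below.

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins for Kafka's batch and index entry types (not the real classes).
record Batch(long lastOffset, long maxTimestamp, int sizeInBytes) {}
record IndexEntry(long offset, int position) {}

class PerBatchIndexingSketch {
    private final int indexIntervalBytes;              // plays the role of log.index.interval.bytes
    private final List<IndexEntry> offsetIndex = new ArrayList<>();
    private int bytesSinceLastIndexEntry = 0;
    private long maxTimestampSoFar = Long.MIN_VALUE;
    private long offsetOfMaxTimestampSoFar = -1L;

    PerBatchIndexingSketch(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    // Old behavior: one append produced at most one index entry, no matter how large it was.
    // New behavior (sketched): every batch in the appended records is visited, so a single
    // large append still yields an index entry roughly every indexIntervalBytes.
    void append(int startPosition, List<Batch> batches) {
        int physicalPosition = startPosition;
        for (Batch batch : batches) {
            if (batch.maxTimestamp() > maxTimestampSoFar) {
                maxTimestampSoFar = batch.maxTimestamp();
                offsetOfMaxTimestampSoFar = batch.lastOffset();
            }
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                offsetIndex.add(new IndexEntry(batch.lastOffset(), physicalPosition));
                bytesSinceLastIndexEntry = 0;
            }
            physicalPosition += batch.sizeInBytes();
            bytesSinceLastIndexEntry += batch.sizeInBytes();
        }
    }

    List<IndexEntry> entries() {
        return offsetIndex;
    }
}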
(cherry picked from commit e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 5 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 4 +-
.../unit/kafka/server/MockFetcherThread.scala | 1 -
.../kafka/server/ReplicaFetcherThreadTest.scala | 1 -
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 4 +-
.../kafka/storage/internals/log/LocalLog.java | 4 +-
.../kafka/storage/internals/log/LogAppendInfo.java | 75 ++++------
.../kafka/storage/internals/log/LogSegment.java | 46 +++---
.../kafka/storage/internals/log/LogValidator.java | 35 +----
.../kafka/storage/internals/log/LocalLogTest.java | 2 -
.../storage/internals/log/LogSegmentTest.java | 155 ++++++++++++++++-----
.../storage/internals/log/LogValidatorTest.java | 24 ----
13 files changed, 181 insertions(+), 177 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index a4f96ff7e63..5d7ee518963 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -824,7 +824,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
- dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
+ dest.append(result.maxOffset, retained)
throttler.maybeThrottle(outputBuffer.limit())
}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 9a977a262b6..3c1a1570871 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -816,7 +816,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
- appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@@ -902,7 +901,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// will be cleaned up after the log directory is recovered. Note that the end offset of the
// ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails.
- localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
+ localLog.append(appendInfo.lastOffset, validRecords)
updateHighWatermarkWithLogEndOffset()
// update the producer state
@@ -1183,7 +1182,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
else
OptionalInt.empty()
- new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp,
+ new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index bbcda01451f..9d208055a58 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4455,8 +4455,8 @@ class UnifiedLogTest {
segments.add(seg2)
assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
- seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
- seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord("two".getBytes)))
+ seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
+ seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes)))
assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
seg1.close()
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 2754685b8f4..5d50de04095 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -107,7 +107,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
lastOffset,
lastEpoch,
maxTimestamp,
- shallowOffsetOfMaxTimestamp,
Time.SYSTEM.milliseconds(),
state.logStartOffset,
RecordValidationStats.EMPTY,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index ff556f586c4..6526d6628c3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -465,7 +465,6 @@ class ReplicaFetcherThreadTest {
0,
OptionalInt.empty,
RecordBatch.NO_TIMESTAMP,
- -1L,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordValidationStats.EMPTY,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index b86a5608c3d..bf8cafac629 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
@@ -402,7 +402,7 @@ class DumpLogSegmentsTest {
log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
val secondSegment = log.roll()
- secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+ secondSegment.append(1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords: _*))
secondSegment.flush()
log.flush(true)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index 027196a5de7..817da5c8318 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -525,8 +525,8 @@ public class LocalLog {
);
}
- public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
- segments.activeSegment().append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records);
+ public void append(long lastOffset, MemoryRecords records) throws IOException {
+ segments.activeSegment().append(lastOffset, records);
updateLogEndOffset(lastOffset + 1);
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 05e162a3042..63a8a510818 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -31,13 +31,12 @@ import java.util.OptionalInt;
public class LogAppendInfo {
public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
- RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+ RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
private long firstOffset;
private long lastOffset;
private long maxTimestamp;
- private long shallowOffsetOfMaxTimestamp;
private long logAppendTime;
private long logStartOffset;
private RecordValidationStats recordValidationStats;
@@ -52,31 +51,29 @@ public class LogAppendInfo {
/**
* Creates an instance with the given params.
*
- * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
- * to the follower.
- * @param lastOffset The last offset in the message set
- * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
- * @param maxTimestamp The maximum timestamp of the message set.
- * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
- * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
- * @param logStartOffset The start offset of the log at the time of this append.
- * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
- * @param sourceCompression The source codec used in the message set (send by the producer)
- * @param validBytes The number of valid bytes
- * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
+ * to the follower.
+ * @param lastOffset The last offset in the message set
+ * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression The source codec used in the message set (send by the producer)
+ * @param validBytes The number of valid bytes
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
- long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
CompressionType sourceCompression,
int validBytes,
long lastOffsetOfFirstBatch) {
- this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
+ this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE);
}
@@ -84,27 +81,25 @@ public class LogAppendInfo {
/**
* Creates an instance with the given params.
*
- * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
- * to the follower.
- * @param lastOffset The last offset in the message set
- * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
- * @param maxTimestamp The maximum timestamp of the message set.
- * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
- * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
- * @param logStartOffset The start offset of the log at the time of this append.
- * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
- * @param sourceCompression The source codec used in the message set (send by the producer)
- * @param validBytes The number of valid bytes
- * @param lastOffsetOfFirstBatch The last offset of the first batch
- * @param recordErrors List of record errors that caused the respective batch to be dropped
- * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record
- * Same if high watermark is not changed. None is the default value and it means append failed
+ * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
+ * to the follower.
+ * @param lastOffset The last offset in the message set
+ * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset The start offset of the log at the time of this append.
+ * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression The source codec used in the message set (send by the producer)
+ * @param validBytes The number of valid bytes
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param recordErrors List of record errors that caused the respective batch to be dropped
+ * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record
+ * Same if high watermark is not changed. None is the default value and it means append failed
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
- long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
@@ -117,7 +112,6 @@ public class LogAppendInfo {
this.lastOffset = lastOffset;
this.lastLeaderEpoch = lastLeaderEpoch;
this.maxTimestamp = maxTimestamp;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset;
this.recordValidationStats = recordValidationStats;
@@ -156,14 +150,6 @@ public class LogAppendInfo {
this.maxTimestamp = maxTimestamp;
}
- public long shallowOffsetOfMaxTimestamp() {
- return shallowOffsetOfMaxTimestamp;
- }
-
- public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) {
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
- }
-
public long logAppendTime() {
return logAppendTime;
}
@@ -233,12 +219,12 @@ public class LogAppendInfo {
* @return a new instance with the given LeaderHwChange
*/
public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
- return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
+ return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
}
public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
- return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
}
@@ -248,7 +234,7 @@ public class LogAppendInfo {
* in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
*/
public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
- return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
}
@@ -259,7 +245,6 @@ public class LogAppendInfo {
", lastOffset=" + lastOffset +
", lastLeaderEpoch=" + lastLeaderEpoch +
", maxTimestamp=" + maxTimestamp +
- ", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp
+
", logAppendTime=" + logAppendTime +
", logStartOffset=" + logStartOffset +
", recordConversionStats=" + recordValidationStats +
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 5148408c0e5..b388af1f798 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -232,38 +232,38 @@ public class LogSegment implements Closeable {
* It is assumed this method is being called from within a lock, it is not thread-safe otherwise.
*
* @param largestOffset The last offset in the message set
- * @param largestTimestampMs The largest timestamp in the message set.
- * @param shallowOffsetOfMaxTimestamp The last offset of earliest batch with max timestamp in the messages to append.
- * @param records The log entries to append.
+ * @param records The log entries to append.
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/
public void append(long largestOffset,
- long largestTimestampMs,
- long shallowOffsetOfMaxTimestamp,
MemoryRecords records) throws IOException {
if (records.sizeInBytes() > 0) {
- LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
- records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
+ LOGGER.trace("Inserting {} bytes at end offset {} at position {}",
+ records.sizeInBytes(), largestOffset, log.sizeInBytes());
int physicalPosition = log.sizeInBytes();
- if (physicalPosition == 0)
- rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
ensureOffsetInRange(largestOffset);
// append the messages
long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
- // Update the in memory max timestamp and corresponding offset.
- if (largestTimestampMs > maxTimestampSoFar()) {
- maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
- }
- // append an entry to the index (if needed)
- if (bytesSinceLastIndexEntry > indexIntervalBytes) {
- offsetIndex().append(largestOffset, physicalPosition);
- timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
- bytesSinceLastIndexEntry = 0;
+
+ for (RecordBatch batch : records.batches()) {
+ long batchMaxTimestamp = batch.maxTimestamp();
+ long batchLastOffset = batch.lastOffset();
+ if (batchMaxTimestamp > maxTimestampSoFar()) {
+ maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
+ }
+
+ if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+ offsetIndex().append(batchLastOffset, physicalPosition);
+ timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
+ bytesSinceLastIndexEntry = 0;
+ }
+ var sizeInBytes = batch.sizeInBytes();
+ physicalPosition += sizeInBytes;
+ bytesSinceLastIndexEntry += sizeInBytes;
}
- bytesSinceLastIndexEntry += records.sizeInBytes();
}
}
@@ -274,8 +274,6 @@ public class LogSegment implements Closeable {
private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException {
int bytesToAppend = 0;
- long maxTimestamp = Long.MIN_VALUE;
- long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE;
long maxOffset = Long.MIN_VALUE;
ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
@@ -284,10 +282,6 @@ public class LogSegment implements Closeable {
Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator();
FileChannelRecordBatch batch;
while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) {
- if (batch.maxTimestamp() > maxTimestamp) {
- maxTimestamp = batch.maxTimestamp();
- shallowOffsetOfMaxTimestamp = batch.lastOffset();
- }
maxOffset = batch.lastOffset();
bytesToAppend += batch.sizeInBytes();
}
@@ -300,7 +294,7 @@ public class LogSegment implements Closeable {
readBuffer.limit(bytesToAppend);
records.readInto(readBuffer, position);
- append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer));
+ append(maxOffset, MemoryRecords.readableRecords(readBuffer));
}
bufferSupplier.release(readBuffer);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index fc2ba45d101..d74ac0a8b3a 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -65,20 +65,15 @@ public class LogValidator {
public final long logAppendTimeMs;
public final MemoryRecords validatedRecords;
public final long maxTimestampMs;
- // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
- // indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no
- // idea about record level offset for max timestamp.
- public final long shallowOffsetOfMaxTimestamp;
public final boolean messageSizeMaybeChanged;
public final RecordValidationStats recordValidationStats;
public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
- long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged,
+ boolean messageSizeMaybeChanged,
RecordValidationStats recordValidationStats) {
this.logAppendTimeMs = logAppendTimeMs;
this.validatedRecords = validatedRecords;
this.maxTimestampMs = maxTimestampMs;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.messageSizeMaybeChanged = messageSizeMaybeChanged;
this.recordValidationStats = recordValidationStats;
}
@@ -229,7 +224,6 @@ public class LogValidator {
now,
convertedRecords,
info.maxTimestamp,
- info.shallowOffsetOfMaxTimestamp,
true,
recordValidationStats);
}
@@ -239,8 +233,6 @@ public class LogValidator {
MetricsRecorder metricsRecorder) {
long now = time.milliseconds();
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
- long shallowOffsetOfMaxTimestamp = -1L;
- long initialOffset = offsetCounter.value;
RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
@@ -269,7 +261,6 @@ public class LogValidator {
if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp;
- shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
}
batch.setLastOffset(offsetCounter.value - 1);
@@ -286,23 +277,10 @@ public class LogValidator {
}
if (timestampType == TimestampType.LOG_APPEND_TIME) {
- maxTimestamp = now;
- // those checks should be equal to MemoryRecordsBuilder#info
- switch (toMagic) {
- case RecordBatch.MAGIC_VALUE_V0:
- maxTimestamp = RecordBatch.NO_TIMESTAMP;
- // value will be the default value: -1
- shallowOffsetOfMaxTimestamp = -1;
- break;
- case RecordBatch.MAGIC_VALUE_V1:
- // Those single-record batches have same max timestamp, so the initial offset is equal with
- // the last offset of earliest batch
- shallowOffsetOfMaxTimestamp = initialOffset;
- break;
- default:
- // there is only one batch so use the last offset
- shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
- break;
+ if (toMagic == RecordBatch.MAGIC_VALUE_V0) {
+ maxTimestamp = RecordBatch.NO_TIMESTAMP;
+ } else {
+ maxTimestamp = now;
}
}
@@ -310,7 +288,6 @@ public class LogValidator {
now,
records,
maxTimestamp,
- shallowOffsetOfMaxTimestamp,
false,
RecordValidationStats.EMPTY);
}
@@ -434,7 +411,6 @@ public class LogValidator {
now,
records,
maxTimestamp,
- lastOffset,
false,
recordValidationStats);
}
@@ -476,7 +452,6 @@ public class LogValidator {
logAppendTime,
records,
info.maxTimestamp,
- info.shallowOffsetOfMaxTimestamp,
true,
recordValidationStats);
}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
index 00b53de34ef..a638f03abde 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
@@ -114,8 +114,6 @@ class LocalLogTest {
private void appendRecords(List<SimpleRecord> records, long initialOffset) throws IOException {
log.append(initialOffset + records.size() - 1,
- records.get(0).timestamp(),
- initialOffset,
MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toArray(new SimpleRecord[0])));
}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 56bdcf7240f..b798378f1af 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
@@ -48,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -145,7 +147,7 @@ public class LogSegmentTest {
try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
long currentTime = Time.SYSTEM.milliseconds();
MemoryRecords memoryRecords = v1Records(0, "hello");
- assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
+ assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, memoryRecords));
}
}
@@ -168,7 +170,7 @@ public class LogSegmentTest {
public void testReadBeforeFirstOffset() throws IOException {
try (LogSegment seg = createSegment(40)) {
MemoryRecords ms = v1Records(50, "hello", "there", "little",
"bee");
- seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(53, ms);
Records read = seg.read(41, 300).records;
checkEquals(ms.records().iterator(), read.records().iterator());
}
@@ -183,7 +185,7 @@ public class LogSegmentTest {
long batchBaseOffset = 50;
try (LogSegment seg = createSegment(40)) {
MemoryRecords ms = v2Records(batchBaseOffset, "hello", "there",
"little", "bee");
- seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(53, ms);
FetchDataInfo readInfo = seg.read(52, 300);
assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset);
}
@@ -196,7 +198,7 @@ public class LogSegmentTest {
public void testReadAfterLast() throws IOException {
try (LogSegment seg = createSegment(40)) {
MemoryRecords ms = v1Records(50, "hello", "there");
- seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(51, ms);
FetchDataInfo read = seg.read(52, 200);
assertNull(read, "Read beyond the last offset in the segment
should give null");
}
@@ -210,9 +212,9 @@ public class LogSegmentTest {
public void testReadFromGap() throws IOException {
try (LogSegment seg = createSegment(40)) {
MemoryRecords ms = v1Records(50, "hello", "there");
- seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(51, ms);
MemoryRecords ms2 = v1Records(60, "alpha", "beta");
- seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+ seg.append(61, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(), read.records.records().iterator());
}
@@ -225,7 +227,7 @@ public class LogSegmentTest {
int maxSize = 1;
try (LogSegment seg = createSegment(40)) {
MemoryRecords ms = v1Records(50, "hello", "there");
- seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(51, ms);
// read at first offset
FetchDataInfo read = seg.read(50, maxSize, maxPosition, minOneMessage);
@@ -257,9 +259,9 @@ public class LogSegmentTest {
long offset = 40;
for (int i = 0; i < 30; i++) {
MemoryRecords ms1 = v1Records(offset, "hello");
- seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+ seg.append(offset, ms1);
MemoryRecords ms2 = v1Records(offset + 1, "hello");
- seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+ seg.append(offset + 1, ms2);
// check that we can read back both messages
FetchDataInfo read = seg.read(offset, 10000);
@@ -320,7 +322,7 @@ public class LogSegmentTest {
try (LogSegment seg = createSegment(40, 2 * v1Records(0, "hello").sizeInBytes() - 1)) {
int offset = 40;
for (int i = 0; i < numMessages; i++) {
- seg.append(offset, offset, offset, v1Records(offset, "hello"));
+ seg.append(offset, v1Records(offset, "hello"));
offset++;
}
assertEquals(offset, seg.readNextOffset());
@@ -343,7 +345,12 @@ public class LogSegmentTest {
MockTime time = new MockTime();
try (LogSegment seg = createSegment(40, time)) {
- seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, "hello", "there"));
+ seg.append(41,
+ MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 40, Compression.NONE, TimestampType.CREATE_TIME,
+ List.of(
+ new SimpleRecord("hello".getBytes()),
+ new SimpleRecord("there".getBytes())
+ ).toArray(new SimpleRecord[0])));
// If the segment is empty after truncation, the create time should be reset
time.sleep(500);
@@ -355,7 +362,7 @@ public class LogSegmentTest {
assertFalse(seg.offsetIndex().isFull());
assertNull(seg.read(0, 1024), "Segment should be empty.");
- seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, "hello", "there"));
+ seg.append(41, v1Records(40, "hello", "there"));
}
}
@@ -368,7 +375,7 @@ public class LogSegmentTest {
try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
// Produce some messages
for (int i = 40; i < 50; i++) {
- seg.append(i, i * 10, i, v1Records(i, "msg" + i));
+ seg.append(i, v1Records(i, "msg" + i));
}
assertEquals(490, seg.largestTimestamp());
@@ -394,7 +401,7 @@ public class LogSegmentTest {
public void testNextOffsetCalculation() throws IOException {
try (LogSegment seg = createSegment(40)) {
assertEquals(40, seg.readNextOffset());
- seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, v1Records(50, "hello", "there", "you"));
+ seg.append(52, v1Records(50, "hello", "there", "you"));
assertEquals(53, seg.readNextOffset());
}
}
@@ -437,7 +444,7 @@ public class LogSegmentTest {
public void testRecoveryFixesCorruptIndex() throws Exception {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < 100; i++) {
- seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, Integer.toString(i)));
+ seg.append(i, v1Records(i, Integer.toString(i)));
}
File indexFile = seg.offsetIndexFile();
writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
@@ -460,27 +467,27 @@ public class LogSegmentTest {
long pid2 = 10L;
// append transactional records from pid1
- segment.append(101L, RecordBatch.NO_TIMESTAMP,
- 100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
+ segment.append(101L,
+ MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
// append transactional records from pid2
- segment.append(103L, RecordBatch.NO_TIMESTAMP,
- 102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
+ segment.append(103L,
+ MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
// append non-transactional records
- segment.append(105L, RecordBatch.NO_TIMESTAMP,
- 104L, MemoryRecords.withRecords(104L, Compression.NONE,
+ segment.append(105L,
+ MemoryRecords.withRecords(104L, Compression.NONE,
partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
// abort the transaction from pid2
- segment.append(106L, RecordBatch.NO_TIMESTAMP,
- 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 106L));
+ segment.append(106L,
+ endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 106L));
// commit the transaction from pid1
- segment.append(107L, RecordBatch.NO_TIMESTAMP,
- 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 107L));
+ segment.append(107L,
+ endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 107L));
ProducerStateManager stateManager = newProducerStateManager();
segment.recover(stateManager, mock(LeaderEpochFileCache.class));
@@ -522,16 +529,16 @@ public class LogSegmentTest {
LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
LeaderEpochFileCache cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()));
- seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
+ seg.append(105L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
- seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
+ seg.append(107L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
- seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
+ seg.append(109L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
- seg.append(111L, RecordBatch.NO_TIMESTAMP, 110L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
+ seg.append(111L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
seg.recover(newProducerStateManager(), cache);
@@ -567,7 +574,7 @@ public class LogSegmentTest {
public void testRecoveryFixesCorruptTimeIndex() throws IOException {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < 100; i++) {
- seg.append(i, i * 10, i, v1Records(i, String.valueOf(i)));
+ seg.append(i, v1Records(i, String.valueOf(i)));
}
File timeIndexFile = seg.timeIndexFile();
writeNonsenseToFile(timeIndexFile, 5, (int) timeIndexFile.length());
@@ -590,7 +597,7 @@ public class LogSegmentTest {
for (int ignore = 0; ignore < 10; ignore++) {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < messagesAppended; i++) {
- seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, String.valueOf(i)));
+ seg.append(i, v1Records(i, String.valueOf(i)));
}
int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
// start corrupting somewhere in the middle of the chosen record all the way to the end
@@ -627,9 +634,9 @@ public class LogSegmentTest {
512 * 1024 * 1024, true, "")) {
segments.add(seg);
MemoryRecords ms = v1Records(50, "hello", "there");
- seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(51, ms);
MemoryRecords ms2 = v1Records(60, "alpha", "beta");
- seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+ seg.append(61, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(), read.records.records().iterator());
}
@@ -650,9 +657,9 @@ public class LogSegmentTest {
try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, 512 * 1024 * 1024, true)) {
MemoryRecords ms = v1Records(50, "hello", "there");
- seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ seg.append(51, ms);
MemoryRecords ms2 = v1Records(60, "alpha", "beta");
- seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+ seg.append(61, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(), read.records.records().iterator());
long oldSize = seg.log().sizeInBytes();
@@ -690,9 +697,9 @@ public class LogSegmentTest {
// Given two messages with a gap between them (e.g. mid offset compacted away)
MemoryRecords ms1 = recordsForTruncateEven(offset, "first message");
- seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+ seg.append(offset, ms1);
MemoryRecords ms2 = recordsForTruncateEven(offset + 3, "message after gap");
- seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+ seg.append(offset + 3, ms2);
// When we truncate to an offset without a corresponding log entry
seg.truncateTo(offset + 1);
@@ -743,7 +750,8 @@ public class LogSegmentTest {
try (LogSegment segment = createSegment(1)) {
assertEquals(Long.MAX_VALUE, segment.getFirstBatchTimestamp());
- segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes())));
+ segment.append(1,
+ MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes())));
assertEquals(1000L, segment.getFirstBatchTimestamp());
}
}
@@ -780,6 +788,77 @@ public class LogSegmentTest {
}
}
+ @Test
+ public void testIndexForMultipleBatchesInMemoryRecords() throws IOException {
+ LogSegment segment = createSegment(0, 1, Time.SYSTEM);
+
+ ByteBuffer buffer1 = ByteBuffer.allocate(1024);
+ // append first batch to buffer1
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0);
+ builder.append(0L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ // append second batch to buffer1
+ builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1);
+ builder.append(1L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ buffer1.flip();
+ MemoryRecords record = MemoryRecords.readableRecords(buffer1);
+ segment.append(1L, record);
+
+ ByteBuffer buffer2 = ByteBuffer.allocate(1024);
+ // append first batch to buffer2
+ builder = MemoryRecords.builder(buffer2, Compression.NONE, TimestampType.CREATE_TIME, 2);
+ builder.append(2L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ buffer2.flip();
+ record = MemoryRecords.readableRecords(buffer2);
+ segment.append(2L, record);
+
+ assertEquals(2, segment.offsetIndex().entries());
+ assertEquals(1, segment.offsetIndex().entry(0).offset);
+ assertEquals(2, segment.offsetIndex().entry(1).offset);
+
+ assertEquals(2, segment.timeIndex().entries());
+ assertEquals(new TimestampOffset(1, 1), segment.timeIndex().entry(0));
+ assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1));
+ }
+
+ @Test
+ public void testNonMonotonicTimestampForMultipleBatchesInMemoryRecords() throws IOException {
+ LogSegment segment = createSegment(0, 1, Time.SYSTEM);
+
+ ByteBuffer buffer1 = ByteBuffer.allocate(1024);
+ // append first batch to buffer1
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0);
+ builder.append(1L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ // append second batch to buffer1
+ builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1);
+ builder.append(0L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ // append third batch to buffer1
+ builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 2);
+ builder.append(2L, "key1".getBytes(), "value1".getBytes());
+ builder.close();
+
+ buffer1.flip();
+ MemoryRecords record = MemoryRecords.readableRecords(buffer1);
+ segment.append(2L, record);
+
+ assertEquals(2, segment.offsetIndex().entries());
+ assertEquals(1, segment.offsetIndex().entry(0).offset);
+ assertEquals(2, segment.offsetIndex().entry(1).offset);
+
+ assertEquals(2, segment.timeIndex().entries());
+ assertEquals(new TimestampOffset(1, 0), segment.timeIndex().entry(0));
+ assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1));
+ }
+
private ProducerStateManager newProducerStateManager() throws IOException {
return new ProducerStateManager(
topicPartition,
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
index c58b15257ec..3bef886cee6 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
@@ -240,7 +240,6 @@ public class LogValidatorTest {
}
assertEquals(timestamp, validatedResults.maxTimestampMs);
- assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset
of max timestamp should be the last offset 2.");
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size
should have been changed");
verifyRecordValidationStats(
@@ -287,7 +286,6 @@ public class LogValidatorTest {
}
assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs, "Max timestamp should be " + RecordBatch.NO_TIMESTAMP);
- assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp);
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed");
verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true);
@@ -383,7 +381,6 @@ public class LogValidatorTest {
// Both V2 and V1 have single batch in the validated records when compression is enabled, and hence their shallow
// OffsetOfMaxTimestamp is the last offset of the single batch
assertEquals(1, iteratorSize(validatedRecords.batches().iterator()));
- assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
assertTrue(validatingResults.messageSizeMaybeChanged,
"Message size should have been changed");
@@ -571,8 +568,6 @@ public class LogValidatorTest {
assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp
should be " + (now + 1));
- int expectedShallowOffsetOfMaxTimestamp = 2;
- assertEquals(expectedShallowOffsetOfMaxTimestamp,
validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp
should be 2");
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size
should not have been changed");
verifyRecordValidationStats(validatedResults.recordValidationStats, 0,
records, true);
@@ -1831,10 +1826,8 @@ public class LogValidatorTest {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
assertEquals(1, iteratorSize(records.batches().iterator()));
- assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
} else {
assertEquals(3, iteratorSize(records.batches().iterator()));
- assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp);
}
assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -1908,8 +1901,6 @@ public class LogValidatorTest {
"MessageSet should still valid");
assertEquals(now, validatedResults.maxTimestampMs,
"Max timestamp should be " + now);
- assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
- "The shallow offset of max timestamp should be the last offset
2 if logAppendTime is used");
assertFalse(validatedResults.messageSizeMaybeChanged,
"Message size should not have been changed");
@@ -1950,8 +1941,6 @@ public class LogValidatorTest {
assertTrue(validatedRecords.batches().iterator().next().isValid(),
"MessageSet should still valid");
assertEquals(now, validatedResults.maxTimestampMs, String.format("Max
timestamp should be %d", now));
- assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
- "The shallow offset of max timestamp should be 2 if
logAppendTime is used");
assertTrue(validatedResults.messageSizeMaybeChanged,
"Message size may have been changed");
@@ -2002,19 +1991,6 @@ public class LogValidatorTest {
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size
should not have been changed");
- int expectedMaxTimestampOffset;
- switch (magic) {
- case RecordBatch.MAGIC_VALUE_V0:
- expectedMaxTimestampOffset = -1;
- break;
- case RecordBatch.MAGIC_VALUE_V1:
- expectedMaxTimestampOffset = 0;
- break;
- default:
- expectedMaxTimestampOffset = 2;
- break;
- }
- assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp);
verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, false);
}