TaiJuWu commented on code in PR #21761:
URL: https://github.com/apache/kafka/pull/21761#discussion_r2960064583
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -3407,6 +3422,887 @@ public void testFetchLatestTieredTimestampWithRemoteStorage() throws Exception {
 log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)));
}
+ @Test
+ public void testFetchEarliestPendingUploadTimestampNoRemoteStorage() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Test initial state before any records
+ assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
+ // Append records
+ prepareLogWithSequentialRecords(log, 2);
+
+ // Test state after records are appended
+ assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampWithRemoteStorage() throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ if (timestamp == firstTimestamp) {
+ return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
+ }
+ return Optional.empty();
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offset 0 (first timestamp) is in remote storage and deleted locally. Offset 1 (second timestamp) is in local storage.
+ log.updateLocalLogStartOffset(1);
+ log.updateHighestOffsetInRemoteStorage(0);
+
+ // In the assertions below we test that offset 0 (first timestamp) is only in remote storage and offset 1 (second timestamp) is in local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion() throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ // Offsets up to 1 are in remote storage
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ if (timestamp == firstTimestamp) {
+ return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
+ } else if (timestamp == secondTimestamp) {
+ return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 1L, Optional.of(secondLeaderEpoch)));
+ }
+ return Optional.empty();
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offsets 0, 1 (first and second timestamps) are in remote storage and not deleted locally.
+ log.updateLocalLogStartOffset(0);
+ log.updateHighestOffsetInRemoteStorage(1);
+
+ // In the assertions below we test that offset 0 (first timestamp) and offset 1 (second timestamp) are in both remote and local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampNoSegmentsUploaded() throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ // No offsets are in remote storage
+ doAnswer(ans -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offsets 0, 1, 2 (first, second and third timestamps) are in local storage only and not uploaded to remote storage.
+ log.updateLocalLogStartOffset(0);
+ log.updateHighestOffsetInRemoteStorage(-1);
+
+ // In the assertions below we test that offset 0 (first timestamp), offset 1 (second timestamp) and offset 2 (third timestamp) are only in local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote() throws Exception {
+ int logStartOffset = 100;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ // Offsets 100, 101, 102 (first, second and third timestamps) are in local storage and not uploaded to remote storage.
+ // Tiered storage copy was disabled and then enabled again, so the remote log segments were deleted but
+ // the highest offset in remote storage has become stale.
+ doAnswer(ans -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ log.updateLocalLogStartOffset(100);
+ log.updateHighestOffsetInRemoteStorage(50);
+
+ // In the assertions below we test that offset 100 (first timestamp), offset 101 (second timestamp) and offset 102 (third timestamp) are only in local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ /**
+ * Test the Log truncate operations
+ */
+ @Test
+ public void testTruncateTo() throws IOException {
+ MemoryRecords createRecords = singletonRecords("test".getBytes(),
mockTime.milliseconds());
+ int setSize = createRecords.sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10 messages
+
+ // create a log
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segments.");
+ assertEquals(msgPerSeg, log.logEndOffset(), "Log end offset should be
equal to number of messages");
+
+ long lastOffset = log.logEndOffset();
+ long size = log.size();
+ log.truncateTo(log.logEndOffset()); // keep the entire log
+ assertEquals(lastOffset, log.logEndOffset(), "Should not change
offset");
+ assertEquals(size, log.size(), "Should not change log size");
+ log.truncateTo(log.logEndOffset() + 1); // try to truncate beyond
lastOffset
+ assertEquals(lastOffset, log.logEndOffset(), "Should not change offset
but should log error");
+ assertEquals(size, log.size(), "Should not change log size");
+ log.truncateTo(msgPerSeg / 2); // truncate somewhere in between
+ assertEquals(msgPerSeg / 2, log.logEndOffset(), "Should change
offset");
+ assertTrue(log.size() < size, "Should change log size");
+ log.truncateTo(0); // truncate the entire log
+ assertEquals(0, log.logEndOffset(), "Should change offset");
+ assertEquals(0, log.size(), "Should change log size");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertEquals(lastOffset, log.logEndOffset(), "Should be back to
original offset");
+ assertEquals(size, log.size(), "Should be back to original size");
+ log.truncateFullyAndStartAt(log.logEndOffset() - (msgPerSeg - 1),
Optional.empty());
+ assertEquals(lastOffset - (msgPerSeg - 1), log.logEndOffset(), "Should
change offset");
+ assertEquals(0, log.size(), "Should change log size");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertTrue(log.logEndOffset() > msgPerSeg, "Should be ahead of to
original offset");
+ assertEquals(size, log.size(), "log size should be same as before");
+ log.truncateTo(0); // truncate before first start offset in the log
+ assertEquals(0, log.logEndOffset(), "Should change offset");
+ assertEquals(0, log.size(), "Should change log size");
+ }
+
+ /**
+ * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends
+ */
+ @Test
+ public void testIndexResizingAtTruncation() throws IOException {
+ int setSize = singletonRecords("test".getBytes(),
mockTime.milliseconds()).sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10
messages
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentSize)
+ .indexIntervalBytes(setSize - 1)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ for (int i = 1; i <= msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds() + i), 0);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ mockTime.sleep(msgPerSeg);
+ for (int i = 1; i <= msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds() + i), 0);
+ assertEquals(2, log.numberOfSegments(), "There should be exactly 2
segment.");
+ int expectedEntries = msgPerSeg - 1;
+
+ List<LogSegment> segments = new ArrayList<>(log.logSegments());
+ assertEquals(expectedEntries, segments.get(0).offsetIndex().maxEntries(),
+ "The index of the first segment should have " + expectedEntries + " entries");
+ assertEquals(expectedEntries, segments.get(0).timeIndex().maxEntries(),
+ "The time index of the first segment should have " + expectedEntries + " entries");
+
+ log.truncateTo(0);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+ assertEquals(log.config().maxIndexSize / 8, new
ArrayList<>(log.logSegments()).get(0).offsetIndex().maxEntries(),
+ "The index of segment 1 should be resized to maxIndexSize");
+ assertEquals(log.config().maxIndexSize / 12, new
ArrayList<>(log.logSegments()).get(0).timeIndex().maxEntries(),
+ "The time index of segment 1 should be resized to maxIndexSize");
+
+ mockTime.sleep(msgPerSeg);
+ for (int i = 1; i <= msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds() + i), 0);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+ }
+
+ /**
+ * Test that deleted files are deleted after the appropriate time.
+ */
+ @Test
+ public void testAsyncDelete() throws IOException {
+ MemoryRecords createRecords = singletonRecords("test".getBytes(),
mockTime.milliseconds() - 1000L);
+ int asyncDeleteMs = 1000;
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(createRecords.sizeInBytes() * 5)
+ .segmentIndexBytes(1000)
+ .indexIntervalBytes(10000)
+ .retentionMs(999)
+ .fileDeleteDelayMs(asyncDeleteMs)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 100; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds() - 1000L), 0);
+
+ // files should be renamed
+ List<LogSegment> segments = new ArrayList<>(log.logSegments());
+ List<File> oldFiles = new ArrayList<>();
+ for (LogSegment segment : segments) {
+ oldFiles.add(segment.log().file());
+ oldFiles.add(segment.offsetIndexFile());
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+
+ assertEquals(1, log.numberOfSegments(), "Only one segment should
remain.");
+ assertTrue(segments.stream().allMatch(s ->
s.log().file().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) &&
+ segments.stream().allMatch(s ->
s.offsetIndexFile().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)),
+ "All log and index files should end in .deleted");
+ assertTrue(segments.stream().allMatch(s -> s.log().file().exists()) &&
+ segments.stream().allMatch(s -> s.offsetIndexFile().exists()),
+ "The .deleted files should still be there.");
+ assertTrue(oldFiles.stream().noneMatch(File::exists), "The original
file should be gone.");
+
+ // when enough time passes the files should be deleted
+ List<File> deletedFiles = new ArrayList<>();
+ for (LogSegment segment : segments) {
+ deletedFiles.add(segment.log().file());
+ deletedFiles.add(segment.offsetIndexFile());
+ }
+ mockTime.sleep(asyncDeleteMs + 1);
+ assertTrue(deletedFiles.stream().noneMatch(File::exists), "Files
should all be gone.");
+ }
+
+ @Test
+ public void testAppendMessageWithNullPayload() throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(null, null)), 0);
+ Record head = LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next();
+ assertEquals(0, head.offset());
+ assertFalse(head.hasValue(), "Message payload should be null.");
+ }
+
+ @Test
+ public void testAppendWithOutOfOrderOffsetsThrowsException() throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+ int epoch = 0;
+ long[] appendOffsets = {0L, 1L, 3L, 2L, 4L};
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+ for (long offset : appendOffsets) {
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), 1L, (short) 0, 0, false, epoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+ }
+ buffer.flip();
+ MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+ assertThrows(OffsetsOutOfOrderException.class, () -> log.appendAsFollower(memoryRecords, epoch));
+ }
+
+ private static Stream<Arguments> magicAndCompressionTypes() {
+ return Stream.of(
+ RecordBatch.MAGIC_VALUE_V0,
+ RecordBatch.MAGIC_VALUE_V1,
+ RecordBatch.MAGIC_VALUE_V2
+ ).flatMap(magic -> Stream.of(CompressionType.NONE, CompressionType.LZ4)
+ .map(compressionType -> Arguments.of(magic, compressionType)));
+ }
+
+ @ParameterizedTest(name = "magic={0}, compressionType={1}")
+ @MethodSource("magicAndCompressionTypes")
+ public void testAppendBelowExpectedOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ for (int id = 0; id < 2; id++)
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(Integer.toString(id).getBytes())), 0);
+
+ Compression compression = Compression.of(compressionType).build();
+ MemoryRecords invalidRecord = MemoryRecords.withRecords(magic, compression,
+ new SimpleRecord(Integer.toString(1).getBytes()));
+ assertThrows(
+ UnexpectedAppendOffsetException.class,
+ () -> log.appendAsFollower(invalidRecord, Integer.MAX_VALUE)
+ );
+ }
+
+ @ParameterizedTest(name = "magic={0}, compressionType={1}")
+ @MethodSource("magicAndCompressionTypes")
+ public void testAppendEmptyLogBelowLogStartOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+ createEmptyLogs(logDir, 7);
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ assertEquals(7L, log.logStartOffset());
+ assertEquals(7L, log.logEndOffset());
+
+ long firstOffset = 4L;
+ MemoryRecords batch = LogTestUtils.records(
+ List.of(new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ new SimpleRecord("k2".getBytes(), "v2".getBytes()),
+ new SimpleRecord("k3".getBytes(), "v3".getBytes())),
+ magic, Compression.of(compressionType).build(),
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ firstOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ UnexpectedAppendOffsetException exception = assertThrows(
+ UnexpectedAppendOffsetException.class,
+ () -> log.appendAsFollower(batch, Integer.MAX_VALUE)
+ );
+ assertEquals(firstOffset, exception.firstOffset,
+ "UnexpectedAppendOffsetException#firstOffset");
+ assertEquals(firstOffset + 2, exception.lastOffset,
+ "UnexpectedAppendOffsetException#lastOffset");
+ }
+
+ @Test
+ public void testAppendWithNoTimestamp() throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes(),
"value".getBytes())), 0);
+ }
+
+ @Test
+ public void testAppendToOrReadFromLogInFailedLogDir() throws IOException {
+ long pid = 1L;
+ short epoch = 0;
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0);
+ assertEquals(0, LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
+ Consumer<Integer> append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+ append.accept(10);
+ // Kind of a hack, but renaming the index to a directory ensures that the append
+ // to the index will fail.
+ log.activeSegment().txnIndex().renameTo(log.dir());
+ assertThrows(KafkaStorageException.class, () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT,
+ mockTime.milliseconds(), 1, 0, TransactionVersion.TV_0.featureLevel()));
+ assertThrows(KafkaStorageException.class, () -> log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0));
+ assertThrows(KafkaStorageException.class, () -> LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
+ }
+
+ @Test
+ public void testWriteLeaderEpochCheckpointAfterDirectoryRename() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1000)
+ .indexIntervalBytes(1)
+ .maxMessageBytes(64 * 1024)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+ assertEquals(Optional.of(5), log.latestEpoch());
+
+ // Ensure that after a directory rename, the epoch cache is written to the right location
+ TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
+ assertEquals(Optional.of(10), log.latestEpoch());
+ assertTrue(LeaderEpochCheckpointFile.newFile(log.dir()).exists());
+ assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists());
+ }
+
+ @Test
+ public void testTopicIdTransfersAfterDirectoryRename() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1000)
+ .indexIntervalBytes(1)
+ .maxMessageBytes(64 * 1024)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+ Uuid topicId = Uuid.randomUuid();
+ log.assignTopicId(topicId);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+ assertEquals(Optional.of(5), log.latestEpoch());
+
+ // Ensure that after a directory rename, the partition metadata file is written to the right location.
+ TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
+ assertEquals(Optional.of(10), log.latestEpoch());
+ assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+ assertFalse(PartitionMetadataFile.newFile(this.logDir).exists());
+
+ // Check the topic ID remains in memory and was copied correctly.
+ assertTrue(log.topicId().isPresent());
+ assertEquals(topicId, log.topicId().get());
+ assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
+ }
+
+ @Test
+ public void testTopicIdFlushesBeforeDirectoryRename() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1000)
+ .indexIntervalBytes(1)
+ .maxMessageBytes(64 * 1024)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+ Uuid topicId = Uuid.randomUuid();
+ log.partitionMetadataFile().get().record(topicId);
+
+ // Ensure that after a directory rename, the partition metadata file is written to the right location.
+ TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
+ log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
+ assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+ assertFalse(PartitionMetadataFile.newFile(this.logDir).exists());
+
+ // Check the file holds the correct contents.
+ assertTrue(log.partitionMetadataFile().get().exists());
+ assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
+ }
+
+ @Test
+ public void testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1000)
+ .indexIntervalBytes(1)
+ .maxMessageBytes(64 * 1024)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+ assertEquals(Optional.of(5), log.leaderEpochCache().latestEpoch());
+
+ log.appendAsFollower(
+ LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())),
RecordVersion.V1.value, 1L),
+ 5
+ );
+ assertEquals(Optional.empty(), log.leaderEpochCache().latestEpoch());
+ }
+
+ @Test
+ public void testLeaderEpochCacheCreatedAfterMessageFormatUpgrade() throws IOException {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000");
+ logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+ logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536");
+ LogConfig logConfig = new LogConfig(logProps);
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeaderWithRecordVersion(LogTestUtils.records(List.of(new SimpleRecord("bar".getBytes())),
+ RecordBatch.MAGIC_VALUE_V1, 0L), 5, RecordVersion.V1);
+ assertTrue(log.latestEpoch().isEmpty());
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+ assertEquals(5, log.latestEpoch().get());
+ }
+
+ @Test
+ public void testSplitOnOffsetOverflow() throws IOException {
+ // create a log such that one log segment has offsets that overflow, and call the split API on that segment
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .indexIntervalBytes(1)
+ .fileDeleteDelayMs(1000)
+ .build();
+ LogSegment segmentWithOverflow = createLogWithOverflow(logConfig);
+ assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment
must have offset overflow");
+
+ List<Record> allRecordsBeforeSplit = allRecords(log);
+
+ // split the segment with overflow
+ log.splitOverflowedSegment(segmentWithOverflow);
+
+ // assert we were successfully able to split the segment
+ assertEquals(4, log.numberOfSegments());
+ verifyRecordsInLog(log, allRecordsBeforeSplit);
+
+ // verify we do not have offset overflow anymore
+ assertFalse(LogTestUtils.hasOffsetOverflow(log));
+ }
+
+ @Test
+ public void testDegenerateSegmentSplit() throws IOException {
+ // This tests a scenario where all of the batches appended to a segment have overflowed.
+ // When we split the overflowed segment, only one new segment will be created.
+
+ long overflowOffset = (long) Integer.MAX_VALUE + 1;
+ MemoryRecords batch1 = MemoryRecords.withRecords(overflowOffset, Compression.NONE, 0,
+ new SimpleRecord("a".getBytes()));
+ MemoryRecords batch2 = MemoryRecords.withRecords(overflowOffset + 1, Compression.NONE, 0,
+ new SimpleRecord("b".getBytes()));
+
+ testDegenerateSplitSegmentWithOverflow(0L, List.of(batch1, batch2));
+ }
+
+ @Test
+ public void testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset() throws IOException {
+ // Degenerate case where the only batch in the segment overflows. In this scenario,
+ // the first offset of the batch is valid, but the last overflows.
+
+ long firstBatchBaseOffset = (long) Integer.MAX_VALUE - 1;
+ MemoryRecords records = MemoryRecords.withRecords(firstBatchBaseOffset, Compression.NONE, 0,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes()),
+ new SimpleRecord("c".getBytes()));
+
+ testDegenerateSplitSegmentWithOverflow(0L, List.of(records));
+ }
+
+ @Test
+ public void testReadCommittedWithConcurrentHighWatermarkUpdates() throws Exception {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024 * 1024 * 5)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long lastOffset = 50L;
+
+ short producerEpoch = 0;
+ long producerId = 15L;
+ Consumer<Integer> appendProducer = LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch, mockTime);
+
+ // Thread 1 writes single-record transactions and attempts to read them
+ // before they have been aborted, and then aborts them
+ Callable<Integer> txnWriteAndReadLoop = () -> {
+ int nonEmptyReads = 0;
+ while (log.logEndOffset() < lastOffset) {
+ long currentLogEndOffset = log.logEndOffset();
+
+ appendProducer.accept(1);
+
+ FetchDataInfo readInfo = log.read(currentLogEndOffset, Integer.MAX_VALUE, FetchIsolation.TXN_COMMITTED, false);
+
+ if (readInfo.records.sizeInBytes() > 0)
+ nonEmptyReads += 1;
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, producerEpoch, ControlRecordType.ABORT,
+ mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel());
+ }
+ return nonEmptyReads;
+ };
+
+ // Thread 2 watches the log and updates the high watermark
+ Runnable hwUpdateLoop = () -> assertDoesNotThrow(() -> {
+ while (log.logEndOffset() < lastOffset) {
Review Comment:
I think using a busy loop increases the likelihood of triggering race
conditions.
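
For context, the diff rendering above is cut off right at the top of `hwUpdateLoop`. The loop presumably has roughly the shape sketched below; this is inferred from the surrounding test (both `log.logEndOffset()` and `log.updateHighWatermark(...)` appear elsewhere in the diff), not the PR's exact body. The point about the busy loop is that thread 2 keeps advancing the high watermark as fast as possible while thread 1 reads, which maximizes the interleavings that can expose a race.

    // Sketch only (assumption about the truncated code): thread 2 busily moves the
    // high watermark up to the current log end offset, with no sleep or backoff.
    Runnable hwUpdateLoop = () -> assertDoesNotThrow(() -> {
        while (log.logEndOffset() < lastOffset) {
            log.updateHighWatermark(log.logEndOffset());
        }
    });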
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]