TaiJuWu commented on code in PR #21761:
URL: https://github.com/apache/kafka/pull/21761#discussion_r2960109793


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -3407,6 +3422,887 @@ public void 
testFetchLatestTieredTimestampWithRemoteStorage() throws Exception {
                 
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Optional.of(remoteLogManager)));
     }
 
    @Test
    public void testFetchEarliestPendingUploadTimestampNoRemoteStorage() throws IOException {
        // Without a RemoteLogManager there is no tiered storage to consult, so an
        // EARLIEST_PENDING_UPLOAD_TIMESTAMP lookup must always return the "unknown"
        // sentinel (timestamp = UNKNOWN_TIMESTAMP, offset = -1, epoch = -1) — both on
        // an empty log and after records have been appended.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(200)
                .indexIntervalBytes(1)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);

        // Test initial state before any records
        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);

        // Append records
        prepareLogWithSequentialRecords(log, 2);

        // Test state after records are appended: still unknown because no remote
        // log manager is supplied (Optional.empty()).
        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
    }
+
    @Test
    public void testFetchEarliestPendingUploadTimestampWithRemoteStorage() throws Exception {
        // Scenario: offset 0 has been uploaded to remote storage and deleted locally;
        // offsets 1.. remain local only. The earliest pending-upload offset is then 1,
        // i.e. the first offset after the highest offset in remote storage.
        int logStartOffset = 0;
        prepare(logStartOffset);

        long firstTimestamp = timestampAndEpochs.get(0).timestamp;
        int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
        long secondTimestamp = timestampAndEpochs.get(1).timestamp;
        int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
        int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;

        // Mocked remote lookup: only the first timestamp resolves (to offset 0);
        // every other timestamp falls through to Optional.empty().
        doAnswer(ans -> {
            long timestamp = ans.getArgument(1);
            if (timestamp == firstTimestamp) {
                return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
            }
            return Optional.empty();
        }).when(remoteLogManager).findOffsetByTimestamp(
                eq(log.topicPartition()),
                anyLong(),
                anyLong(),
                eq(log.leaderEpochCache()));

        // Offset 0 (first timestamp) is in remote storage and deleted locally. Offset 1 (second timestamp) is in local storage.
        log.updateLocalLogStartOffset(1);
        log.updateHighestOffsetInRemoteStorage(0);

        // In the assertions below we test that offset 0 (first timestamp) is only in remote and offset 1 (second timestamp) is in local storage.
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIMESTAMP);
        // Earliest pending upload = highest offset in remote storage (0) + 1.
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
    }
+
    @Test
    public void testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion() throws Exception {
        // Scenario: offsets 0 and 1 are tiered AND still present locally (no local
        // deletion). The earliest pending-upload offset is therefore 2 — the first
        // offset past the highest offset in remote storage.
        int logStartOffset = 0;
        prepare(logStartOffset);

        long firstTimestamp = timestampAndEpochs.get(0).timestamp;
        int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
        long secondTimestamp = timestampAndEpochs.get(1).timestamp;
        int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
        int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;

        // Offsets up to 1 are in remote storage: the mocked lookup resolves the first
        // two timestamps to offsets 0 and 1 respectively, anything else to empty.
        doAnswer(ans -> {
            long timestamp = ans.getArgument(1);
            if (timestamp == firstTimestamp) {
                return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
            } else if (timestamp == secondTimestamp) {
                return Optional.of(new FileRecords.TimestampAndOffset(timestamp, 1L, Optional.of(secondLeaderEpoch)));
            }
            return Optional.empty();
        }).when(remoteLogManager).findOffsetByTimestamp(
                eq(log.topicPartition()),
                anyLong(),
                anyLong(),
                eq(log.leaderEpochCache()));

        // Offsets 0, 1 (first and second timestamps) are in remote storage and not deleted locally.
        log.updateLocalLogStartOffset(0);
        log.updateHighestOffsetInRemoteStorage(1);

        // In the assertions below we test that offset 0 (first timestamp) and offset 1 (second timestamp) are on both remote and local storage
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIMESTAMP);
        // Earliest pending upload = highest offset in remote storage (1) + 1 = 2.
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(thirdLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
    }
+
    @Test
    public void testFetchEarliestPendingUploadTimestampNoSegmentsUploaded() throws Exception {
        // Scenario: remote storage is enabled but nothing has been uploaded yet
        // (highest remote offset = -1). The earliest pending-upload offset must then
        // be the log start offset (0), and LATEST_TIERED resolves to the unknown sentinel.
        int logStartOffset = 0;
        prepare(logStartOffset);

        long firstTimestamp = timestampAndEpochs.get(0).timestamp;
        int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
        long secondTimestamp = timestampAndEpochs.get(1).timestamp;
        int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
        int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;

        // No offsets are in remote storage
        doAnswer(ans -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
                eq(log.topicPartition()),
                anyLong(),
                anyLong(),
                eq(log.leaderEpochCache()));

        // Offsets 0, 1, 2 (first, second and third timestamps) are in local storage only and not uploaded to remote storage.
        log.updateLocalLogStartOffset(0);
        log.updateHighestOffsetInRemoteStorage(-1);

        // In the assertions below we test that offset 0 (first timestamp), offset 1 (second timestamp) and offset 2 (third timestamp) are only on the local storage.
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp);
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_TIMESTAMP);
        // Nothing is tiered, so the latest tiered offset is the -1/-1 sentinel.
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIMESTAMP);
        // Earliest pending upload = first local offset, since nothing was uploaded yet.
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
    }
+
    @Test
    public void testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote() throws Exception {
        // Scenario: the recorded highest offset in remote storage (50) is stale — it is
        // below the local log start offset (100). The earliest pending-upload lookup
        // must not trust the stale value and instead reports the local log start offset.
        int logStartOffset = 100;
        prepare(logStartOffset);

        long firstTimestamp = timestampAndEpochs.get(0).timestamp;
        int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
        long secondTimestamp = timestampAndEpochs.get(1).timestamp;
        int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
        int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;

        // Offsets 100, 101, 102 (first, second and third timestamps) are in local storage and not uploaded to remote storage.
        // Tiered storage copy was disabled and then enabled again, because of which the remote log segments are deleted but
        // the highest offset in remote storage has become stale
        doAnswer(ans -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
                eq(log.topicPartition()),
                anyLong(),
                anyLong(),
                eq(log.leaderEpochCache()));

        log.updateLocalLogStartOffset(100);
        log.updateHighestOffsetInRemoteStorage(50);

        // In the assertions below we test that offset 100 (first timestamp), offset 101 (second timestamp) and offset 102 (third timestamp) are only on the local storage.
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))), firstTimestamp);
        assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
            Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))), secondTimestamp);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_TIMESTAMP);
        // The stale value 50 is still surfaced for LATEST_TIERED (with no epoch).
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()),
            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(thirdLeaderEpoch)),
            ListOffsetsRequest.LATEST_TIMESTAMP);
        // Earliest pending upload clamps to the local log start offset (100), not 51.
        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
    }
+
+    /**
+     * Test the Log truncate operations
+     */
+    @Test
+    public void testTruncateTo() throws IOException {
+        MemoryRecords createRecords = singletonRecords("test".getBytes(), 
mockTime.milliseconds());
+        int setSize = createRecords.sizeInBytes();
+        int msgPerSeg = 10;
+        int segmentSize = msgPerSeg * setSize; // each segment will be 10 
messages
+
+        // create a log
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+
+        for (int i = 0; i < msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds()), 0);
+
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segments.");
+        assertEquals(msgPerSeg, log.logEndOffset(), "Log end offset should be 
equal to number of messages");
+
+        long lastOffset = log.logEndOffset();
+        long size = log.size();
+        log.truncateTo(log.logEndOffset()); // keep the entire log
+        assertEquals(lastOffset, log.logEndOffset(), "Should not change 
offset");
+        assertEquals(size, log.size(), "Should not change log size");
+        log.truncateTo(log.logEndOffset() + 1); // try to truncate beyond 
lastOffset
+        assertEquals(lastOffset, log.logEndOffset(), "Should not change offset 
but should log error");
+        assertEquals(size, log.size(), "Should not change log size");
+        log.truncateTo(msgPerSeg / 2); // truncate somewhere in between
+        assertEquals(msgPerSeg / 2, log.logEndOffset(), "Should change 
offset");
+        assertTrue(log.size() < size, "Should change log size");
+        log.truncateTo(0); // truncate the entire log
+        assertEquals(0, log.logEndOffset(), "Should change offset");
+        assertEquals(0, log.size(), "Should change log size");
+
+        for (int i = 0; i < msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds()), 0);
+
+        assertEquals(lastOffset, log.logEndOffset(), "Should be back to 
original offset");
+        assertEquals(size, log.size(), "Should be back to original size");
+        log.truncateFullyAndStartAt(log.logEndOffset() - (msgPerSeg - 1), 
Optional.empty());
+        assertEquals(lastOffset - (msgPerSeg - 1), log.logEndOffset(), "Should 
change offset");
+        assertEquals(0, log.size(), "Should change log size");
+
+        for (int i = 0; i < msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds()), 0);
+
+        assertTrue(log.logEndOffset() > msgPerSeg, "Should be ahead of to 
original offset");
+        assertEquals(size, log.size(), "log size should be same as before");
+        log.truncateTo(0); // truncate before first start offset in the log
+        assertEquals(0, log.logEndOffset(), "Should change offset");
+        assertEquals(0, log.size(), "Should change log size");
+    }
+
+    /**
+     * Verify that when we truncate a log the index of the last segment is 
resized to the max index size to allow more appends
+     */
+    @Test
+    public void testIndexResizingAtTruncation() throws IOException {
+        int setSize = singletonRecords("test".getBytes(), 
mockTime.milliseconds()).sizeInBytes();
+        int msgPerSeg = 10;
+        int segmentSize = msgPerSeg * setSize; // each segment will be 10 
messages
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(segmentSize)
+                .indexIntervalBytes(setSize - 1)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+
+        for (int i = 1; i <= msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds() + i), 0);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+
+        mockTime.sleep(msgPerSeg);
+        for (int i = 1; i <= msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds() + i), 0);
+        assertEquals(2, log.numberOfSegments(), "There should be exactly 2 
segment.");
+        int expectedEntries = msgPerSeg - 1;
+
+        List<LogSegment> segments = new ArrayList<>(log.logSegments());
+        assertEquals(expectedEntries, 
segments.get(0).offsetIndex().maxEntries(),
+            "The index of the first segment should have " + expectedEntries + 
" entries");
+        assertEquals(expectedEntries, segments.get(0).timeIndex().maxEntries(),
+            "The time index of the first segment should have " + 
expectedEntries + " entries");
+
+        log.truncateTo(0);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+        assertEquals(log.config().maxIndexSize / 8, new 
ArrayList<>(log.logSegments()).get(0).offsetIndex().maxEntries(),
+            "The index of segment 1 should be resized to maxIndexSize");
+        assertEquals(log.config().maxIndexSize / 12, new 
ArrayList<>(log.logSegments()).get(0).timeIndex().maxEntries(),
+            "The time index of segment 1 should be resized to maxIndexSize");
+
+        mockTime.sleep(msgPerSeg);
+        for (int i = 1; i <= msgPerSeg; i++)
+            log.appendAsLeader(singletonRecords("test".getBytes(), 
mockTime.milliseconds() + i), 0);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+    }
+
    /**
     * Test that deleted files are deleted after the appropriate time.
     * Deletion is two-phase: segments are first renamed with a {@code .deleted} suffix,
     * then physically removed once {@code file.delete.delay.ms} has elapsed.
     */
    @Test
    public void testAsyncDelete() throws IOException {
        MemoryRecords createRecords = singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L);
        int asyncDeleteMs = 1000;
        // retention.ms (999) is shorter than the records' age (1000ms), so every sealed
        // segment is immediately eligible for retention-based deletion.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(createRecords.sizeInBytes() * 5)
                .segmentIndexBytes(1000)
                .indexIntervalBytes(10000)
                .retentionMs(999)
                .fileDeleteDelayMs(asyncDeleteMs)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);

        // append some messages to create some segments
        for (int i = 0; i < 100; i++)
            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L), 0);

        // files should be renamed; capture the pre-rename paths so we can verify
        // the originals disappear after deleteOldSegments().
        List<LogSegment> segments = new ArrayList<>(log.logSegments());
        List<File> oldFiles = new ArrayList<>();
        for (LogSegment segment : segments) {
            oldFiles.add(segment.log().file());
            oldFiles.add(segment.offsetIndexFile());
        }

        log.updateHighWatermark(log.logEndOffset());
        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");

        assertEquals(1, log.numberOfSegments(), "Only one segment should remain.");
        // Phase 1: files are renamed with the .deleted suffix but still exist on disk.
        assertTrue(segments.stream().allMatch(s -> s.log().file().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) &&
            segments.stream().allMatch(s -> s.offsetIndexFile().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)),
            "All log and index files should end in .deleted");
        assertTrue(segments.stream().allMatch(s -> s.log().file().exists()) &&
            segments.stream().allMatch(s -> s.offsetIndexFile().exists()),
            "The .deleted files should still be there.");
        assertTrue(oldFiles.stream().noneMatch(File::exists), "The original file should be gone.");

        // when enough time passes the files should be deleted (phase 2);
        // capture the post-rename paths before advancing the clock.
        List<File> deletedFiles = new ArrayList<>();
        for (LogSegment segment : segments) {
            deletedFiles.add(segment.log().file());
            deletedFiles.add(segment.offsetIndexFile());
        }
        mockTime.sleep(asyncDeleteMs + 1);
        assertTrue(deletedFiles.stream().noneMatch(File::exists), "Files should all be gone.");
    }
+
+    @Test
+    public void testAppendMessageWithNullPayload() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord(null, null)), 0);
+        Record head = LogTestUtils.readLog(log, 0, 
4096).records.records().iterator().next();
+        assertEquals(0, head.offset());
+        assertFalse(head.hasValue(), "Message payload should be null.");
+    }
+
+    @Test
+    public void testAppendWithOutOfOrderOffsetsThrowsException() throws 
IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        int epoch = 0;
+        long[] appendOffsets = {0L, 1L, 3L, 2L, 4L};
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        for (long offset : appendOffsets) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, offset, 
mockTime.milliseconds(), 1L, (short) 0, 0, false, epoch);
+            builder.append(new SimpleRecord("key".getBytes(), 
"value".getBytes()));
+            builder.close();
+        }
+        buffer.flip();
+        MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+        assertThrows(OffsetsOutOfOrderException.class, () -> 
log.appendAsFollower(memoryRecords, epoch));
+    }
+
+    private static Stream<Arguments> magicAndCompressionTypes() {
+        return Stream.of(
+            RecordBatch.MAGIC_VALUE_V0,
+            RecordBatch.MAGIC_VALUE_V1,
+            RecordBatch.MAGIC_VALUE_V2
+        ).flatMap(magic -> Stream.of(CompressionType.NONE, CompressionType.LZ4)
+            .map(compressionType -> Arguments.of(magic, compressionType)));
+    }
+
+    @ParameterizedTest(name = "magic={0}, compressionType={1}")
+    @MethodSource("magicAndCompressionTypes")
+    public void testAppendBelowExpectedOffsetThrowsException(byte magic, 
CompressionType compressionType) throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        for (int id = 0; id < 2; id++)
+            log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord(Integer.toString(id).getBytes())), 0);
+
+        Compression compression = Compression.of(compressionType).build();
+        MemoryRecords invalidRecord = MemoryRecords.withRecords(magic, 
compression,
+            new SimpleRecord(Integer.toString(1).getBytes()));
+        assertThrows(
+            UnexpectedAppendOffsetException.class,
+            () -> log.appendAsFollower(invalidRecord, Integer.MAX_VALUE)
+        );
+    }
+
    @ParameterizedTest(name = "magic={0}, compressionType={1}")
    @MethodSource("magicAndCompressionTypes")
    public void testAppendEmptyLogBelowLogStartOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
        // Start with an empty log whose start/end offsets are both 7.
        createEmptyLogs(logDir, 7);
        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
        assertEquals(7L, log.logStartOffset());
        assertEquals(7L, log.logEndOffset());

        // A follower batch starting at offset 4 (below the log start offset of 7)
        // must be rejected, and the exception must carry the batch's offset range.
        long firstOffset = 4L;
        MemoryRecords batch = LogTestUtils.records(
            List.of(new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                    new SimpleRecord("k2".getBytes(), "v2".getBytes()),
                    new SimpleRecord("k3".getBytes(), "v3".getBytes())),
            magic, Compression.of(compressionType).build(),
            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
            firstOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
        UnexpectedAppendOffsetException exception = assertThrows(
            UnexpectedAppendOffsetException.class,
            () -> log.appendAsFollower(batch, Integer.MAX_VALUE)
        );
        assertEquals(firstOffset, exception.firstOffset,
            "UnexpectedAppendOffsetException#firstOffset");
        // Three records starting at 4 end at offset 6 = firstOffset + 2.
        assertEquals(firstOffset + 2, exception.lastOffset,
            "UnexpectedAppendOffsetException#lastOffset");
    }
+
+    @Test
+    public void testAppendWithNoTimestamp() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes(), 
"value".getBytes())), 0);
+    }
+
    @Test
    public void testAppendToOrReadFromLogInFailedLogDir() throws IOException {
        // Purpose: once the underlying log dir fails, appends and reads must surface
        // KafkaStorageException rather than succeeding or corrupting state.
        long pid = 1L;
        short epoch = 0;
        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
        log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0);
        assertEquals(0, LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
        Consumer<Integer> append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
        append.accept(10);
        // Kind of a hack, but renaming the index to a directory ensures that the append
        // to the index will fail.
        log.activeSegment().txnIndex().renameTo(log.dir());
        // The failed txn index makes the end-txn-marker append fail ...
        assertThrows(KafkaStorageException.class, () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT,
            mockTime.milliseconds(), 1, 0, TransactionVersion.TV_0.featureLevel()));
        // ... and subsequent appends and reads on the now-failed log dir fail too.
        assertThrows(KafkaStorageException.class, () -> log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0));
        assertThrows(KafkaStorageException.class, () -> LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
    }
+
    @Test
    public void testWriteLeaderEpochCheckpointAfterDirectoryRename() throws IOException {
        // Purpose: after the log directory is renamed (e.g. marked for deletion), the
        // leader epoch checkpoint must follow the log to the new directory and no
        // stale checkpoint may remain at the old path.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(1000)
                .indexIntervalBytes(1)
                .maxMessageBytes(64 * 1024)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);
        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
        assertEquals(Optional.of(5), log.latestEpoch());

        // Ensure that after a directory rename, the epoch cache is written to the right location
        TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
        log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
        assertEquals(Optional.of(10), log.latestEpoch());
        assertTrue(LeaderEpochCheckpointFile.newFile(log.dir()).exists());
        assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists());
    }
+
    @Test
    public void testTopicIdTransfersAfterDirectoryRename() throws IOException {
        // Purpose: a topic ID assigned to the log must survive a directory rename —
        // both in memory and in the partition metadata file at the new location.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(1000)
                .indexIntervalBytes(1)
                .maxMessageBytes(64 * 1024)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);

        // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
        Uuid topicId = Uuid.randomUuid();
        log.assignTopicId(topicId);

        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
        assertEquals(Optional.of(5), log.latestEpoch());

        // Ensure that after a directory rename, the partition metadata file is written to the right location.
        TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
        log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
        assertEquals(Optional.of(10), log.latestEpoch());
        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
        assertFalse(PartitionMetadataFile.newFile(this.logDir).exists());

        // Check the topic ID remains in memory and was copied correctly.
        assertTrue(log.topicId().isPresent());
        assertEquals(topicId, log.topicId().get());
        assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
    }
+
    @Test
    public void testTopicIdFlushesBeforeDirectoryRename() throws IOException {
        // Purpose: a topic ID that was only recorded (not yet flushed) must be flushed
        // to disk before the directory rename so it is not lost in the move.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(1000)
                .indexIntervalBytes(1)
                .maxMessageBytes(64 * 1024)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);

        // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
        Uuid topicId = Uuid.randomUuid();
        log.partitionMetadataFile().get().record(topicId);

        // Ensure that after a directory rename, the partition metadata file is written to the right location.
        TopicPartition tp = UnifiedLog.parseTopicPartitionName(log.dir());
        log.renameDir(UnifiedLog.logDeleteDirName(tp), true);
        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
        assertFalse(PartitionMetadataFile.newFile(this.logDir).exists());

        // Check the file holds the correct contents.
        assertTrue(log.partitionMetadataFile().get().exists());
        assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
    }
+
    @Test
    public void testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages() throws IOException {
        // Purpose: appending a batch with a pre-epoch record version (V1) as follower
        // must clear the leader epoch cache, since V1 records carry no epoch info.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
                .segmentBytes(1000)
                .indexIntervalBytes(1)
                .maxMessageBytes(64 * 1024)
                .build();
        UnifiedLog log = createLog(logDir, logConfig);
        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
        assertEquals(Optional.of(5), log.leaderEpochCache().latestEpoch());

        log.appendAsFollower(
            LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())), RecordVersion.V1.value, 1L),
            5
        );
        // The V1 (downgraded) batch wiped the cache.
        assertEquals(Optional.empty(), log.leaderEpochCache().latestEpoch());
    }
+
+    @Test
+    public void testLeaderEpochCacheCreatedAfterMessageFormatUpgrade() throws 
IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000");
+        logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+        logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536");
+        LogConfig logConfig = new LogConfig(logProps);
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeaderWithRecordVersion(LogTestUtils.records(List.of(new 
SimpleRecord("bar".getBytes())),
+            RecordBatch.MAGIC_VALUE_V1, 0L), 5, RecordVersion.V1);
+        assertTrue(log.latestEpoch().isEmpty());
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("foo".getBytes()))), 5);
+        assertEquals(5, log.latestEpoch().get());
+    }
+
+    @Test
+    public void testSplitOnOffsetOverflow() throws IOException {
+        // create a log such that one log segment has offsets that overflow, 
and call the split API on that segment
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .indexIntervalBytes(1)
+                .fileDeleteDelayMs(1000)
+                .build();
+        LogSegment segmentWithOverflow = createLogWithOverflow(logConfig);
+        assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment 
must have offset overflow");
+
+        List<Record> allRecordsBeforeSplit = allRecords(log);
+
+        // split the segment with overflow
+        log.splitOverflowedSegment(segmentWithOverflow);
+
+        // assert we were successfully able to split the segment
+        assertEquals(4, log.numberOfSegments());
+        verifyRecordsInLog(log, allRecordsBeforeSplit);
+
+        // verify we do not have offset overflow anymore
+        assertFalse(LogTestUtils.hasOffsetOverflow(log));
+    }
+
+    @Test
+    public void testDegenerateSegmentSplit() throws IOException {
+        // This tests a scenario where all of the batches appended to a 
segment have overflowed.
+        // When we split the overflowed segment, only one new segment will be 
created.
+
+        long overflowOffset = (long) Integer.MAX_VALUE + 1;
+        MemoryRecords batch1 = MemoryRecords.withRecords(overflowOffset, 
Compression.NONE, 0,
+            new SimpleRecord("a".getBytes()));
+        MemoryRecords batch2 = MemoryRecords.withRecords(overflowOffset + 1, 
Compression.NONE, 0,
+            new SimpleRecord("b".getBytes()));
+
+        testDegenerateSplitSegmentWithOverflow(0L, List.of(batch1, batch2));
+    }
+
+    @Test
+    public void testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset() 
throws IOException {
+        // Degenerate case where the only batch in the segment overflows. In 
this scenario,
+        // the first offset of the batch is valid, but the last overflows.
+
+        long firstBatchBaseOffset = (long) Integer.MAX_VALUE - 1;
+        MemoryRecords records = 
MemoryRecords.withRecords(firstBatchBaseOffset, Compression.NONE, 0,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes()));
+
+        testDegenerateSplitSegmentWithOverflow(0L, List.of(records));
+    }
+
+    @Test
+    public void testReadCommittedWithConcurrentHighWatermarkUpdates() throws 
Exception {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024 * 1024 * 5)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long lastOffset = 50L;
+
+        short producerEpoch = 0;
+        long producerId = 15L;
+        Consumer<Integer> appendProducer = 
LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch, 
mockTime);
+
+        // Thread 1 writes single-record transactions and attempts to read them
+        // before they have been aborted, and then aborts them
+        Callable<Integer> txnWriteAndReadLoop = () -> {
+            int nonEmptyReads = 0;
+            while (log.logEndOffset() < lastOffset) {
+                long currentLogEndOffset = log.logEndOffset();
+
+                appendProducer.accept(1);
+
+                FetchDataInfo readInfo = log.read(currentLogEndOffset, 
Integer.MAX_VALUE, FetchIsolation.TXN_COMMITTED, false);
+
+                if (readInfo.records.sizeInBytes() > 0)
+                    nonEmptyReads += 1;
+
+                LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, 
producerEpoch, ControlRecordType.ABORT,
+                    mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+            }
+            return nonEmptyReads;
+        };
+
+        // Thread 2 watches the log and updates the high watermark
+        Runnable hwUpdateLoop = () -> assertDoesNotThrow(() -> {
+            while (log.logEndOffset() < lastOffset) {
+                log.updateHighWatermark(log.logEndOffset());
+            }
+        });
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            executor.submit(hwUpdateLoop);
+
+            Future<Integer> future = executor.submit(txnWriteAndReadLoop);
+            int nonEmptyReads = future.get();
+
+            assertEquals(0, nonEmptyReads);
+        } finally {
+            executor.shutdownNow();

Review Comment:
   add `assertDoesNotThrow(() -> executor.awaitTermination(60, TimeUnit.SECONDS));` after `shutdownNow()` to ensure cleanup completes before the test returns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to