chia7712 commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2484861108


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -16,22 +16,719 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class UnifiedLogTest {
 
     private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
+    private final MockTime mockTime = new MockTime();
+    private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
+    private final ProducerStateManagerConfig producerStateManagerConfig = new 
ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
+    private final List<UnifiedLog> logsToClose = new ArrayList<>();
+
+    private UnifiedLog log;
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        brokerTopicStats.close();
+        for (UnifiedLog log : logsToClose) {
+            log.close();
+        }
+        Utils.delete(tmpDir);
+    }
 
     @Test
     public void testOffsetFromProducerSnapshotFile() {
         long offset = 23423423L;
         File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
         assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
     }
+
+    @Test
+    public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException 
{
+        SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        // Given this partition is on leader epoch 72
+        int epoch = 72;
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new 
Properties()))) {
+            log.assignEpochStartOffset(epoch, records.length);
+
+            // When appending messages as a leader (i.e. assignOffsets = true)
+            for (SimpleRecord record : records) {
+                log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 
record), epoch);
+            }
+
+            // Then leader epoch should be set on messages
+            for (int i = 0; i < records.length; i++) {
+                FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END, 
true);
+                RecordBatch batch = read.records.batches().iterator().next();
+                assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have 
set leader epoch");
+            }
+        }
+    }
+
+    @Test
+    public void 
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() 
throws IOException {
+        int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+        SimpleRecord[] records = Arrays.stream(messageIds)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        //Given each message has an offset & epoch, as msgs from leader would
+        Function<Integer, MemoryRecords> recordsForEpoch = i -> {
+            MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], 
Compression.NONE, records[i]);
+            recs.batches().forEach(record -> {
+                record.setPartitionLeaderEpoch(42);
+                record.setLastOffset(i);
+            });
+            return recs;
+        };
+
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new 
Properties()))) {
+            // Given each message has an offset & epoch, as msgs from leader 
would
+            for (int i = 0; i < records.length; i++) {
+                log.appendAsFollower(recordsForEpoch.apply(i), i);
+            }
+
+            assertEquals(Optional.of(42), log.latestEpoch());
+        }
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws 
IOException {
+        Supplier<MemoryRecords>  records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 5);
+        cache.assign(2, 10);
+
+        // When first segment is removed
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        //The oldest epoch entry should have been removed
+        assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), 
cache.epochEntries());
+    }
+
+    @Test
+    public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 7);
+        cache.assign(2, 10);
+
+        // When first segment removed (up to offset 5)
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        //The first entry should have gone from (0,0) => (0,5)
+        assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new 
EpochEntry(2, 10)), cache.epochEntries());
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> records(List.of(new 
SimpleRecord("value".getBytes())), 0, 0);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(10 * records.get().sizeInBytes())
+                .build();
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        //Given 2 segments, 10 messages per segment
+        append(0, 0, 10);
+        append(1, 10, 6);
+        append(2, 16, 4);
+
+        assertEquals(2, log.numberOfSegments());
+        assertEquals(20, log.logEndOffset());
+
+        // When truncate to LEO (no op)
+        log.truncateTo(log.logEndOffset());
+        // Then no change
+        assertEquals(3, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(11);
+        // Then no change
+        assertEquals(2, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(10);
+        assertEquals(1, cache.epochEntries().size());
+
+        // When truncate all
+        log.truncateTo(0);
+        assertEquals(0, cache.epochEntries().size());
+    }
+
+    @Test
+    public void shouldDeleteSizeBasedSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments");
+    }
+
+    @Test
+    public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionBytes(records.get().sizeInBytes() * 15L)
+                .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(3, log.numberOfSegments(), "should have 3 segments");
+    }
+
+    @Test
+    public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 15)
+                .withRetentionMs(10000L)
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(1, log.numberOfSegments(), "There should be 1 segment 
remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), mockTime.milliseconds());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionMs(10000000)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(3, log.numberOfSegments(), "There should be 3 segments 
remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionMs(10000)
+                .withCleanupPolicy("compact")
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // mark the oldest segment as older the retention.ms
+        
log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 
20000);
+
+        int segments = log.numberOfSegments();
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(segments, log.numberOfSegments(), "There should be 3 
segments remaining");
+    }
+
+    @Test
+    public void 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .withSegmentBytes(records.get().sizeInBytes() * 5)
+                .withRetentionBytes(records.get().sizeInBytes() * 10L)
+                .withCleanupPolicy("compact, delete")
+                .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();

Review Comment:
   could you add assert to ensure it does delete some segments?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to