chia7712 commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2484861108
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -16,22 +16,719 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnifiedLogTest {
private final File tmpDir = TestUtils.tempDirectory();
+ private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+ private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
+ private final MockTime mockTime = new MockTime();
+ private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
+ private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
+ private final List<UnifiedLog> logsToClose = new ArrayList<>();
+
+ private UnifiedLog log;
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ brokerTopicStats.close();
+ for (UnifiedLog log : logsToClose) {
+ log.close();
+ }
+ Utils.delete(tmpDir);
+ }
@Test
public void testOffsetFromProducerSnapshotFile() {
long offset = 23423423L;
File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
}
+
+ @Test
+ public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException {
+ SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+ .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // Given this partition is on leader epoch 72
+ int epoch = 72;
+ try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) {
+ log.assignEpochStartOffset(epoch, records.length);
+
+ // When appending messages as a leader (i.e. assignOffsets = true)
+ for (SimpleRecord record : records) {
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), epoch);
+ }
+
+ // Then leader epoch should be set on messages
+ for (int i = 0; i < records.length; i++) {
+ FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END, true);
+ RecordBatch batch = read.records.batches().iterator().next();
+ assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have set leader epoch");
+ }
+ }
+ }
+
+ @Test
+ public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() throws IOException {
+ int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+ SimpleRecord[] records = Arrays.stream(messageIds)
+ .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // Given each message has an offset & epoch, as messages from the leader would
+ Function<Integer, MemoryRecords> recordsForEpoch = i -> {
+ MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]);
+ recs.batches().forEach(record -> {
+ record.setPartitionLeaderEpoch(42);
+ record.setLastOffset(i);
+ });
+ return recs;
+ };
+
+ try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) {
+ // When appended as a follower, preserving the leader's offsets & epochs
+ for (int i = 0; i < records.length; i++) {
+ log.appendAsFollower(recordsForEpoch.apply(i), i);
+ }
+
+ assertEquals(Optional.of(42), log.latestEpoch());
+ }
+ }
+
+ @Test
+ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ // Given three segments of 5 messages each
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // Given epochs
+ cache.assign(0, 0);
+ cache.assign(1, 5);
+ cache.assign(2, 10);
+
+ // When first segment is removed
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+
+ // The oldest epoch entry should have been removed
+ assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries());
+ }
+
+ @Test
+ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ // Given three segments of 5 messages each
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // Given epochs
+ cache.assign(0, 0);
+ cache.assign(1, 7);
+ cache.assign(2, 10);
+
+ // When first segment removed (up to offset 5)
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+
+ // The first entry should have gone from (0,0) => (0,5)
+ assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries());
+ }
+
+ @Test
+ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IOException {
+ Supplier<MemoryRecords> records = () -> records(List.of(new SimpleRecord("value".getBytes())), 0, 0);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(10 * records.get().sizeInBytes())
+ .build();
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ // Given 2 segments, 10 messages per segment
+ append(0, 0, 10);
+ append(1, 10, 6);
+ append(2, 16, 4);
+
+ assertEquals(2, log.numberOfSegments());
+ assertEquals(20, log.logEndOffset());
+
+ // When truncate to LEO (no op)
+ log.truncateTo(log.logEndOffset());
+ // Then no change
+ assertEquals(3, cache.epochEntries().size());
+
+ // When truncate
+ log.truncateTo(11);
+ // Then the latest epoch entry should be removed
+ assertEquals(2, cache.epochEntries().size());
+
+ // When truncate
+ log.truncateTo(10);
+ assertEquals(1, cache.epochEntries().size());
+
+ // When truncate all
+ log.truncateTo(0);
+ assertEquals(0, cache.epochEntries().size());
+ }
+
+ @Test
+ public void shouldDeleteSizeBasedSegments() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+ assertEquals(2, log.numberOfSegments(), "should have 2 segments");
+ }
+
+ @Test
+ public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionBytes(records.get().sizeInBytes() * 15L)
+ .build();
+
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+ assertEquals(3, log.numberOfSegments(), "should have 3 segments");
+ }
+
+ @Test
+ public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 15)
+ .withRetentionMs(10000L)
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+ assertEquals(1, log.numberOfSegments(), "There should be 1 segment remaining");
+ }
+
+ @Test
+ public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes(), mockTime.milliseconds());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionMs(10000000)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+ assertEquals(3, log.numberOfSegments(), "There should be 3 segments remaining");
+ }
+
+ @Test
+ public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionMs(10000)
+ .withCleanupPolicy("compact")
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // mark the oldest segment as older than retention.ms
+ log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000);
+
+ int segments = log.numberOfSegments();
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
+ assertEquals(segments, log.numberOfSegments(), "There should be 3 segments remaining");
+ }
+
+ @Test
+ public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() throws IOException {
+ Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .withSegmentBytes(records.get().sizeInBytes() * 5)
+ .withRetentionBytes(records.get().sizeInBytes() * 10L)
+ .withCleanupPolicy("compact, delete")
+ .build();
+
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.deleteOldSegments();
Review Comment:
could you add an assertion to ensure it actually deletes some segments?
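For example, something along these lines (just a sketch; the exact remaining count depends on the record sizes used above, so comparing against the segment count taken before deletion avoids hard-coding a number):

```java
// capture the count before retention runs, then assert it shrank
int segmentsBefore = log.numberOfSegments();
log.updateHighWatermark(log.logEndOffset());
log.deleteOldSegments();
assertTrue(log.numberOfSegments() < segmentsBefore,
    "retention should have deleted at least one segment");
```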