mimaison commented on code in PR #19762:
URL: https://github.com/apache/kafka/pull/19762#discussion_r2182458792
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftLogTest.java: ########## @@ -0,0 +1,1190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.record.ArbitraryMemoryRecords; +import org.apache.kafka.common.record.InvalidMemoryRecordsProvider; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.internals.BatchBuilder; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.test.TestUtils; + +import net.jqwik.api.AfterFailureMode; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaRaftLogTest { + + private static final 
MetadataLogConfig DEFAULT_METADATA_LOG_CONFIG = createMetadataLogConfig( + 100 * 1024, + 10 * 1000, + 100 * 1024, + 60 * 1000, + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + KafkaRaftClient.MAX_FETCH_SIZE_BYTES + ); + + private final MockTime mockTime = new MockTime(); + private File tempDir; + + @BeforeEach + public void setUp() { + tempDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tempDir); + } + + @Test + public void testConfig() throws IOException { + Properties props = new Properties(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093"); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(2)); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10240)); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, String.valueOf(10 * 1024)); + assertThrows(ConfigException.class, () -> { + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + }); + + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + } + + @Test + public void testUnexpectedAppendOffset() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); + int currentEpoch = 3; + long initialOffset = log.endOffset().offset(); + + log.appendAsLeader( + MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), + currentEpoch + ); + + // Throw exception for out of order records + assertThrows( + RuntimeException.class, + () -> log.appendAsLeader(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + + assertThrows( + RuntimeException.class, + () -> log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + } + + @Test + public void testEmptyAppendNotAllowed() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + assertThrows(IllegalArgumentException.class, () -> log.appendAsFollower(MemoryRecords.EMPTY, 1)); + assertThrows(IllegalArgumentException.class, () -> log.appendAsLeader(MemoryRecords.EMPTY, 1)); + } + + @ParameterizedTest + @ArgumentsSource(InvalidMemoryRecordsProvider.class) + public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + Executable action = () -> log.appendAsFollower(records, Integer.MAX_VALUE); + if (expectedException.isPresent()) { + assertThrows(expectedException.get(), action); + } else { + assertThrows(CorruptRecordException.class, action); + } + + assertEquals(previousEndOffset, log.endOffset().offset()); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + public void testRandomRecords(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) throws IOException { + File 
tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + Utils.delete(tempDir); + } + } + + @Test + public void testInvalidLeaderEpoch() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + int epoch = log.lastFetchedEpoch() + 1; + int numberOfRecords = 10; + + SimpleRecord[] simpleRecords = new SimpleRecord[numberOfRecords]; + for (int i = 0; i < numberOfRecords; i++) { + simpleRecords[i] = new SimpleRecord(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + MemoryRecords batchWithValidEpoch = MemoryRecords.withRecords( + previousEndOffset, + Compression.NONE, + epoch, + simpleRecords + ); + MemoryRecords batchWithInvalidEpoch = MemoryRecords.withRecords( + previousEndOffset + numberOfRecords, + Compression.NONE, + epoch + 1, + simpleRecords + ); + + ByteBuffer buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes()); + buffer.put(batchWithValidEpoch.buffer()); + buffer.put(batchWithInvalidEpoch.buffer()); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + log.appendAsFollower(records, epoch); + + // Check that only the first batch was appended + assertEquals(previousEndOffset + numberOfRecords, log.endOffset().offset()); + // Check that the last fetched epoch matches the first batch + assertEquals(epoch, log.lastFetchedEpoch()); + } + + @Test + public void testCreateSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()); + } + + @Test + public void testCreateSnapshotFromEndOffset() throws IOException { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, firstEpoch); + append(log, numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test finding the first epoch + log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close(); + + // Test finding the second epoch + log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close(); + } + + @Test + public void testCreateSnapshotInMiddleOfBatch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( 
+ IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotMuchLaterEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) + ); + } + + @Test + public void testHighWatermarkOffsetMetadata() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + LogOffsetMetadata highWatermarkMetadata = log.highWatermark(); + assertEquals(numberOfRecords, highWatermarkMetadata.offset()); + assertTrue(highWatermarkMetadata.metadata().isPresent()); + + SegmentPosition segmentPosition = (SegmentPosition) highWatermarkMetadata.metadata().get(); + assertEquals(0, segmentPosition.baseOffset()); + assertTrue(segmentPosition.relativePosition() > 0); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords - 4, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + for (int i = 1; i <= numberOfRecords; i++) { + append(log, 1, epoch); + } + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + // Simulate log cleanup that advances the LSO + log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion); + + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()))); Review Comment: We expect `createNewSnapshot()` to return `Optional.empty()`, so there should be nothing to close. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftLogTest.java: ########## @@ -0,0 +1,1190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.record.ArbitraryMemoryRecords; +import org.apache.kafka.common.record.InvalidMemoryRecordsProvider; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.internals.BatchBuilder; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.test.TestUtils; + +import net.jqwik.api.AfterFailureMode; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaRaftLogTest { + + private static final MetadataLogConfig DEFAULT_METADATA_LOG_CONFIG = createMetadataLogConfig( + 100 * 1024, + 10 * 1000, + 100 * 1024, + 60 * 1000, + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + KafkaRaftClient.MAX_FETCH_SIZE_BYTES + ); + + private final MockTime mockTime = new MockTime(); + private File tempDir; + + @BeforeEach + public void setUp() { + tempDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tempDir); + } + + @Test + public void testConfig() throws IOException { + Properties props = new Properties(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093"); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(2)); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10240)); + 
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, String.valueOf(10 * 1024)); + assertThrows(ConfigException.class, () -> { + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + }); + + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + } + + @Test + public void testUnexpectedAppendOffset() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); + int currentEpoch = 3; + long initialOffset = log.endOffset().offset(); + + log.appendAsLeader( + MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), + currentEpoch + ); + + // Throw exception for out of order records + assertThrows( + RuntimeException.class, + () -> log.appendAsLeader(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + + assertThrows( + RuntimeException.class, + () -> log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + } + + @Test + public void testEmptyAppendNotAllowed() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + assertThrows(IllegalArgumentException.class, () -> log.appendAsFollower(MemoryRecords.EMPTY, 1)); + assertThrows(IllegalArgumentException.class, () -> log.appendAsLeader(MemoryRecords.EMPTY, 1)); + } + + @ParameterizedTest + @ArgumentsSource(InvalidMemoryRecordsProvider.class) + public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + Executable action = () -> log.appendAsFollower(records, Integer.MAX_VALUE); + if (expectedException.isPresent()) { + assertThrows(expectedException.get(), action); + } else { + assertThrows(CorruptRecordException.class, action); + } + + assertEquals(previousEndOffset, log.endOffset().offset()); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + public void testRandomRecords(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) throws IOException { + File tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + Utils.delete(tempDir); + } + } + + @Test + public void testInvalidLeaderEpoch() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + int epoch = log.lastFetchedEpoch() + 1; + int numberOfRecords = 10; + + SimpleRecord[] simpleRecords = new SimpleRecord[numberOfRecords]; + for (int i = 0; i < numberOfRecords; i++) { + simpleRecords[i] = new SimpleRecord(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + MemoryRecords batchWithValidEpoch = MemoryRecords.withRecords( + 
previousEndOffset, + Compression.NONE, + epoch, + simpleRecords + ); + MemoryRecords batchWithInvalidEpoch = MemoryRecords.withRecords( + previousEndOffset + numberOfRecords, + Compression.NONE, + epoch + 1, + simpleRecords + ); + + ByteBuffer buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes()); + buffer.put(batchWithValidEpoch.buffer()); + buffer.put(batchWithInvalidEpoch.buffer()); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + log.appendAsFollower(records, epoch); + + // Check that only the first batch was appended + assertEquals(previousEndOffset + numberOfRecords, log.endOffset().offset()); + // Check that the last fetched epoch matches the first batch + assertEquals(epoch, log.lastFetchedEpoch()); + } + + @Test + public void testCreateSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()); + } + + @Test + public void testCreateSnapshotFromEndOffset() throws IOException { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, firstEpoch); + append(log, numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test finding the first epoch + log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close(); + + // Test finding the second epoch + log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close(); + } + + @Test + public void testCreateSnapshotInMiddleOfBatch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotMuchLaterEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) + ); + } + + @Test + public void testHighWatermarkOffsetMetadata() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + LogOffsetMetadata highWatermarkMetadata = 
log.highWatermark(); + assertEquals(numberOfRecords, highWatermarkMetadata.offset()); + assertTrue(highWatermarkMetadata.metadata().isPresent()); + + SegmentPosition segmentPosition = (SegmentPosition) highWatermarkMetadata.metadata().get(); + assertEquals(0, segmentPosition.baseOffset()); + assertTrue(segmentPosition.relativePosition() > 0); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords - 4, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + for (int i = 1; i <= numberOfRecords; i++) { + append(log, 1, epoch); + } + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + // Simulate log cleanup that advances the LSO + log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion); + + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()))); + } + + @Test + public void testCreateSnapshotDivergingEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotOlderEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotWithMissingEpoch() throws IOException { + int firstBatchRecords = 5; + int firstEpoch = 1; + int missingEpoch = firstEpoch + 1; + int secondBatchRecords = 5; + int secondEpoch = missingEpoch + 1; + + int numberOfRecords = firstBatchRecords + secondBatchRecords; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, firstBatchRecords, firstEpoch); + append(log, secondBatchRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(1, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(firstBatchRecords, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(secondBatchRecords, missingEpoch)) + ); + } + + @Test + public void testCreateExistingSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(Optional.empty(), 
log.createNewSnapshot(snapshotId),

Review Comment: We expect `createNewSnapshot()` to return `Optional.empty()`, so there should be nothing to close.