junrao commented on code in PR #19762:
URL: https://github.com/apache/kafka/pull/19762#discussion_r2181170038
########## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ########## @@ -671,4 +672,54 @@ public void testTopicIdToNameView() { assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID)); assertThrows(UnsupportedOperationException.class, () -> map.put(FOO_UUID, "bar")); } + + @Test + public void testIsStrayReplicaWithEmptyImage() { + TopicsImage image = topicsImage(List.of()); + List<TopicIdPartition> onDisk = List.of(FOO_0, FOO_1, BAR_0, BAR_1, BAZ_0); + assertTrue(onDisk.stream().allMatch(log -> Review Comment: It seems that this hasn't been addressed? ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java: ########## @@ -0,0 +1,816 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.snapshot.FileRawSnapshotReader; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.SnapshotPath; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; 
+import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class KafkaRaftLog implements RaftLog { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaRaftLog.class); + + private final Logger logger; + private final UnifiedLog log; + private final Time time; + private final Scheduler scheduler; + // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the + // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. + private final TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> snapshots; + private final TopicPartition topicPartition; + private final MetadataLogConfig config; + private final String logIdent; + + public KafkaRaftLog( + UnifiedLog log, + Time time, + Scheduler scheduler, + // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the + // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. + TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> snapshots, + TopicPartition topicPartition, + MetadataLogConfig config, + int nodeId) { + this.log = log; + this.time = time; + this.scheduler = scheduler; + this.snapshots = snapshots; + this.topicPartition = topicPartition; + this.config = config; + this.logIdent = "[RaftLog partition=" + topicPartition + ", nodeId=" + nodeId + "] "; + this.logger = new LogContext(logIdent).logger(KafkaRaftLog.class); + } + + // for testing + UnifiedLog log() { + return log; + } + + @Override + public LogFetchInfo read(long startOffset, Isolation readIsolation) { + FetchIsolation isolation = switch (readIsolation) { + case COMMITTED -> FetchIsolation.HIGH_WATERMARK; + case UNCOMMITTED -> FetchIsolation.LOG_END; + }; + + try { + FetchDataInfo fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes(), isolation, true); + return new LogFetchInfo( + fetchInfo.records, + new LogOffsetMetadata( + fetchInfo.fetchOffsetMetadata.messageOffset, + Optional.of(new SegmentPosition( + fetchInfo.fetchOffsetMetadata.segmentBaseOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment)) + ) + ); + } catch (IOException ioe) { + throw new KafkaException(ioe); + } + } + + @Override + public LogAppendInfo appendAsLeader(Records records, int epoch) { + if (records.sizeInBytes() == 0) { + throw new IllegalArgumentException("Attempt to append an empty record set"); + } + + try { + return handleAndConvertLogAppendInfo(log.appendAsLeader((MemoryRecords) records, epoch, AppendOrigin.RAFT_LEADER)); + } catch (IOException ioe) { + throw new KafkaException(ioe); + } + } + + @Override + public LogAppendInfo appendAsFollower(Records records, int epoch) { + if (records.sizeInBytes() == 0) { + throw new IllegalArgumentException("Attempt to append an empty record set"); + } + + return handleAndConvertLogAppendInfo(log.appendAsFollower((MemoryRecords) records, epoch)); + } + + private LogAppendInfo handleAndConvertLogAppendInfo(org.apache.kafka.storage.internals.log.LogAppendInfo appendInfo) { + if (appendInfo.firstOffset() == UnifiedLog.UNKNOWN_OFFSET) { + throw new CorruptRecordException("Append failed unexpectedly " + appendInfo); + } else { + return new LogAppendInfo(appendInfo.firstOffset(), appendInfo.lastOffset()); + } + } + + @Override + public int 
lastFetchedEpoch() { + Optional<Integer> latestEpoch = log.latestEpoch(); + return latestEpoch.orElseGet(() -> latestSnapshotId().map(snapshotId -> { + long logEndOffset = endOffset().offset(); + long startOffset = startOffset(); + if (snapshotId.offset() == startOffset && snapshotId.offset() == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + return snapshotId.epoch(); + } else { + throw new KafkaException( + "Log doesn't have a last fetch epoch and there is a snapshot (" + snapshotId + "). " + + "Expected the snapshot's end offset to match the log's end offset (" + logEndOffset + + ") and the log start offset (" + startOffset + ")" + ); + } + }).orElse(0)); + } + + @Override + public OffsetAndEpoch endOffsetForEpoch(int epoch) { + Optional<OffsetAndEpoch> endOffsetEpochOpt = log.endOffsetForEpoch(epoch); + Optional<OffsetAndEpoch> earliestSnapshotIdOpt = earliestSnapshotId(); + if (endOffsetEpochOpt.isPresent()) { + OffsetAndEpoch endOffsetEpoch = endOffsetEpochOpt.get(); + if (earliestSnapshotIdOpt.isPresent()) { + OffsetAndEpoch earliestSnapshotId = earliestSnapshotIdOpt.get(); + if (endOffsetEpoch.offset() == earliestSnapshotId.offset() && endOffsetEpoch.epoch() == epoch) { + // The epoch is equal to the smallest epoch on the log. Override the diverging + // epoch to the oldest snapshot which should be the snapshot at the log start offset + return new OffsetAndEpoch(earliestSnapshotId.offset(), earliestSnapshotId.epoch()); + } + } + return new OffsetAndEpoch(endOffsetEpoch.offset(), endOffsetEpoch.epoch()); + } else { + return new OffsetAndEpoch(endOffset().offset(), lastFetchedEpoch()); + } + } + + @Override + public LogOffsetMetadata endOffset() { + org.apache.kafka.storage.internals.log.LogOffsetMetadata endOffsetMetadata = log.logEndOffsetMetadata(); + return new LogOffsetMetadata( + endOffsetMetadata.messageOffset, + Optional.of(new SegmentPosition( + endOffsetMetadata.segmentBaseOffset, + endOffsetMetadata.relativePositionInSegment) + ) + ); + } + + @Override + public long startOffset() { + return log.logStartOffset(); + } + + @Override + public void truncateTo(long offset) { + long highWatermarkOffset = highWatermark().offset(); + if (offset < highWatermarkOffset) { + throw new IllegalArgumentException("Attempt to truncate to offset " + offset + + ", which is below the current high watermark " + highWatermarkOffset); + } + log.truncateTo(offset); + } + + @Override + public boolean truncateToLatestSnapshot() { + int latestEpoch = log.latestEpoch().orElse(0); + boolean truncated = false; + TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> forgottenSnapshots = new TreeMap<>(); + Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId(); + if (snapshotIdOpt.isPresent()) { + OffsetAndEpoch snapshotId = snapshotIdOpt.get(); + if (snapshotId.epoch() > latestEpoch || (snapshotId.epoch() == latestEpoch && snapshotId.offset() > endOffset().offset())) { + // Truncate the log fully if the latest snapshot is greater than the log end offset + log.truncateFullyAndStartAt(snapshotId.offset(), Optional.empty()); + + // Forget snapshots less than the log start offset + synchronized (snapshots) { + truncated = true; + forgottenSnapshots = forgetSnapshotsBefore(snapshotId); + } + } + } + removeSnapshots(forgottenSnapshots, new FullTruncation()); + return truncated; + } + + @Override + public void initializeLeaderEpoch(int epoch) { + log.assignEpochStartOffset(epoch, log.logEndOffset()); + } + + @Override + public void updateHighWatermark(LogOffsetMetadata 
logOffsetMetadata) { + // This API returns the new high watermark, which may be different from the passed offset + Optional<OffsetMetadata> metadata = logOffsetMetadata.metadata(); + try { + long logHighWatermark; + if (metadata.isPresent() && metadata.get() instanceof SegmentPosition segmentPosition) { + logHighWatermark = log.updateHighWatermark( + new org.apache.kafka.storage.internals.log.LogOffsetMetadata( + logOffsetMetadata.offset(), + segmentPosition.baseOffset(), + segmentPosition.relativePosition() + ) + ); + } else { + logHighWatermark = log.updateHighWatermark(logOffsetMetadata.offset()); + } + + // Temporary log message until we fix KAFKA-14825 + if (logHighWatermark != logOffsetMetadata.offset()) { + logger.warn("Log's high watermark ({}) is different from the local replica's high watermark ({})", metadata, logOffsetMetadata); + } + } catch (IOException ioe) { + throw new KafkaException(ioe); + } + } + + @Override + public LogOffsetMetadata highWatermark() { + try { + org.apache.kafka.storage.internals.log.LogOffsetMetadata hwm = log.fetchOffsetSnapshot().highWatermark; + Optional<OffsetMetadata> segmentPosition = !hwm.messageOffsetOnly() + ? Optional.of(new SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment)) + : Optional.empty(); + + return new LogOffsetMetadata(hwm.messageOffset, segmentPosition); + } catch (IOException ioe) { + throw new KafkaException(ioe); + } + } + + @Override + public void flush(boolean forceFlushActiveSegment) { + log.flush(forceFlushActiveSegment); + } + + /** + * Return the topic partition associated with the log. + */ + @Override + public TopicPartition topicPartition() { + return topicPartition; + } + + /** + * Return the topic ID associated with the log. + */ + @Override + public Uuid topicId() { + return log.topicId().get(); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) { + long startOffset = startOffset(); + if (snapshotId.offset() < startOffset) { + logger.info("Cannot create a snapshot with an id ({}) less than the log start offset ({})", snapshotId, startOffset); + return Optional.empty(); + } + + long highWatermarkOffset = highWatermark().offset(); + if (snapshotId.offset() > highWatermarkOffset) { + throw new IllegalArgumentException( + "Cannot create a snapshot with an id (" + snapshotId + ") greater than the high-watermark (" + highWatermarkOffset + ")" + ); + } + + ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset(), snapshotId.epoch()); + if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) { + throw new IllegalArgumentException( + "Snapshot id (" + snapshotId + ") is not valid according to the log: " + validOffsetAndEpoch + ); + } + + /* + Perform a check that the requested snapshot offset is batch aligned via a log read, which + returns the base offset of the batch that contains the requested offset. A snapshot offset + is one greater than the last offset contained in the snapshot, and cannot go past the high + watermark. + + This check is necessary because Raft replication code assumes the snapshot offset is the + start of a batch. If a follower applies a non-batch aligned snapshot at offset (X) and + fetches from this offset, the returned batch will start at offset (X - M), and the + follower will be unable to append it since (X - M) < (X). 
+ */ + long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset(); + if (snapshotId.offset() != baseOffset) { + throw new IllegalArgumentException( + "Cannot create snapshot at offset (" + snapshotId.offset() + ") because it is not batch aligned. " + + "The batch containing the requested offset has a base offset of (" + baseOffset + ")" + ); + } + return createNewSnapshotUnchecked(snapshotId); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshotUnchecked(OffsetAndEpoch snapshotId) { + boolean containsSnapshotId; + synchronized (snapshots) { + containsSnapshotId = snapshots.containsKey(snapshotId); + } + + if (containsSnapshotId) { + return Optional.empty(); + } else { + return Optional.of( + new NotifyingRawSnapshotWriter( + FileRawSnapshotWriter.create(log.dir().toPath(), snapshotId), + this::onSnapshotFrozen + ) + ); + } + } + + @Override + public Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) { + synchronized (snapshots) { + Optional<FileRawSnapshotReader> reader = snapshots.get(snapshotId); + if (reader == null) { + return Optional.empty(); + } else if (reader.isPresent()) { + return Optional.of(reader.get()); + } else { + // Snapshot exists but has never been read before + try { + FileRawSnapshotReader fileReader = FileRawSnapshotReader.open(log.dir().toPath(), snapshotId); + snapshots.put(snapshotId, Optional.of(fileReader)); + return Optional.of(fileReader); + } catch (UncheckedIOException e) { Review Comment: The following path is only for `NoSuchFileException`. For other `IOExceptions`, we should propagate to the caller. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java: ########## @@ -0,0 +1,816 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
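Regarding the review comment above on the `readSnapshot` catch clause: a minimal sketch of the suggested handling, assuming (as the PR's `catch (UncheckedIOException e)` implies) that `FileRawSnapshotReader.open` wraps I/O failures in `UncheckedIOException`. Only a missing snapshot file maps to `Optional.empty()`; everything else propagates to the caller. The helper class and its names are hypothetical, not part of the PR:

```java
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Optional;
import java.util.function.Supplier;

final class SnapshotOpenSketch {
    // Hypothetical helper: open a snapshot reader, treating only a missing file as
    // "snapshot does not exist" and rethrowing every other I/O failure to the caller.
    static <T> Optional<T> openIfExists(Supplier<T> opener) {
        try {
            // The supplier stands in for FileRawSnapshotReader.open(log.dir().toPath(), snapshotId)
            return Optional.of(opener.get());
        } catch (UncheckedIOException e) {
            if (e.getCause() instanceof NoSuchFileException) {
                // Snapshot file was removed between the map lookup and the open call.
                return Optional.empty();
            }
            // Any other I/O failure is unexpected here; propagate instead of swallowing it.
            throw e;
        }
    }
}
```

In the `readSnapshot` path this would keep the existing `Optional.empty()` behaviour for a concurrently deleted snapshot while still surfacing genuine disk errors.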
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.snapshot.FileRawSnapshotReader; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.SnapshotPath; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class KafkaRaftLog implements RaftLog { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaRaftLog.class); + + private final Logger logger; + private final UnifiedLog log; + private final Time time; + private final Scheduler scheduler; + // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the + // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. + private final TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> snapshots; + private final TopicPartition topicPartition; + private final MetadataLogConfig config; + private final String logIdent; + + public KafkaRaftLog( + UnifiedLog log, + Time time, + Scheduler scheduler, + // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the + // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. 
+ TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> snapshots, + TopicPartition topicPartition, + MetadataLogConfig config, + int nodeId) { + this.log = log; + this.time = time; + this.scheduler = scheduler; + this.snapshots = snapshots; + this.topicPartition = topicPartition; + this.config = config; + this.logIdent = "[RaftLog partition=" + topicPartition + ", nodeId=" + nodeId + "] "; + this.logger = new LogContext(logIdent).logger(KafkaRaftLog.class); + } + + // for testing + UnifiedLog log() { + return log; + } + + @Override + public LogFetchInfo read(long startOffset, Isolation readIsolation) { + FetchIsolation isolation = switch (readIsolation) { + case COMMITTED -> FetchIsolation.HIGH_WATERMARK; + case UNCOMMITTED -> FetchIsolation.LOG_END; + }; + + try { + FetchDataInfo fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes(), isolation, true); + return new LogFetchInfo( + fetchInfo.records, + new LogOffsetMetadata( + fetchInfo.fetchOffsetMetadata.messageOffset, + Optional.of(new SegmentPosition( + fetchInfo.fetchOffsetMetadata.segmentBaseOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment)) + ) + ); + } catch (IOException ioe) { + throw new KafkaException(ioe); Review Comment: I noticed that existing code like Snapshots and FileSnapshotReader convert IOException to UncheckedIOException. It's probably better to follow that convention for now. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftLogTest.java: ########## @@ -0,0 +1,1190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
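On the review comment above about the `catch (IOException ioe)` in `read`: a sketch of the convention the reviewer points at, wrapping the checked `IOException` in `UncheckedIOException` rather than `KafkaException`. The `Reader` interface below is a hypothetical stand-in for the `UnifiedLog` call, used only to make the snippet self-contained:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class UncheckedReadSketch {
    // Hypothetical stand-in for the UnifiedLog read call; only the catch clause matters.
    interface Reader {
        Object read(long startOffset) throws IOException;
    }

    static Object readUnchecked(Reader reader, long startOffset) {
        try {
            return reader.read(startOffset);
        } catch (IOException ioe) {
            // Follow the convention already used by Snapshots / FileRawSnapshotReader:
            // wrap in UncheckedIOException instead of KafkaException.
            throw new UncheckedIOException(ioe);
        }
    }
}
```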
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.record.ArbitraryMemoryRecords; +import org.apache.kafka.common.record.InvalidMemoryRecordsProvider; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.internals.BatchBuilder; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.test.TestUtils; + +import net.jqwik.api.AfterFailureMode; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaRaftLogTest { + + private static final MetadataLogConfig DEFAULT_METADATA_LOG_CONFIG = createMetadataLogConfig( + 100 * 1024, + 10 * 1000, + 100 * 1024, + 60 * 1000, + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + KafkaRaftClient.MAX_FETCH_SIZE_BYTES + ); + + private final MockTime mockTime = new MockTime(); + private File tempDir; + + @BeforeEach + public void setUp() { + tempDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tempDir); + } + + @Test + public void testConfig() throws IOException { + Properties props = new Properties(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093"); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(2)); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10240)); + 
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, String.valueOf(10 * 1024)); + assertThrows(ConfigException.class, () -> { + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + }); + + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + } + + @Test + public void testUnexpectedAppendOffset() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); + int currentEpoch = 3; + long initialOffset = log.endOffset().offset(); + + log.appendAsLeader( + MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), + currentEpoch + ); + + // Throw exception for out of order records + assertThrows( + RuntimeException.class, + () -> log.appendAsLeader(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + + assertThrows( + RuntimeException.class, + () -> log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + } + + @Test + public void testEmptyAppendNotAllowed() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + assertThrows(IllegalArgumentException.class, () -> log.appendAsFollower(MemoryRecords.EMPTY, 1)); + assertThrows(IllegalArgumentException.class, () -> log.appendAsLeader(MemoryRecords.EMPTY, 1)); + } + + @ParameterizedTest + @ArgumentsSource(InvalidMemoryRecordsProvider.class) + public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + Executable action = () -> log.appendAsFollower(records, Integer.MAX_VALUE); + if (expectedException.isPresent()) { + assertThrows(expectedException.get(), action); + } else { + assertThrows(CorruptRecordException.class, action); + } + + assertEquals(previousEndOffset, log.endOffset().offset()); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + public void testRandomRecords(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) throws IOException { + File tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + Utils.delete(tempDir); + } + } + + @Test + public void testInvalidLeaderEpoch() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + int epoch = log.lastFetchedEpoch() + 1; + int numberOfRecords = 10; + + SimpleRecord[] simpleRecords = new SimpleRecord[numberOfRecords]; + for (int i = 0; i < numberOfRecords; i++) { + simpleRecords[i] = new SimpleRecord(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + MemoryRecords batchWithValidEpoch = MemoryRecords.withRecords( + 
previousEndOffset, + Compression.NONE, + epoch, + simpleRecords + ); + MemoryRecords batchWithInvalidEpoch = MemoryRecords.withRecords( + previousEndOffset + numberOfRecords, + Compression.NONE, + epoch + 1, + simpleRecords + ); + + ByteBuffer buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes()); + buffer.put(batchWithValidEpoch.buffer()); + buffer.put(batchWithInvalidEpoch.buffer()); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + log.appendAsFollower(records, epoch); + + // Check that only the first batch was appended + assertEquals(previousEndOffset + numberOfRecords, log.endOffset().offset()); + // Check that the last fetched epoch matches the first batch + assertEquals(epoch, log.lastFetchedEpoch()); + } + + @Test + public void testCreateSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()); + } + + @Test + public void testCreateSnapshotFromEndOffset() throws IOException { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, firstEpoch); + append(log, numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test finding the first epoch + log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close(); + + // Test finding the second epoch + log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close(); + } + + @Test + public void testCreateSnapshotInMiddleOfBatch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotMuchLaterEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) + ); + } + + @Test + public void testHighWatermarkOffsetMetadata() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + LogOffsetMetadata highWatermarkMetadata = 
log.highWatermark(); + assertEquals(numberOfRecords, highWatermarkMetadata.offset()); + assertTrue(highWatermarkMetadata.metadata().isPresent()); + + SegmentPosition segmentPosition = (SegmentPosition) highWatermarkMetadata.metadata().get(); + assertEquals(0, segmentPosition.baseOffset()); + assertTrue(segmentPosition.relativePosition() > 0); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords - 4, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + for (int i = 1; i <= numberOfRecords; i++) { + append(log, 1, epoch); + } + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + // Simulate log cleanup that advances the LSO + log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion); + + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()))); Review Comment: Should we close the created snapshot? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftLogTest.java: ########## @@ -0,0 +1,1190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
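For the review question above about closing the created snapshot in `testCreateSnapshotBeforeLogStartOffset`: one possible shape, reusing the test's existing `log`, `snapshotId`, and imports, and assuming `RawSnapshotWriter.close()` throws no checked exception (as `FileRawSnapshotWriter`, which wraps I/O errors unchecked, suggests). This is only a sketch; whether the extra ceremony is wanted is the reviewer's call:

```java
// If createNewSnapshot unexpectedly returns a writer, close it so the test does not
// leak an open FileRawSnapshotWriter; the assertion itself is unchanged.
Optional<RawSnapshotWriter> maybeWriter =
        log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()));
try {
    assertEquals(Optional.empty(), maybeWriter);
} finally {
    maybeWriter.ifPresent(RawSnapshotWriter::close);
}
```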
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.record.ArbitraryMemoryRecords; +import org.apache.kafka.common.record.InvalidMemoryRecordsProvider; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.internals.BatchBuilder; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.test.TestUtils; + +import net.jqwik.api.AfterFailureMode; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaRaftLogTest { + + private static final MetadataLogConfig DEFAULT_METADATA_LOG_CONFIG = createMetadataLogConfig( + 100 * 1024, + 10 * 1000, + 100 * 1024, + 60 * 1000, + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + KafkaRaftClient.MAX_FETCH_SIZE_BYTES + ); + + private final MockTime mockTime = new MockTime(); + private File tempDir; + + @BeforeEach + public void setUp() { + tempDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tempDir); + } + + @Test + public void testConfig() throws IOException { + Properties props = new Properties(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093"); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(2)); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10240)); + 
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, String.valueOf(10 * 1024)); + assertThrows(ConfigException.class, () -> { + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + }); + + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + } + + @Test + public void testUnexpectedAppendOffset() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); + int currentEpoch = 3; + long initialOffset = log.endOffset().offset(); + + log.appendAsLeader( + MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), + currentEpoch + ); + + // Throw exception for out of order records + assertThrows( + RuntimeException.class, + () -> log.appendAsLeader(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + + assertThrows( + RuntimeException.class, + () -> log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + } + + @Test + public void testEmptyAppendNotAllowed() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + assertThrows(IllegalArgumentException.class, () -> log.appendAsFollower(MemoryRecords.EMPTY, 1)); + assertThrows(IllegalArgumentException.class, () -> log.appendAsLeader(MemoryRecords.EMPTY, 1)); + } + + @ParameterizedTest + @ArgumentsSource(InvalidMemoryRecordsProvider.class) + public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + Executable action = () -> log.appendAsFollower(records, Integer.MAX_VALUE); + if (expectedException.isPresent()) { + assertThrows(expectedException.get(), action); + } else { + assertThrows(CorruptRecordException.class, action); + } + + assertEquals(previousEndOffset, log.endOffset().offset()); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + public void testRandomRecords(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) throws IOException { + File tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + Utils.delete(tempDir); + } + } + + @Test + public void testInvalidLeaderEpoch() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + int epoch = log.lastFetchedEpoch() + 1; + int numberOfRecords = 10; + + SimpleRecord[] simpleRecords = new SimpleRecord[numberOfRecords]; + for (int i = 0; i < numberOfRecords; i++) { + simpleRecords[i] = new SimpleRecord(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + MemoryRecords batchWithValidEpoch = MemoryRecords.withRecords( + 
previousEndOffset, + Compression.NONE, + epoch, + simpleRecords + ); + MemoryRecords batchWithInvalidEpoch = MemoryRecords.withRecords( + previousEndOffset + numberOfRecords, + Compression.NONE, + epoch + 1, + simpleRecords + ); + + ByteBuffer buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes()); + buffer.put(batchWithValidEpoch.buffer()); + buffer.put(batchWithInvalidEpoch.buffer()); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + log.appendAsFollower(records, epoch); + + // Check that only the first batch was appended + assertEquals(previousEndOffset + numberOfRecords, log.endOffset().offset()); + // Check that the last fetched epoch matches the first batch + assertEquals(epoch, log.lastFetchedEpoch()); + } + + @Test + public void testCreateSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()); + } + + @Test + public void testCreateSnapshotFromEndOffset() throws IOException { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, firstEpoch); + append(log, numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test finding the first epoch + log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close(); + + // Test finding the second epoch + log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close(); + } + + @Test + public void testCreateSnapshotInMiddleOfBatch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotMuchLaterEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) + ); + } + + @Test + public void testHighWatermarkOffsetMetadata() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + LogOffsetMetadata highWatermarkMetadata = 
log.highWatermark(); + assertEquals(numberOfRecords, highWatermarkMetadata.offset()); + assertTrue(highWatermarkMetadata.metadata().isPresent()); + + SegmentPosition segmentPosition = (SegmentPosition) highWatermarkMetadata.metadata().get(); + assertEquals(0, segmentPosition.baseOffset()); + assertTrue(segmentPosition.relativePosition() > 0); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords - 4, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + for (int i = 1; i <= numberOfRecords; i++) { + append(log, 1, epoch); + } + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + // Simulate log cleanup that advances the LSO + log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion); + + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()))); + } + + @Test + public void testCreateSnapshotDivergingEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotOlderEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotWithMissingEpoch() throws IOException { + int firstBatchRecords = 5; + int firstEpoch = 1; + int missingEpoch = firstEpoch + 1; + int secondBatchRecords = 5; + int secondEpoch = missingEpoch + 1; + + int numberOfRecords = firstBatchRecords + secondBatchRecords; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, firstBatchRecords, firstEpoch); + append(log, secondBatchRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(1, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(firstBatchRecords, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(secondBatchRecords, missingEpoch)) + ); + } + + @Test + public void testCreateExistingSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(Optional.empty(), 
log.createNewSnapshot(snapshotId), + "Creating an existing snapshot should not do anything"); + } + + @Test + public void testTopicId() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + assertEquals(Uuid.METADATA_TOPIC_ID, log.topicId()); + } + + @Test + public void testReadMissingSnapshot() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0))); + } + + @Test + public void testDeleteNonExistentSnapshot() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + int offset = 10; + int epoch = 0; + + append(log, offset, epoch); + log.updateHighWatermark(new LogOffsetMetadata(offset)); + + assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(2L, epoch))); + assertEquals(0, log.startOffset()); + assertEquals(epoch, log.lastFetchedEpoch()); + assertEquals(offset, log.endOffset().offset()); + assertEquals(offset, log.highWatermark().offset()); + } + + @Test + public void testTruncateFullyToLatestSnapshot() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + int numberOfRecords = 10; + int epoch = 0; + OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch); + + append(log, numberOfRecords, epoch); + createNewSnapshotUnchecked(log, sameEpochSnapshotId); + + assertTrue(log.truncateToLatestSnapshot()); + assertEquals(sameEpochSnapshotId.offset(), log.startOffset()); + assertEquals(sameEpochSnapshotId.epoch(), log.lastFetchedEpoch()); + assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset()); + assertEquals(sameEpochSnapshotId.offset(), log.highWatermark().offset()); + + OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1); + + append(log, numberOfRecords, epoch); + createNewSnapshotUnchecked(log, greaterEpochSnapshotId); + + assertTrue(log.truncateToLatestSnapshot()); + assertEquals(greaterEpochSnapshotId.offset(), log.startOffset()); + assertEquals(greaterEpochSnapshotId.epoch(), log.lastFetchedEpoch()); + assertEquals(greaterEpochSnapshotId.offset(), log.endOffset().offset()); + assertEquals(greaterEpochSnapshotId.offset(), log.highWatermark().offset()); + } + + @Test + public void testTruncateWillRemoveOlderSnapshot() throws IOException { + MetadataLogAndDir metadataLogAndDir = buildMetadataLogAndDir(tempDir, mockTime); + KafkaRaftLog log = metadataLogAndDir.log; + MetadataLogConfig config = metadataLogAndDir.config; + Path logDir = metadataLogAndDir.path; + + int numberOfRecords = 10; + int epoch = 1; + + append(log, 1, epoch - 1); + OffsetAndEpoch oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1); + createNewSnapshotUnchecked(log, oldSnapshotId1); + + append(log, 1, epoch); + OffsetAndEpoch oldSnapshotId2 = new OffsetAndEpoch(2, epoch); + createNewSnapshotUnchecked(log, oldSnapshotId2); + + append(log, numberOfRecords - 2, epoch); + OffsetAndEpoch oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch); + createNewSnapshotUnchecked(log, oldSnapshotId3); + + OffsetAndEpoch greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch); + createNewSnapshotUnchecked(log, greaterSnapshotId); + + assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId()); + assertTrue(log.truncateToLatestSnapshot()); + assertEquals(log.earliestSnapshotId(), log.latestSnapshotId()); + log.close(); + + mockTime.sleep(config.internalDeleteDelayMillis()); + // Assert that the log dir doesn't contain any older snapshots + Files.walk(logDir, 
1) + .map(Snapshots::parse) Review Comment: This indentation is different from the one in line 628. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftLogTest.java: ########## @@ -0,0 +1,1190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.record.ArbitraryMemoryRecords; +import org.apache.kafka.common.record.InvalidMemoryRecordsProvider; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.internals.BatchBuilder; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.UnifiedLog; +import org.apache.kafka.test.TestUtils; + +import net.jqwik.api.AfterFailureMode; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaRaftLogTest { + + private static final MetadataLogConfig DEFAULT_METADATA_LOG_CONFIG = createMetadataLogConfig( + 100 * 1024, + 10 * 1000, + 100 * 1024, + 60 * 1000, + KafkaRaftClient.MAX_BATCH_SIZE_BYTES, + KafkaRaftClient.MAX_FETCH_SIZE_BYTES + ); + + private final MockTime mockTime = new MockTime(); + private File tempDir; + + @BeforeEach + public void setUp() { + tempDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tempDir); + } + + @Test + public void testConfig() throws IOException { + Properties props = new Properties(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093"); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(2)); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10240)); + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, String.valueOf(10 * 1024)); + assertThrows(ConfigException.class, () -> { + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + }); + + props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); + AbstractConfig kafkaConfig = new AbstractConfig(MetadataLogConfig.CONFIG_DEF, props); + MetadataLogConfig metadataConfig = new MetadataLogConfig(kafkaConfig); + buildMetadataLog(tempDir, mockTime, metadataConfig); + } + + @Test + public void testUnexpectedAppendOffset() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); + int currentEpoch = 3; + long initialOffset = log.endOffset().offset(); + + log.appendAsLeader( + MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), + currentEpoch + ); + + // Throw exception for out of order records + assertThrows( + RuntimeException.class, + () -> log.appendAsLeader(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + + assertThrows( + RuntimeException.class, + () -> log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), currentEpoch) + ); + } + + @Test + public void testEmptyAppendNotAllowed() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + assertThrows(IllegalArgumentException.class, () -> log.appendAsFollower(MemoryRecords.EMPTY, 1)); + assertThrows(IllegalArgumentException.class, () -> log.appendAsLeader(MemoryRecords.EMPTY, 1)); + } + + @ParameterizedTest + @ArgumentsSource(InvalidMemoryRecordsProvider.class) + public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + Executable action = () -> log.appendAsFollower(records, Integer.MAX_VALUE); + if (expectedException.isPresent()) { + assertThrows(expectedException.get(), action); + } else { + assertThrows(CorruptRecordException.class, action); + } + + assertEquals(previousEndOffset, log.endOffset().offset()); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + public void 
testRandomRecords(@ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records) throws IOException { + File tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + Utils.delete(tempDir); + } + } + + @Test + public void testInvalidLeaderEpoch() throws IOException { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + int epoch = log.lastFetchedEpoch() + 1; + int numberOfRecords = 10; + + SimpleRecord[] simpleRecords = new SimpleRecord[numberOfRecords]; + for (int i = 0; i < numberOfRecords; i++) { + simpleRecords[i] = new SimpleRecord(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + MemoryRecords batchWithValidEpoch = MemoryRecords.withRecords( + previousEndOffset, + Compression.NONE, + epoch, + simpleRecords + ); + MemoryRecords batchWithInvalidEpoch = MemoryRecords.withRecords( + previousEndOffset + numberOfRecords, + Compression.NONE, + epoch + 1, + simpleRecords + ); + + ByteBuffer buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes()); + buffer.put(batchWithValidEpoch.buffer()); + buffer.put(batchWithInvalidEpoch.buffer()); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + log.appendAsFollower(records, epoch); + + // Check that only the first batch was appended + assertEquals(previousEndOffset + numberOfRecords, log.endOffset().offset()); + // Check that the last fetched epoch matches the first batch + assertEquals(epoch, log.lastFetchedEpoch()); + } + + @Test + public void testCreateSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()); + } + + @Test + public void testCreateSnapshotFromEndOffset() throws IOException { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, firstEpoch); + append(log, numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test finding the first epoch + log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close(); + + // Test finding the second epoch + log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close(); + } + + @Test + public void testCreateSnapshotInMiddleOfBatch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + 
append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) + ); + } + + @Test + public void testCreateSnapshotMuchLaterEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) + ); + } + + @Test + public void testHighWatermarkOffsetMetadata() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + LogOffsetMetadata highWatermarkMetadata = log.highWatermark(); + assertEquals(numberOfRecords, highWatermarkMetadata.offset()); + assertTrue(highWatermarkMetadata.metadata().isPresent()); + + SegmentPosition segmentPosition = (SegmentPosition) highWatermarkMetadata.metadata().get(); + assertEquals(0, segmentPosition.baseOffset()); + assertTrue(segmentPosition.relativePosition() > 0); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords - 4, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + for (int i = 1; i <= numberOfRecords; i++) { + append(log, 1, epoch); + } + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + // Simulate log cleanup that advances the LSO + log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion); + + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch()))); + } + + @Test + public void testCreateSnapshotDivergingEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotOlderEpoch() throws IOException { + int numberOfRecords = 10; + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset(), snapshotId.epoch() - 1)) + ); + } + + @Test + public void testCreateSnapshotWithMissingEpoch() throws IOException { + int firstBatchRecords = 5; + int firstEpoch = 1; + int missingEpoch = firstEpoch + 1; + int secondBatchRecords = 5; + int secondEpoch = missingEpoch + 1; + + int numberOfRecords = firstBatchRecords + secondBatchRecords; + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, 
firstBatchRecords, firstEpoch); + append(log, secondBatchRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(1, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(firstBatchRecords, missingEpoch)) + ); + assertThrows( + IllegalArgumentException.class, + () -> log.createNewSnapshot(new OffsetAndEpoch(secondBatchRecords, missingEpoch)) + ); + } + + @Test + public void testCreateExistingSnapshot() throws IOException { + int numberOfRecords = 10; + int epoch = 1; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + + append(log, numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + createNewSnapshot(log, snapshotId); + + assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId), Review Comment: Should we close the created snapshot?
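A minimal sketch of one way to address that concern, assuming the surrounding test context (log, snapshotId, and the Utils/RawSnapshotWriter imports already in the file) and that createNewSnapshot returns Optional<RawSnapshotWriter>; the PR may resolve it differently:

    // Hedged sketch, not the PR's actual change: capture the Optional so that any
    // unexpectedly returned writer is closed and its snapshot file handle released.
    Optional<RawSnapshotWriter> existing = log.createNewSnapshot(snapshotId);
    try {
        assertEquals(Optional.empty(), existing,
            "Creating a snapshot with an existing snapshot id should return an empty Optional");
    } finally {
        // Utils.closeQuietly is never invoked here when the Optional is empty
        existing.ifPresent(writer -> Utils.closeQuietly(writer, "raw snapshot writer"));
    }

With try/finally (or try-with-resources on a nullable writer), the resource is released even if the assertion fails, which is presumably the leak the reviewer is asking about.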