This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8c8b5366a6 KAFKA-14240; Validate kraft snapshot state on startup (#12653)
8c8b5366a6 is described below

commit 8c8b5366a69a14def32bffe01faeeea448d0c9d6
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 19 11:52:48 2022 -0700

    KAFKA-14240; Validate kraft snapshot state on startup (#12653)

    We should prevent the metadata log from initializing in a known bad state.
    If the log start offset of the first segment is greater than 0, then there
    must be a snapshot at an offset greater than or equal to it in order to
    ensure that the initialized state is complete.

    Reviewers: José Armando García Sancio <jsan...@users.noreply.github.com>
---
 core/src/main/scala/kafka/log/LogLoader.scala      |  7 +-
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   | 43 ++++++++---
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 85 +++++++++++++++++++++-
 .../java/org/apache/kafka/snapshot/Snapshots.java  | 15 ++--
 .../org/apache/kafka/snapshot/SnapshotsTest.java   |  6 +-
 5 files changed, 132 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 25ee89c72b..f8da67656f 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
-
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
 import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
@@ -28,6 +27,7 @@ import kafka.utils.{CoreUtils, Logging, Scheduler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.snapshot.Snapshots
 
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.{Set, mutable}
@@ -229,7 +229,10 @@ class LogLoader(
       if (!file.canRead)
         throw new IOException(s"Could not read file $file")
       val filename = file.getName
-      if (filename.endsWith(DeletedFileSuffix)) {
+
+      // Delete stray files marked for deletion, but skip KRaft snapshots.
+      // These are handled in the recovery logic in `KafkaMetadataLog`.
+      if (filename.endsWith(DeletedFileSuffix) && !filename.endsWith(Snapshots.DELETE_SUFFIX)) {
         debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
         Files.deleteIfExists(file.toPath)
       } else if (filename.endsWith(CleanedFileSuffix)) {
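
A note on the LogLoader change above: a snapshot marked for deletion ends in
Snapshots.DELETE_SUFFIX (".checkpoint.deleted"), which also ends with the generic
DeletedFileSuffix (".deleted"), so the old predicate would have removed it during
log loading. A minimal sketch of the two predicates, using hypothetical file names
rather than code from the patch:

    // Suffix values as defined in kafka.log.UnifiedLog and org.apache.kafka.snapshot.Snapshots.
    val deletedFileSuffix = ".deleted"
    val snapshotDeleteSuffix = ".checkpoint.deleted"

    val straySegment = "00000000000000000000.log.deleted"                    // hypothetical name
    val straySnapshot = "00000000000000000100-0000000001.checkpoint.deleted" // hypothetical name

    // Old check: both files match, so LogLoader would have deleted the snapshot too.
    assert(straySegment.endsWith(deletedFileSuffix))
    assert(straySnapshot.endsWith(deletedFileSuffix))

    // New check: matches the stray segment but leaves the snapshot to KafkaMetadataLog recovery.
    def shouldDelete(name: String): Boolean =
      name.endsWith(deletedFileSuffix) && !name.endsWith(snapshotDeleteSuffix)
    assert(shouldDelete(straySegment))
    assert(!shouldDelete(straySnapshot))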
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index d112f3b581..95d96b3399 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogOffsetSnapshot, SnapshotGenerated, UnifiedLog}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
-import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
 import java.io.File
 import java.nio.file.{Files, NoSuchFileException, Path}
@@ -546,7 +546,7 @@ case class MetadataLogConfig(logSegmentBytes: Int,
                              fileDeleteDelayMs: Int,
                              nodeId: Int)
 
-object KafkaMetadataLog {
+object KafkaMetadataLog extends Logging {
   def apply(
     topicPartition: TopicPartition,
     topicId: Uuid,
@@ -623,7 +623,9 @@ object KafkaMetadataLog {
   private def recoverSnapshots(
     log: UnifiedLog
   ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
-    val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+    val snapshotsToRetain = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+    val snapshotsToDelete = mutable.Buffer.empty[SnapshotPath]
+
     // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
     // from logStartOffset
     val filesInDir = Files.newDirectoryStream(log.dir.toPath)
@@ -631,21 +633,40 @@ object KafkaMetadataLog {
     try {
       filesInDir.forEach { path =>
         Snapshots.parse(path).ifPresent { snapshotPath =>
-          if (snapshotPath.partial ||
-            snapshotPath.deleted ||
-            snapshotPath.snapshotId.offset < log.logStartOffset) {
-            // Delete partial snapshot, deleted snapshot and older snapshot
-            Files.deleteIfExists(snapshotPath.path)
+          // Collect partial snapshot, deleted snapshot and older snapshot for deletion
+          if (snapshotPath.partial
+            || snapshotPath.deleted
+            || snapshotPath.snapshotId.offset < log.logStartOffset) {
+            snapshotsToDelete.append(snapshotPath)
           } else {
-            snapshots.put(snapshotPath.snapshotId, None)
+            snapshotsToRetain.put(snapshotPath.snapshotId, None)
           }
         }
      }
+
+      // Before deleting any snapshots, we should ensure that the retained snapshots are
+      // consistent with the current state of the log. If the log start offset is not 0,
+      // then we must have a snapshot which covers the initial state up to the current
+      // log start offset.
+      if (log.logStartOffset > 0) {
+        val latestSnapshotId = snapshotsToRetain.lastOption.map(_._1)
+        if (!latestSnapshotId.exists(snapshotId => snapshotId.offset >= log.logStartOffset)) {
+          throw new IllegalStateException("Inconsistent snapshot state: there must be a snapshot " +
+            s"at an offset larger than the current log start offset ${log.logStartOffset}, but the " +
+            s"latest snapshot is $latestSnapshotId")
+        }
+      }
+
+      snapshotsToDelete.foreach { snapshotPath =>
+        Files.deleteIfExists(snapshotPath.path)
+        info(s"Deleted unneeded snapshot file with path $snapshotPath")
+      }
     } finally {
       filesInDir.close()
     }
-    snapshots
+
+    info(s"Initialized snapshots with IDs ${snapshotsToRetain.keys} from ${log.dir}")
+    snapshotsToRetain
   }
 
   private def deleteSnapshotFiles(
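
The heart of the new recovery logic is the invariant checked above: a log whose
start offset is greater than 0 must be covered by a retained snapshot. A standalone
sketch of that check, simplified to plain Long offsets (the real code keys a
TreeMap by OffsetAndEpoch):

    // Simplified version of the startup validation in recoverSnapshots.
    def validateSnapshotState(logStartOffset: Long, retainedSnapshotOffsets: Seq[Long]): Unit = {
      if (logStartOffset > 0) {
        val latest = retainedSnapshotOffsets.sorted.lastOption
        if (!latest.exists(_ >= logStartOffset)) {
          throw new IllegalStateException(s"Inconsistent snapshot state: there must be a snapshot " +
            s"at an offset larger than the current log start offset $logStartOffset, but the " +
            s"latest snapshot is $latest")
        }
      }
    }

    validateSnapshotState(0, Seq.empty)          // fine: log begins at offset 0
    validateSnapshotState(100, Seq(50L, 100L))   // fine: snapshot at 100 covers the start offset
    // validateSnapshotState(100, Seq(50L))      // throws IllegalStateException

Note that the snapshots collected for deletion are only removed after this check
passes, so a failed validation leaves all files on disk for inspection.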
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index a8e7b2c324..99b4f817be 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{Defaults, UnifiedLog, SegmentDeletion}
+import kafka.log.{Defaults, SegmentDeletion, UnifiedLog}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.{MockTime, TestUtils}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft._
 import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 import org.apache.kafka.test.TestUtils.assertOptional
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -391,6 +391,87 @@ final class KafkaMetadataLogTest {
     }
   }
 
+  @Test
+  def testStartupWithInvalidSnapshotState(): Unit = {
+    // Initialize an empty log at offset 100.
+    var log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 100)
+    log.close()
+
+    val metadataDir = metadataLogDir(tempDir)
+    assertTrue(metadataDir.exists())
+
+    // Initialization should fail unless we have a snapshot at an offset
+    // greater than or equal to 100.
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+
+    // Snapshots at offsets less than 100 are not sufficient.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(50, 1))
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+
+    // Snapshot at offset 100 should be fine.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(100, 1))
+    log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 200)
+    log.close()
+
+    // Snapshots at higher offsets are also fine. In this case, the
+    // log start offset should advance to the first snapshot offset.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(500, 1))
+    log = buildMetadataLog(tempDir, mockTime)
+    assertEquals(500, log.log.logStartOffset)
+  }
+
+  @Test
+  def testSnapshotDeletionWithInvalidSnapshotState(): Unit = {
+    // Initialize an empty log at offset 100.
+    val log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 100)
+    log.close()
+
+    val metadataDir = metadataLogDir(tempDir)
+    assertTrue(metadataDir.exists())
+
+    // We have one deleted snapshot at an offset matching the start offset.
+    val snapshotId = new OffsetAndEpoch(100, 1)
+    writeEmptySnapshot(metadataDir, snapshotId)
+
+    val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId)
+    assertTrue(deletedPath.toFile.exists())
+
+    // Initialization should still fail.
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+
+    // The snapshot marked for deletion should still exist.
+    assertTrue(deletedPath.toFile.exists())
+  }
+
+  private def metadataLogDir(
+    logDir: File
+  ): File = {
+    new File(
+      logDir.getAbsolutePath,
+      UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
+    )
+  }
+
+  private def writeEmptySnapshot(
+    metadataDir: File,
+    snapshotId: OffsetAndEpoch
+  ): Unit = {
+    val writer = FileRawSnapshotWriter.create(
+      metadataDir.toPath,
+      snapshotId,
+      Optional.empty()
+    )
+    TestUtils.resource(writer)(_.freeze())
+  }
+
   @Test
   def testDoesntTruncateFully(): Unit = {
     val log = buildMetadataLog(tempDir, mockTime)
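
The writeEmptySnapshot helper above depends on the on-disk snapshot naming scheme
that Snapshots.parse recognizes during recovery. A hedged illustration (the padding
widths reflect my reading of the formatters in Snapshots.java, not code from this
patch):

    // Snapshot files are named "<offset>-<epoch>.checkpoint": the offset is
    // zero-padded to 20 digits and the epoch to 10 (assumed widths).
    def snapshotFileName(offset: Long, epoch: Int): String =
      f"$offset%020d-$epoch%010d.checkpoint"

    assert(snapshotFileName(100L, 1) == "00000000000000000100-0000000001.checkpoint")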
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index 337e56a7f8..a41f6485fa 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -32,7 +32,7 @@ public final class Snapshots {
     private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
     public static final String SUFFIX = ".checkpoint";
     private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
-    private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
+    public static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
 
     private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
     private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
@@ -60,7 +60,7 @@ public final class Snapshots {
         return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
     }
 
-    static Path deleteRename(Path source, OffsetAndEpoch snapshotId) {
+    static Path deleteRenamePath(Path source, OffsetAndEpoch snapshotId) {
         return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX);
     }
 
@@ -114,7 +114,7 @@ public final class Snapshots {
      */
     public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
         Path immutablePath = snapshotPath(logDir, snapshotId);
-        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
         try {
             boolean deleted = Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
             if (deleted) {
@@ -130,13 +130,16 @@ public final class Snapshots {
     }
 
     /**
-     * Mark a snapshot for deletion by renaming with the deleted suffix
+     * Mark a snapshot for deletion by renaming with the deleted suffix.
+     *
+     * @return the path of the snapshot marked for deletion (i.e. with the .deleted suffix)
      */
-    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+    public static Path markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
         Path immutablePath = snapshotPath(logDir, snapshotId);
-        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
         try {
             Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+            return deletedPath;
         } catch (IOException e) {
             throw new UncheckedIOException(
                 String.format(
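
Since markForDelete now returns the renamed path, callers can assert on or log the
exact file, as the new test in KafkaMetadataLogTest does. A small usage sketch (the
directory path is hypothetical, and a snapshot for the given id is assumed to
already exist there):

    import java.nio.file.Paths
    import org.apache.kafka.raft.OffsetAndEpoch
    import org.apache.kafka.snapshot.Snapshots

    val logDir = Paths.get("/tmp/metadata-log")  // hypothetical metadata log dir
    // Renames 00000000000000000100-0000000001.checkpoint and returns the new path.
    val deletedPath = Snapshots.markForDelete(logDir, new OffsetAndEpoch(100, 1))
    assert(deletedPath.getFileName.toString.endsWith(Snapshots.DELETE_SUFFIX))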
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
index ae89543e3d..57aca28220 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
@@ -75,7 +75,7 @@ final public class SnapshotsTest {
             TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
         );
         Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
-        Path deletedPath = Snapshots.deleteRename(path, snapshotId);
+        Path deletedPath = Snapshots.deleteRenamePath(path, snapshotId);
 
         SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
 
         assertEquals(snapshotId, snapshotPath.snapshotId);
@@ -116,11 +116,11 @@ final public class SnapshotsTest {
 
         if (renameBeforeDeleting)
             // rename snapshot before deleting
-            Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);
+            Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRenamePath(snapshotPath, snapshotId), false);
 
         assertTrue(Snapshots.deleteIfExists(logDirPath, snapshot.snapshotId()));
         assertFalse(Files.exists(snapshotPath));
-        assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+        assertFalse(Files.exists(Snapshots.deleteRenamePath(snapshotPath, snapshotId)));
     }
 }