This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 0c08c80afa KAFKA-14240; Validate kraft snapshot state on startup (#12653)
0c08c80afa is described below

commit 0c08c80afa8a00aa520ec407d9bce6537395fc23
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 19 11:52:48 2022 -0700

    KAFKA-14240; Validate kraft snapshot state on startup (#12653)
    
    We should prevent the metadata log from initializing in a known bad state.
    If the log start offset of the first segment is greater than 0, then there
    must be a snapshot at an offset greater than or equal to it in order to
    ensure that the initialized state is complete.
    
    Reviewers: José Armando García Sancio <jsan...@users.noreply.github.com>
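
A self-contained Scala sketch of the invariant this commit enforces (illustrative only: OffsetAndEpoch here is a simplified stand-in for the real class, and validate is a hypothetical helper mirroring the check added to KafkaMetadataLog in the diff below):

    object SnapshotInvariantSketch {
      // Simplified stand-in for org.apache.kafka.raft.OffsetAndEpoch.
      final case class OffsetAndEpoch(offset: Long, epoch: Int)

      // A log whose start offset is greater than 0 is only complete if some
      // retained snapshot covers at least that offset.
      def validate(logStartOffset: Long, retained: Seq[OffsetAndEpoch]): Unit = {
        if (logStartOffset > 0) {
          val latest = retained.sortBy(_.offset).lastOption
          if (!latest.exists(_.offset >= logStartOffset)) {
            throw new IllegalStateException(
              s"Inconsistent snapshot state: there must be a snapshot at an " +
              s"offset greater than or equal to the log start offset " +
              s"$logStartOffset, but the latest snapshot is $latest")
          }
        }
      }
    }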
---
 core/src/main/scala/kafka/log/LogLoader.scala      |  7 +-
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   | 43 ++++++++---
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 85 +++++++++++++++++++++-
 .../java/org/apache/kafka/snapshot/Snapshots.java  | 15 ++--
 .../org/apache/kafka/snapshot/SnapshotsTest.java   |  6 +-
 5 files changed, 132 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 25ee89c72b..f8da67656f 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
-
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
 import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
@@ -28,6 +27,7 @@ import kafka.utils.{CoreUtils, Logging, Scheduler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.snapshot.Snapshots
 
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.{Set, mutable}
@@ -229,7 +229,10 @@ class LogLoader(
       if (!file.canRead)
         throw new IOException(s"Could not read file $file")
       val filename = file.getName
-      if (filename.endsWith(DeletedFileSuffix)) {
+
+      // Delete stray files marked for deletion, but skip KRaft snapshots.
+      // These are handled in the recovery logic in `KafkaMetadataLog`.
+      if (filename.endsWith(DeletedFileSuffix) && !filename.endsWith(Snapshots.DELETE_SUFFIX)) {
         debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
         Files.deleteIfExists(file.toPath)
       } else if (filename.endsWith(CleanedFileSuffix)) {
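
A minimal Scala sketch of the filename check added above (the two suffix values come from the diff; the helper name and sample filenames are hypothetical):

    // Stray ".deleted" files are removed at load time, but KRaft snapshots end
    // with Snapshots.DELETE_SUFFIX (".checkpoint.deleted") and are recovered
    // later by KafkaMetadataLog, so they must be skipped here.
    val deletedFileSuffix = ".deleted"               // UnifiedLog.DeletedFileSuffix
    val snapshotDeleteSuffix = ".checkpoint.deleted" // Snapshots.DELETE_SUFFIX

    def isStrayDeletedFile(filename: String): Boolean =
      filename.endsWith(deletedFileSuffix) && !filename.endsWith(snapshotDeleteSuffix)

    assert(isStrayDeletedFile("00000000000000000123.log.deleted"))
    assert(!isStrayDeletedFile("00000000000000000100-0000000001.checkpoint.deleted"))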
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index d112f3b581..95d96b3399 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogOffsetSnapshot, SnapshotGenerated, UnifiedLog}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
-import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
 import java.io.File
 import java.nio.file.{Files, NoSuchFileException, Path}
@@ -546,7 +546,7 @@ case class MetadataLogConfig(logSegmentBytes: Int,
                              fileDeleteDelayMs: Int,
                              nodeId: Int)
 
-object KafkaMetadataLog {
+object KafkaMetadataLog extends Logging {
   def apply(
     topicPartition: TopicPartition,
     topicId: Uuid,
@@ -623,7 +623,9 @@ object KafkaMetadataLog {
   private def recoverSnapshots(
     log: UnifiedLog
   ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
-    val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+    val snapshotsToRetain = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+    val snapshotsToDelete = mutable.Buffer.empty[SnapshotPath]
+
     // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
     // from logStartOffset
     val filesInDir = Files.newDirectoryStream(log.dir.toPath)
@@ -631,21 +633,40 @@ object KafkaMetadataLog {
     try {
       filesInDir.forEach { path =>
         Snapshots.parse(path).ifPresent { snapshotPath =>
-          if (snapshotPath.partial ||
-            snapshotPath.deleted ||
-            snapshotPath.snapshotId.offset < log.logStartOffset) {
-            // Delete partial snapshot, deleted snapshot and older snapshot
-            Files.deleteIfExists(snapshotPath.path)
+          // Collect partial snapshot, deleted snapshot and older snapshot for deletion
+          if (snapshotPath.partial
+            || snapshotPath.deleted
+            || snapshotPath.snapshotId.offset < log.logStartOffset) {
+            snapshotsToDelete.append(snapshotPath)
           } else {
-            snapshots.put(snapshotPath.snapshotId, None)
+            snapshotsToRetain.put(snapshotPath.snapshotId, None)
           }
         }
       }
+
+      // Before deleting any snapshots, we should ensure that the retained snapshots are
+      // consistent with the current state of the log. If the log start offset is not 0,
+      // then we must have a snapshot which covers the initial state up to the current
+      // log start offset.
+      if (log.logStartOffset > 0) {
+        val latestSnapshotId = snapshotsToRetain.lastOption.map(_._1)
+        if (!latestSnapshotId.exists(snapshotId => snapshotId.offset >= log.logStartOffset)) {
+          throw new IllegalStateException("Inconsistent snapshot state: there must be a snapshot " +
+            s"at an offset larger than the current log start offset ${log.logStartOffset}, but the " +
+            s"latest snapshot is $latestSnapshotId")
+        }
+      }
+
+      snapshotsToDelete.foreach { snapshotPath =>
+        Files.deleteIfExists(snapshotPath.path)
+        info(s"Deleted unneeded snapshot file with path $snapshotPath")
+      }
     } finally {
       filesInDir.close()
     }
 
-    snapshots
+    info(s"Initialized snapshots with IDs ${snapshotsToRetain.keys} from 
${log.dir}")
+    snapshotsToRetain
   }
 
   private def deleteSnapshotFiles(
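
Note the ordering in recoverSnapshots above: snapshots are first classified, the retained set is validated, and only then are unneeded files deleted, so a validation failure leaves every file on disk. A rough Scala sketch of that flow, with hypothetical simplified types:

    final case class SnapshotFile(offset: Long, partial: Boolean, deleted: Boolean)

    def recoverSketch(found: Seq[SnapshotFile], logStartOffset: Long): Seq[SnapshotFile] = {
      val (toDelete, toRetain) =
        found.partition(s => s.partial || s.deleted || s.offset < logStartOffset)
      // Validate before touching the filesystem: a failure here preserves all files.
      if (logStartOffset > 0 && !toRetain.map(_.offset).maxOption.exists(_ >= logStartOffset))
        throw new IllegalStateException(s"No snapshot covers log start offset $logStartOffset")
      toDelete.foreach(s => println(s"would delete snapshot at offset ${s.offset}"))
      toRetain
    }

This deferred deletion is what testSnapshotDeletionWithInvalidSnapshotState in the test diff below relies on: the snapshot marked for deletion still exists after the failed initialization.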
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 49b1206606..fa9885e358 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{Defaults, UnifiedLog, SegmentDeletion}
+import kafka.log.{Defaults, SegmentDeletion, UnifiedLog}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.{MockTime, TestUtils}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.raft._
 import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 import org.apache.kafka.test.TestUtils.assertOptional
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -391,6 +391,87 @@ final class KafkaMetadataLogTest {
       }
   }
 
+  @Test
+  def testStartupWithInvalidSnapshotState(): Unit = {
+    // Initialize an empty log at offset 100.
+    var log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 100)
+    log.close()
+
+    val metadataDir = metadataLogDir(tempDir)
+    assertTrue(metadataDir.exists())
+
+    // Initialization should fail unless we have a snapshot at an offset
+    // greater than or equal to 100.
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+    // Snapshots at offsets less than 100 are not sufficient.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(50, 1))
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+
+    // Snapshot at offset 100 should be fine.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(100, 1))
+    log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 200)
+    log.close()
+
+    // Snapshots at higher offsets are also fine. In this case, the
+    // log start offset should advance to the first snapshot offset.
+    writeEmptySnapshot(metadataDir, new OffsetAndEpoch(500, 1))
+    log = buildMetadataLog(tempDir, mockTime)
+    assertEquals(500, log.log.logStartOffset)
+  }
+
+  @Test
+  def testSnapshotDeletionWithInvalidSnapshotState(): Unit = {
+    // Initialize an empty log at offset 100.
+    val log = buildMetadataLog(tempDir, mockTime)
+    log.log.truncateFullyAndStartAt(newOffset = 100)
+    log.close()
+
+    val metadataDir = metadataLogDir(tempDir)
+    assertTrue(metadataDir.exists())
+
+    // We have one deleted snapshot at an offset matching the start offset.
+    val snapshotId = new OffsetAndEpoch(100, 1)
+    writeEmptySnapshot(metadataDir, snapshotId)
+
+    val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId)
+    assertTrue(deletedPath.toFile.exists())
+
+    // Initialization should still fail.
+    assertThrows(classOf[IllegalStateException], () => {
+      buildMetadataLog(tempDir, mockTime)
+    })
+
+    // The snapshot marked for deletion should still exist.
+    assertTrue(deletedPath.toFile.exists())
+  }
+
+  private def metadataLogDir(
+    logDir: File
+  ): File = {
+    new File(
+      logDir.getAbsolutePath,
+      UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
+    )
+  }
+
+  private def writeEmptySnapshot(
+    metadataDir: File,
+    snapshotId: OffsetAndEpoch
+  ): Unit = {
+    val writer = FileRawSnapshotWriter.create(
+      metadataDir.toPath,
+      snapshotId,
+      Optional.empty()
+    )
+    TestUtils.resource(writer)(_.freeze())
+  }
+
   @Test
   def testDoesntTruncateFully(): Unit = {
     val log = buildMetadataLog(tempDir, mockTime)
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index 337e56a7f8..a41f6485fa 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -32,7 +32,7 @@ public final class Snapshots {
     private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
     public static final String SUFFIX = ".checkpoint";
     private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
-    private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
+    public static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
 
     private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
     private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
@@ -60,7 +60,7 @@ public final class Snapshots {
         return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
     }
 
-    static Path deleteRename(Path source, OffsetAndEpoch snapshotId) {
+    static Path deleteRenamePath(Path source, OffsetAndEpoch snapshotId) {
         return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX);
     }
 
@@ -114,7 +114,7 @@ public final class Snapshots {
      */
     public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
         Path immutablePath = snapshotPath(logDir, snapshotId);
-        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
         try {
             boolean deleted = Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
             if (deleted) {
@@ -130,13 +130,16 @@ public final class Snapshots {
     }
 
     /**
-     * Mark a snapshot for deletion by renaming with the deleted suffix
+     * Mark a snapshot for deletion by renaming with the deleted suffix.
+     *
+     * @return the path of the snapshot marked for deletion (i.e. with .deleted suffix)
      */
-    public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+    public static Path markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
         Path immutablePath = snapshotPath(logDir, snapshotId);
-        Path deletedPath = deleteRename(immutablePath, snapshotId);
+        Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
         try {
             Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+            return deletedPath;
         } catch (IOException e) {
             throw new UncheckedIOException(
                 String.format(
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
index ae89543e3d..57aca28220 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
@@ -75,7 +75,7 @@ final public class SnapshotsTest {
             TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
         );
        Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
-        Path deletedPath = Snapshots.deleteRename(path, snapshotId);
+        Path deletedPath = Snapshots.deleteRenamePath(path, snapshotId);
         SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
 
         assertEquals(snapshotId, snapshotPath.snapshotId);
@@ -116,11 +116,11 @@ final public class SnapshotsTest {
 
             if (renameBeforeDeleting)
                 // rename snapshot before deleting
-                Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);
+                Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRenamePath(snapshotPath, snapshotId), false);
 
             assertTrue(Snapshots.deleteIfExists(logDirPath, snapshot.snapshotId()));
             assertFalse(Files.exists(snapshotPath));
-            assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+            assertFalse(Files.exists(Snapshots.deleteRenamePath(snapshotPath, snapshotId)));
         }
     }
 }
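
One consequence of the Snapshots.java change worth noting: markForDelete now returns the renamed path, which is what lets the new KafkaMetadataLogTest assert on the marker file directly. A short usage sketch in Scala, assuming a metadataDir and snapshotId set up as in that test:

    val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId)
    // DELETE_SUFFIX is public after this change, so the suffix can be checked.
    assert(deletedPath.toString.endsWith(Snapshots.DELETE_SUFFIX))
    assert(deletedPath.toFile.exists())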
