jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369703



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exists
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exists in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close and delete the given
+   * snapshots.
    */
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
-    val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
-    while (expiredSnapshotIdsIter.hasNext) {
-      val snapshotId = expiredSnapshotIdsIter.next()
-      // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
-      // on the file system, so we should first remove snapshotId and then delete snapshot file.
-      expiredSnapshotIdsIter.remove()
-
-      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
-      val destination = Snapshots.deleteRename(path, snapshotId)
-      try {
-        Utils.atomicMoveWithFallback(path, destination, false)
-      } catch {
-        case e: IOException =>
-          error(s"Error renaming snapshot file: $path to $destination", e)
-      }
+  private def removeSnapshots(
+    expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+  ): Unit = {
+    expiredSnapshots.foreach { case (snapshotId, _) =>
+      Snapshots.markForDelete(log.dir.toPath, snapshotId)
+    }
+
+    if (expiredSnapshots.nonEmpty) {
       scheduler.schedule(
-        "delete-snapshot-file",
-        () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
-        fileDeleteDelayMs)
+        "delete-snapshot-files",
+        KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots),

Review comment:
       Okay. I changed it to use a different syntax that should work around this IntelliJ error.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to