jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369878



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exist
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exist in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log 
start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is already held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+    logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+    val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+    snapshots --= expiredSnapshots.keys
+
+    expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is 
less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close 
and delete the given
+   * snapshots.

Review comment:
       Added your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to