cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662493849



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {

Review comment:
       Let's just look at the file size on disk. Reading and deserializing the entire snapshot seems like huge overkill.
   
   There should be a `FileChannel` method that can get the file size in O(1) (`FileChannel.size()`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to