xuanyuanking commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r661306386



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -199,6 +199,96 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Delete old versions by deleting the associated version and SST files.
+   * At a high level, this method finds which versions to delete, and which 
SST files were
+   * last used in those versions. It's safe to delete these SST files because an 
SST file can
+   * be reused only in successive versions. Therefore, if an SST file F was 
last used in version
+   * V, then it won't be used in version V+1 or later, and if version V can be 
deleted, then
+   * F can safely be deleted as well.
+   *
+   * To find old files, it does the following.
+   * - List all the existing [version].zip files
+   * - Find the min version that needs to be retained based on the given 
`numVersionsToRetain`.
+   * - Accordingly decide which versions should be deleted.
+   * - Resolve all SST files of all the existing versions, if not already 
resolved.
+   * - Find what was the latest version in which each SST file was used.
+   * - Delete the files that were last used in the to-be-deleted versions as 
we will not
+   *   need those files any more.
+   *
+   * Note that it only deletes files that it knows are safe to delete.
+   * It may not delete the following files.
+   * - Partially written SST files
+   * - SST files that were used in a version, but that version got overwritten 
with a different
+   *   set of SST files.
+   */
+  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+    val path = new Path(dfsRootDir)
+
+    // All versions present in DFS, sorted
+    val sortedVersions = fm.list(path, onlyZipFiles)
+      .map(_.getPath.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+
+    // Return if no versions generated yet
+    if (sortedVersions.isEmpty) return
+
+    // Find the versions to delete
+    val maxVersionPresent = sortedVersions.last
+    val minVersionPresent = sortedVersions.head
+    val minVersionToRetain =
+      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
+    val versionsToDelete = sortedVersions.takeWhile(_ < 
minVersionToRetain).toSet[Long]
+
+    // Return if no version to delete
+    if (versionsToDelete.isEmpty) return
+
+    logInfo(
+      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
+        s"cleaning up all versions older than $minVersionToRetain to retain 
last " +
+        s"$numVersionsToRetain versions")
+
+    // Resolve RocksDB files for all the versions and find the max version 
each file is used
+    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
+    sortedVersions.foreach { version =>
+      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
+        versionToRocksDBFiles.put(version, newResolvedFiles)
+        newResolvedFiles
+      }
+      files.foreach(f => fileToMaxUsedVersion(f) = version)
+    }
+
+    // Best effort attempt to delete SST files that were last used in 
to-be-deleted versions
+    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => 
versionsToDelete.contains(v)}

Review comment:
       Thanks, done in the next commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to