This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6acf0  [SPARK-35785][SS] Cleanup support for RocksDB instance
ca6acf0 is described below

commit ca6acf0839baaa40a1417e7dca0cc1a22de06bb2
Author: Yuanjian Li <yuanjian...@databricks.com>
AuthorDate: Fri Jul 2 00:47:55 2021 -0700

    [SPARK-35785][SS] Cleanup support for RocksDB instance
    
    ### What changes were proposed in this pull request?
    Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.
    
    ### Why are the changes needed?
    Part of the implementation of RocksDB state store.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New UT added.
    
    Closes #32933 from xuanyuanking/SPARK-35785.
    
    Authored-by: Yuanjian Li <yuanjian...@databricks.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    |   7 +
 .../streaming/state/RocksDBFileManager.scala       | 106 ++++++++++
 .../execution/streaming/state/RocksDBSuite.scala   | 216 ++++++++++++++++++++-
 3 files changed, 328 insertions(+), 1 deletion(-)
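
For context, a minimal sketch of how the new cleanup() entry point could be driven by a caller.
The wiring below is illustrative only: the constructor arguments mirror those used in
RocksDBSuite, and the path, logging id, and retention value are assumptions, not part of this
patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.execution.streaming.state.{RocksDB, RocksDBConf}

    // Hypothetical caller of the APIs touched by this commit.
    val db = new RocksDB(
      dfsRootDir = "/tmp/rocksdb-state-example",           // illustrative checkpoint location
      conf = RocksDBConf().copy(minVersionsToRetain = 2),   // keep only the 2 newest versions
      hadoopConf = new Configuration(),
      loggingId = "[cleanup-example]")
    try {
      db.load(0)     // acquire the instance at version 0
      db.commit()    // write version 1 to DFS
      db.cleanup()   // new in this patch: delete version/SST files outside the retained window
    } finally {
      db.close()
    }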

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 82aa166..f640018 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -253,6 +253,13 @@ class RocksDB(
     logInfo(s"Rolled back to $loadedVersion")
   }
 
+  def cleanup(): Unit = {
+    val cleanupTime = timeTakenMs {
+      fileManager.deleteOldVersions(conf.minVersionsToRetain)
+    }
+    logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
+  }
+
   /** Release all resources */
   def close(): Unit = {
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 4731f4d..bfdace3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -198,6 +199,99 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Delete old versions by deleting the associated version and SST files.
+   * At a high-level, this method finds which versions to delete, and which SST files that were
+   * last used in those versions. It's safe to delete these SST files because a SST file can
+   * be reused only in successive versions. Therefore, if a SST file F was last used in version
+   * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
+   * F can safely be deleted as well.
+   *
+   * To find old files, it does the following.
+   * - List all the existing [version].zip files
+   * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
+   * - Accordingly decide which versions should be deleted.
+   * - Resolve all SST files of all the existing versions, if not already resolved.
+   * - Find what was the latest version in which each SST file was used.
+   * - Delete the files that were last used in the to-be-deleted versions as we will not
+   *   need those files any more.
+   *
+   * Note that it only deletes files that it knows are safe to delete.
+   * It may not delete the following files.
+   * - Partially written SST files
+   * - SST files that were used in a version, but that version got overwritten with a different
+   *   set of SST files.
+   */
+  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+    val path = new Path(dfsRootDir)
+
+    // All versions present in DFS, sorted
+    val sortedVersions = fm.list(path, onlyZipFiles)
+      .map(_.getPath.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+
+    // Return if no versions generated yet
+    if (sortedVersions.isEmpty) return
+
+    // Find the versions to delete
+    val maxVersionPresent = sortedVersions.last
+    val minVersionPresent = sortedVersions.head
+    val minVersionToRetain =
+      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
+    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
+
+    // Return if no version to delete
+    if (versionsToDelete.isEmpty) return
+
+    logInfo(
+      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
+        s"cleaning up all versions older than $minVersionToRetain to retain 
last " +
+        s"$numVersionsToRetain versions")
+
+    // Resolve RocksDB files for all the versions and find the max version each file is used
+    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
+    sortedVersions.foreach { version =>
+      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
+        versionToRocksDBFiles.put(version, newResolvedFiles)
+        newResolvedFiles
+      }
+      files.foreach(f => fileToMaxUsedVersion(f) = version)
+    }
+
+    // Best effort attempt to delete SST files that were last used in to-be-deleted versions
+    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
+    logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    var failedToDelete = 0
+    filesToDelete.foreach { case (file, maxUsedVersion) =>
+      try {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        fm.delete(dfsFile)
+        logDebug(s"Deleted file $file that was last used in version 
$maxUsedVersion")
+      } catch {
+        case e: Exception =>
+          failedToDelete += 1
+          logWarning(s"Error deleting file $file, last used in version 
$maxUsedVersion", e)
+      }
+    }
+
+    // Delete the version files and forget about them
+    versionsToDelete.foreach { version =>
+      val versionFile = dfsBatchZipFile(version)
+      try {
+        fm.delete(versionFile)
+        versionToRocksDBFiles.remove(version)
+        logDebug(s"Deleted version $version")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting version file $versionFile for version 
$version", e)
+      }
+    }
+    logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to 
delete" +
+      s"$failedToDelete files) not used in versions >= $minVersionToRetain")
+  }
+
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
@@ -295,6 +389,16 @@ class RocksDBFileManager(
       s"$filesReused files reused.")
   }
 
+  /** Get the SST files required for a version from the version zip file in DFS */
+  private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
+    Utils.deleteRecursively(localTempDir)
+    localTempDir.mkdirs()
+    Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
+    val metadataFile = localMetadataFile(localTempDir)
+    val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+    metadata.immutableFiles
+  }
+
   /**
    * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
    * Any error while writing will ensure that the file is not written.
@@ -347,6 +451,8 @@ class RocksDBFileManager(
 
   private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
 
+  override protected def logName: String = s"${super.logName} $loggingId"
+
   private def dfsFilePath(fileName: String): Path = {
     if (isSstFile(fileName)) {
       new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
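
For reference, the retention rule documented on deleteOldVersions above reduces to the following
self-contained calculation. The version numbers, SST file names, and version-to-files mapping are
made up for illustration and are not taken from the patch.

    // Keep the newest `numVersionsToRetain` versions; an SST file is deletable only
    // if the last version that referenced it is itself being deleted.
    val sortedVersions = Seq(1L, 2L, 3L, 4L, 5L)   // [version].zip files present in DFS
    val numVersionsToRetain = 2

    val minVersionToRetain =
      math.max(sortedVersions.head, sortedVersions.last - numVersionsToRetain + 1)
    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet
    // minVersionToRetain == 4, versionsToDelete == Set(1, 2, 3)

    // Hypothetical mapping of version -> SST files referenced by that version
    val filesPerVersion = Map(
      1L -> Seq("001.sst"),
      2L -> Seq("001.sst", "002.sst"),
      3L -> Seq("002.sst", "003.sst"),
      4L -> Seq("002.sst", "004.sst"),
      5L -> Seq("004.sst", "005.sst"))

    // Latest version in which each SST file was used
    val fileToMaxUsedVersion = filesPerVersion.toSeq
      .flatMap { case (v, files) => files.map(_ -> v) }
      .groupBy { case (file, _) => file }
      .map { case (file, pairs) => file -> pairs.map(_._2).max }
    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
    // filesToDelete == Map("001.sst" -> 2, "003.sst" -> 3); "002.sst" survives because the
    // retained version 4 still references it.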
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index a11eb8a..4659a37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class RocksDBSuite extends SparkFunSuite {
 
@@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("RocksDB: cleanup old files") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
+
+    def versionsPresent: Seq[Long] = {
+      remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
+        .map(_.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+        .sorted
+    }
+
+    withDB(remoteDir, conf = conf) { db =>
+      // Generate versions without cleaning up
+      for (version <- 1 to 50) {
+        db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+
+      // Clean up and verify version files and SST files were deleted
+      require(versionsPresent === (1L to 50L))
+      val sstDir = new File(remoteDir, "SSTs")
+      val numSstFiles = listFiles(sstDir).length
+      db.cleanup()
+      assert(versionsPresent === (41L to 50L))
+      assert(listFiles(sstDir).length < numSstFiles)
+
+      // Verify data in retained versions.
+      versionsPresent.foreach { version =>
+        db.load(version)
+        val data = db.iterator().map(toStr).toSet
+        assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
+      }
+    }
+  }
+
+  test("RocksDB: handle commit failures and aborts") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+      classOf[CreateAtomicTestManager].getName)
+    val remoteDir = Utils.createTempDir().getAbsolutePath
+    val conf = RocksDBConf().copy(compactOnCommit = true)
+    withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+      // Disable failure of output stream and generate versions
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+      for (version <- 1 to 10) {
+        db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+      val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+      // Fail commit for next version and verify that reloading resets the files
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      db.put("11", "11")
+      intercept[IOException] { quietly { db.commit() } }
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+
+      // Abort commit for next version and verify that reloading resets the files
+      db.load(10)
+      db.put("11", "11")
+      db.rollback()
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+    }
+  }
+
   test("RocksDBFileManager: upload only new immutable files") {
     withTempDir { dir =>
       val dfsRootDir = dir.getAbsolutePath
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+    quietly {
+      withDB(
+        Utils.createTempDir().toString,
+        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+        // DB has been loaded so current thread has already acquired the lock on the RocksDB instance
+
+        db.load(0)  // Current thread should be able to load again
+
+        // Another thread should not be able to load while current thread is using it
+        val ex = intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+        }
+        // Assert that the error message contains the stack trace
+        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+        assert(ex.getMessage.contains("runInNewThread"))
+
+        // Commit should release the instance allowing other threads to load new version
+        db.commit()
+        ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+          db.load(1)
+          db.commit()
+        }
+
+        // Another thread should not be able to load while current thread is using it
+        db.load(2)
+        intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+        }
+
+        // Rollback should release the instance allowing other threads to load new version
+        db.rollback()
+        ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+          db.load(1)
+          db.commit()
+        }
+      }
+    }
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") 
{
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    val sc = new SparkContext(conf)
+
+    try {
+      RocksDBSuite.withSingletonDB {
+        // Load a RocksDB instance, that is, get a lock inside a task and then fail
+        quietly {
+          intercept[Exception] {
+            sc.makeRDD[Int](1 to 1, 1).map { i =>
+              RocksDBSuite.singleton.load(0)
+              throw new Exception("fail this task to test lock release")
+            }.count()
+          }
+        }
+
+        // Test whether you can load again, that is, will it successfully lock again
+        RocksDBSuite.singleton.load(0)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+    quietly {
+      val numThreads = 20
+      val numUpdatesInEachThread = 20
+      val remoteDir = Utils.createTempDir().toString
+      @volatile var exception: Exception = null
+      val updatingThreads = Array.fill(numThreads) {
+        new Thread() {
+          override def run(): Unit = {
+            try {
+              for (version <- 0 to numUpdatesInEachThread) {
+                withDB(
+                  remoteDir,
+                  version = version) { db =>
+                  val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
+                  db.put("a", (prevValue + 1).toString)
+                  db.commit()
+                }
+              }
+            } catch {
+              case e: Exception =>
+                val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+                if (exception == null) {
+                  exception = newException
+                }
+                throw e
+            }
+          }
+        }
+      }
+      val cleaningThread = new Thread() {
+        override def run(): Unit = {
+          try {
+            withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
+              while (!this.isInterrupted) {
+                db.cleanup()
+                Thread.sleep(1)
+              }
+            }
+          } catch {
+            case e: Exception =>
+              val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+              if (exception == null) {
+                exception = newException
+              }
+              throw e
+          }
+        }
+      }
+      updatingThreads.foreach(_.start())
+      cleaningThread.start()
+      updatingThreads.foreach(_.join())
+      cleaningThread.interrupt()
+      cleaningThread.join()
+      if (exception != null) {
+        fail(exception)
+      }
+      withDB(remoteDir, numUpdatesInEachThread) { db =>
+        assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
+      }
+    }
+  }
+
   test("checkpoint metadata serde roundtrip") {
+    def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
       assert(metadata.json == json)
@@ -304,3 +497,24 @@ class RocksDBSuite extends SparkFunSuite {
 
   def listFiles(file: String): Seq[File] = listFiles(new File(file))
 }
+
+object RocksDBSuite {
+  @volatile var singleton: RocksDB = _
+
+  def withSingletonDB[T](func: => T): T = {
+    try {
+      singleton = new RocksDB(
+        dfsRootDir = Utils.createTempDir().getAbsolutePath,
+        conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
+        hadoopConf = new Configuration(),
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+
+      func
+    } finally {
+      if (singleton != null) {
+        singleton.close()
+        singleton = null
+      }
+    }
+  }
+}
