This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ca6acf0 [SPARK-35785][SS] Cleanup support for RocksDB instance ca6acf0 is described below commit ca6acf0839baaa40a1417e7dca0cc1a22de06bb2 Author: Yuanjian Li <yuanjian...@databricks.com> AuthorDate: Fri Jul 2 00:47:55 2021 -0700 [SPARK-35785][SS] Cleanup support for RocksDB instance ### What changes were proposed in this pull request? Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager. ### Why are the changes needed? Part of the implementation of RocksDB state store. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT added. Closes #32933 from xuanyuanking/SPARK-35785. Authored-by: Yuanjian Li <yuanjian...@databricks.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 7 + .../streaming/state/RocksDBFileManager.scala | 106 ++++++++++ .../execution/streaming/state/RocksDBSuite.scala | 216 ++++++++++++++++++++- 3 files changed, 328 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 82aa166..f640018 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -253,6 +253,13 @@ class RocksDB( logInfo(s"Rolled back to $loadedVersion") } + /** Delete the files of old versions in DFS, keeping only the latest `conf.minVersionsToRetain` versions (delegates to `fileManager.deleteOldVersions`), and log how long the cleanup took. */ def cleanup(): Unit = { + val cleanupTime = timeTakenMs { + fileManager.deleteOldVersions(conf.minVersionsToRetain) + } + logInfo(s"Cleaned old data, time taken: $cleanupTime ms") + } + /** Release all resources */ def close(): Unit = { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 4731f4d..bfdace3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ +import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -198,6 +199,99 @@ class RocksDBFileManager( } } + /** + * Delete old versions by deleting the associated version and SST files. + * At a high-level, this method finds which versions to delete, and which SST files were + * last used in those versions. It's safe to delete these SST files because a SST file can + * be reused only in successive versions. Therefore, if a SST file F was last used in version + * V, then it won't be used in version V+1 or later, and if version V can be deleted, then + * F can safely be deleted as well. + * + * To find old files, it does the following. + * - List all the existing [version].zip files + * - Find the min version that needs to be retained based on the given `numVersionsToRetain`. + * - Accordingly decide which versions should be deleted. + * - Resolve all SST files of all the existing versions, if not already resolved. + * - Find what was the latest version in which each SST file was used. + * - Delete the files that were last used in the to-be-deleted versions as we will not + * need those files any more. + * + * Note that it only deletes files that it knows are safe to delete. + * It may not delete the following files. + * - Partially written SST files + * - SST files that were used in a version, but that version got overwritten with a different + * set of SST files.
+ */ + def deleteOldVersions(numVersionsToRetain: Int): Unit = { + val path = new Path(dfsRootDir) + + // All versions present in DFS, sorted + val sortedVersions = fm.list(path, onlyZipFiles) + .map(_.getPath.getName.stripSuffix(".zip")) + .map(_.toLong) + .sorted + + // Return if no versions generated yet + if (sortedVersions.isEmpty) return + + // Find the versions to delete + val maxVersionPresent = sortedVersions.last + val minVersionPresent = sortedVersions.head + val minVersionToRetain = + math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1) + val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long] + + // Return if no version to delete + if (versionsToDelete.isEmpty) return + + logInfo( + s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " + + s"cleaning up all versions older than $minVersionToRetain to retain last " + + s"$numVersionsToRetain versions") + + // Resolve RocksDB files for all the versions and find the max version each file is used + val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long] + sortedVersions.foreach { version => + val files = Option(versionToRocksDBFiles.get(version)).getOrElse { + val newResolvedFiles = getImmutableFilesFromVersionZip(version) + versionToRocksDBFiles.put(version, newResolvedFiles) + newResolvedFiles + } + files.foreach(f => fileToMaxUsedVersion(f) = version) + } + + // Best effort attempt to delete SST files that were last used in to-be-deleted versions + val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) } + logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain") + var failedToDelete = 0 + filesToDelete.foreach { case (file, maxUsedVersion) => + try { + val dfsFile = dfsFilePath(file.dfsFileName) + fm.delete(dfsFile) + logDebug(s"Deleted file $file that was last used in version $maxUsedVersion") + } catch { + case e: Exception => + failedToDelete
+= 1 + logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e) + } + } + + // Delete the version files and forget about them + versionsToDelete.foreach { version => + val versionFile = dfsBatchZipFile(version) + try { + fm.delete(versionFile) + versionToRocksDBFiles.remove(version) + logDebug(s"Deleted version $version") + } catch { + case e: Exception => + logWarning(s"Error deleting version file $versionFile for version $version", e) + } + } + logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete " + + s"$failedToDelete files) not used in versions >= $minVersionToRetain") + } + /** Save immutable files to DFS directory */ private def saveImmutableFilesToDfs( version: Long, @@ -295,6 +389,16 @@ class RocksDBFileManager( s"$filesReused files reused.") } + /** Get the SST files required for a version from the version zip file in DFS */ + private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = { + Utils.deleteRecursively(localTempDir) + localTempDir.mkdirs() + Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir) + val metadataFile = localMetadataFile(localTempDir) + val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) + metadata.immutableFiles + } + /** * Compress files to a single zip file in DFS. Only the file names are embedded in the zip. * Any error while writing will ensure that the file is not written.
@@ -347,6 +451,8 @@ class RocksDBFileManager( private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata") + override protected def logName: String = s"${super.logName} $loggingId" + private def dfsFilePath(fileName: String): Path = { if (isSstFile(fileName)) { new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index a11eb8a..4659a37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark._ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} class RocksDBSuite extends SparkFunSuite { @@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite { } } + test("RocksDB: cleanup old files") { + val remoteDir = Utils.createTempDir().toString + val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10) + + def versionsPresent: Seq[Long] = { + remoteDir.listFiles.filter(_.getName.endsWith(".zip")) + .map(_.getName.stripSuffix(".zip")) + .map(_.toLong) + .sorted + } + + withDB(remoteDir, conf = conf) { db => + // Generate versions without cleaning up + for (version <- 1 to 50) { + db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
+ db.commit() + } + + // Clean up and verify version files and SST files were deleted + assert(versionsPresent === (1L to 50L)) + val sstDir = new File(remoteDir, "SSTs") + val numSstFiles = listFiles(sstDir).length + db.cleanup() + assert(versionsPresent === (41L to 50L)) + assert(listFiles(sstDir).length < numSstFiles) + + // Verify data in retained versions. + versionsPresent.foreach { version => + db.load(version) + val data = db.iterator().map(toStr).toSet + assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet) + } + } + } + + test("RocksDB: handle commit failures and aborts") { + val hadoopConf = new Configuration() + hadoopConf.set( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + classOf[CreateAtomicTestManager].getName) + val remoteDir = Utils.createTempDir().getAbsolutePath + val conf = RocksDBConf().copy(compactOnCommit = true) + withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db => + // Disable failure of output stream and generate versions + CreateAtomicTestManager.shouldFailInCreateAtomic = false + for (version <- 1 to 10) { + db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
+ db.commit() + } + val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet + + // Fail commit for next version and verify that reloading resets the files + CreateAtomicTestManager.shouldFailInCreateAtomic = true + db.put("11", "11") + intercept[IOException] { quietly { db.commit() } } + assert(db.load(10).iterator().map(toStr).toSet === version10Data) + CreateAtomicTestManager.shouldFailInCreateAtomic = false + + // Abort commit for next version and verify that reloading resets the files + db.load(10) + db.put("11", "11") + db.rollback() + assert(db.load(10).iterator().map(toStr).toSet === version10Data) + } + } + test("RocksDBFileManager: upload only new immutable files") { withTempDir { dir => val dfsRootDir = dir.getAbsolutePath @@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite { } } + test("disallow concurrent updates to the same RocksDB instance") { + quietly { + withDB( + Utils.createTempDir().toString, + conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db => + // DB has been loaded so current thread has already acquired the lock on the RocksDB instance + + db.load(0) // Current thread should be able to load again + + // Another thread should not be able to load while current thread is using it + val ex = intercept[IllegalStateException] { + ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) } + } + // Assert that the error message contains the stack trace + assert(ex.getMessage.contains("Thread holding the lock has trace:")) + assert(ex.getMessage.contains("runInNewThread")) + + // Commit should release the instance allowing other threads to load new version + db.commit() + ThreadUtils.runInNewThread("concurrent-test-thread-2") { + db.load(1) + db.commit() + } + + // Another thread should not be able to load while current thread is using it + db.load(2) + intercept[IllegalStateException] { + ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) } + } + + // Rollback should release the instance
allowing other threads to load new version + db.rollback() + ThreadUtils.runInNewThread("concurrent-test-thread-3") { + db.load(1) + db.commit() + } + } + } + } + + test("ensure concurrent access lock is released after Spark task completes") { + val conf = new SparkConf().setAppName("test").setMaster("local") + val sc = new SparkContext(conf) + + try { + RocksDBSuite.withSingletonDB { + // Load a RocksDB instance, that is, get a lock inside a task and then fail + quietly { + intercept[Exception] { + sc.makeRDD[Int](1 to 1, 1).map { i => + RocksDBSuite.singleton.load(0) + throw new Exception("fail this task to test lock release") + }.count() + } + } + + // Test whether you can load again, that is, will it successfully lock again + RocksDBSuite.singleton.load(0) + } + } finally { + sc.stop() + } + } + + test("ensure that concurrent update and cleanup consistent versions") { + quietly { + val numThreads = 20 + val numUpdatesInEachThread = 20 + val remoteDir = Utils.createTempDir().toString + @volatile var exception: Exception = null + // Stop signal for the cleaning thread; a flag (rather than interrupt()) avoids spurious + // InterruptedExceptions from Thread.sleep or DFS I/O being reported as test failures + @volatile var stopCleaning = false + val updatingThreads = Array.fill(numThreads) { + new Thread() { + override def run(): Unit = { + try { + for (version <- 0 to numUpdatesInEachThread) { + withDB( + remoteDir, + version = version) { db => + val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt + db.put("a", (prevValue + 1).toString) + db.commit() + } + } + } catch { + case e: Exception => + val newException = new Exception(s"ThreadId ${this.getId} failed", e) + // Record only the first failure; the main thread fails the test with it below + if (exception == null) { + exception = newException + } + throw e + } + } + } + } + val cleaningThread = new Thread() { + override def run(): Unit = { + try { + withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db => + while (!stopCleaning) { + db.cleanup() + Thread.sleep(1) + } + } + } catch { + case e: Exception => + val newException = new Exception(s"ThreadId ${this.getId} failed", e) + // Record only the first failure; the main thread fails the test with it below + if (exception == null) { + exception = newException + } + throw e + } + } + } +
updatingThreads.foreach(_.start()) + cleaningThread.start() + updatingThreads.foreach(_.join()) + stopCleaning = true + cleaningThread.join() + if (exception != null) { + fail(exception) + } + withDB(remoteDir, numUpdatesInEachThread) { db => + assert(toStr(db.get("a")) === numUpdatesInEachThread.toString) + } + } + } + test("checkpoint metadata serde roundtrip") { def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = { assert(metadata.json == json) @@ -304,3 +497,24 @@ class RocksDBSuite extends SparkFunSuite { def listFiles(file: String): Seq[File] = listFiles(new File(file)) } + +object RocksDBSuite { + @volatile var singleton: RocksDB = _ + + def withSingletonDB[T](func: => T): T = { + try { + singleton = new RocksDB( + dfsRootDir = Utils.createTempDir().getAbsolutePath, + conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100), + hadoopConf = new Configuration(), + loggingId = s"[Thread-${Thread.currentThread.getId}]") + + func + } finally { + if (singleton != null) { + singleton.close() + singleton = null + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org