xuanyuanking commented on a change in pull request #32933: URL: https://github.com/apache/spark/pull/32933#discussion_r662760604
########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala ########## @@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite { } } + test("disallow concurrent updates to the same RocksDB instance") { + quietly { + withDB( + Utils.createTempDir().toString, + conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db => + // DB has been loaded so current thread has alread acquired the lock on the RocksDB instance + + db.load(0) // Current thread should be able to load again + + // Another thread should not be able to load while current thread is using it + val ex = intercept[IllegalStateException] { + ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) } + } + // Assert that the error message contains the stack trace + assert(ex.getMessage.contains("Thread holding the lock has trace:")) + assert(ex.getMessage.contains("runInNewThread")) + + // Commit should release the instance allowing other threads to load new version + db.commit() + ThreadUtils.runInNewThread("concurrent-test-thread-2") { + db.load(1) + db.commit() + } + + // Another thread should not be able to load while current thread is using it + db.load(2) + intercept[IllegalStateException] { + ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) } + } + + // Rollback should release the instance allowing other threads to load new version + db.rollback() + ThreadUtils.runInNewThread("concurrent-test-thread-3") { + db.load(1) + db.commit() + } + } + } + } + + test("ensure concurrent access lock is released after Spark task completes") { + val conf = new SparkConf().setAppName("test").setMaster("local") + val sc = new SparkContext(conf) + + try { + RocksDBSuite.withSingletonDB { + // Load a RocksDB instance, that is, get a lock inside a task and then fail + quietly { + intercept[Exception] { + sc.makeRDD[Int](1 to 1, 1).map { i => + RocksDBSuite.singleton.load(0) + throw new Exception("fail this task to test lock 
release") + }.count() + } + } + + // Test whether you can load again, that is, will it successfully lock again + RocksDBSuite.singleton.load(0) + } + } finally { + sc.stop() + } + } + + test("ensure that concurrent update and cleanup consistent versions") { + quietly { + val numThreads = 20 + val numUpdatesInEachThread = 20 + val remoteDir = Utils.createTempDir().toString + @volatile var exception: Exception = null + val updatingThreads = Array.fill(numThreads) { + new Thread() { + override def run(): Unit = { + try { + for (version <- 0 to numUpdatesInEachThread) { + withDB( Review comment: This is to simulate the multi-threaded scenario of updating and cleaning up old versions. The updates will not conflict, since we call commit in each update thread and the version gets updated on each commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org