[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-02 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662807147



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
 }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+quietly {
+  withDB(
+Utils.createTempDir().toString,
+conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+// DB has been loaded so current thread has alread acquired the lock 
on the RocksDB instance
+
+db.load(0)  // Current thread should be able to load again
+
+// Another thread should not be able to load while current thread is 
using it
+val ex = intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+}
+// Assert that the error message contains the stack trace
+assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+assert(ex.getMessage.contains("runInNewThread"))
+
+// Commit should release the instance allowing other threads to load 
new version
+db.commit()
+ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+  db.load(1)
+  db.commit()
+}
+
+// Another thread should not be able to load while current thread is 
using it
+db.load(2)
+intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+}
+
+// Rollback should release the instance allowing other threads to load 
new version
+db.rollback()
+ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+  db.load(1)
+  db.commit()
+}
+  }
+}
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") 
{
+val conf = new SparkConf().setAppName("test").setMaster("local")
+val sc = new SparkContext(conf)
+
+try {
+  RocksDBSuite.withSingletonDB {
+// Load a RocksDB instance, that is, get a lock inside a task and then 
fail
+quietly {
+  intercept[Exception] {
+sc.makeRDD[Int](1 to 1, 1).map { i =>
+  RocksDBSuite.singleton.load(0)
+  throw new Exception("fail this task to test lock release")
+}.count()
+  }
+}
+
+// Test whether you can load again, that is, will it successfully lock 
again
+RocksDBSuite.singleton.load(0)
+  }
+} finally {
+  sc.stop()
+}
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+quietly {
+  val numThreads = 20
+  val numUpdatesInEachThread = 20
+  val remoteDir = Utils.createTempDir().toString
+  @volatile var exception: Exception = null
+  val updatingThreads = Array.fill(numThreads) {
+new Thread() {
+  override def run(): Unit = {
+try {
+  for (version <- 0 to numUpdatesInEachThread) {
+withDB(

Review comment:
   @xuanyuanking and I discussed this test offline. There seems to be something wrong with the `exception` usage; it doesn't look completely correct. @xuanyuanking will address it in a follow-up, either by fixing the test or by removing it.
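   For context, one common way to surface failures from test worker threads without a single `@volatile var exception` (which keeps only the last error and says nothing about which thread failed) is to collect every throwable in a thread-safe queue and rethrow after joining. This is a minimal sketch, not necessarily how the follow-up will fix the test; the object and method names are made up.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object ThreadedTestErrors {
  // Run `body` in `numThreads` threads, then rethrow the first captured failure
  // with any remaining failures attached as suppressed exceptions.
  def runAndPropagate(numThreads: Int)(body: Int => Unit): Unit = {
    val errors = new ConcurrentLinkedQueue[Throwable]()
    val threads = (0 until numThreads).map { i =>
      new Thread(() => {
        try {
          body(i)
        } catch {
          case t: Throwable => errors.add(t)
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    if (!errors.isEmpty) {
      val first = errors.poll()
      while (!errors.isEmpty) first.addSuppressed(errors.poll())
      throw first
    }
  }
}
```

   With something like this, the test fails as soon as the threads are joined if any updater hit an error, instead of depending on a shared mutable field being checked correctly afterwards.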







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-02 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662762934



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
 }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+quietly {
+  withDB(
+Utils.createTempDir().toString,
+conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+// DB has been loaded so current thread has alread acquired the lock 
on the RocksDB instance
+
+db.load(0)  // Current thread should be able to load again
+
+// Another thread should not be able to load while current thread is 
using it
+val ex = intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+}
+// Assert that the error message contains the stack trace
+assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+assert(ex.getMessage.contains("runInNewThread"))
+
+// Commit should release the instance allowing other threads to load 
new version
+db.commit()
+ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+  db.load(1)
+  db.commit()
+}
+
+// Another thread should not be able to load while current thread is 
using it
+db.load(2)
+intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+}
+
+// Rollback should release the instance allowing other threads to load 
new version
+db.rollback()
+ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+  db.load(1)
+  db.commit()
+}
+  }
+}
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") 
{
+val conf = new SparkConf().setAppName("test").setMaster("local")
+val sc = new SparkContext(conf)
+
+try {
+  RocksDBSuite.withSingletonDB {
+// Load a RocksDB instance, that is, get a lock inside a task and then 
fail
+quietly {
+  intercept[Exception] {
+sc.makeRDD[Int](1 to 1, 1).map { i =>
+  RocksDBSuite.singleton.load(0)
+  throw new Exception("fail this task to test lock release")
+}.count()
+  }
+}
+
+// Test whether you can load again, that is, will it successfully lock 
again
+RocksDBSuite.singleton.load(0)
+  }
+} finally {
+  sc.stop()
+}
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+quietly {
+  val numThreads = 20
+  val numUpdatesInEachThread = 20
+  val remoteDir = Utils.createTempDir().toString
+  @volatile var exception: Exception = null
+  val updatingThreads = Array.fill(numThreads) {
+new Thread() {
+  override def run(): Unit = {
+try {
+  for (version <- 0 to numUpdatesInEachThread) {
+withDB(

Review comment:
   Hmm, will that happen? I think `RocksDB` is not thread-safe, and each state task only has one `RocksDB` instance. They should update and clean up old versions independently, since they are for different state stores.
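   As background for the locking behavior the quoted tests exercise (one thread owns the instance at a time, a second acquirer fails after `lockAcquireTimeoutMs`, and the error message embeds the holder's stack trace), here is a minimal self-contained sketch of that pattern. It is illustrative only, not the Spark `RocksDB` implementation; the class and method names are made up.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Illustrative single-owner lock: reentrant for the owning thread, times out for
// everyone else, and reports the owner's stack trace on failure.
class InstanceLock(lockAcquireTimeoutMs: Long) {
  private val lock = new ReentrantLock()
  @volatile private var holder: Thread = _

  def acquire(): Unit = {
    if (!lock.tryLock(lockAcquireTimeoutMs, TimeUnit.MILLISECONDS)) {
      val trace = Option(holder)
        .map(_.getStackTrace.mkString("\n  "))
        .getOrElse("<unknown>")
      throw new IllegalStateException(s"Thread holding the lock has trace:\n  $trace")
    }
    holder = Thread.currentThread()
  }

  def release(): Unit = {
    holder = null
    lock.unlock()
  }
}
```

   In the quoted tests, `load()` plays the role of `acquire()` and `commit()`/`rollback()` play the role of `release()`, which is exactly what the first test asserts.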







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-02 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662757641



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
 }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {

Review comment:
   okay







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-01 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662754308



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##
@@ -253,6 +253,13 @@ class RocksDB(
 logInfo(s"Rolled back to $loadedVersion")
   }
 
+  def cleanup(): Unit = {

Review comment:
   okay.







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-01 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662567672



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
 }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+quietly {
+  withDB(
+Utils.createTempDir().toString,
+conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+// DB has been loaded so current thread has alread acquired the lock 
on the RocksDB instance
+
+db.load(0)  // Current thread should be able to load again
+
+// Another thread should not be able to load while current thread is 
using it
+val ex = intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+}
+// Assert that the error message contains the stack trace
+assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+assert(ex.getMessage.contains("runInNewThread"))
+
+// Commit should release the instance allowing other threads to load 
new version
+db.commit()
+ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+  db.load(1)
+  db.commit()
+}
+
+// Another thread should not be able to load while current thread is 
using it
+db.load(2)
+intercept[IllegalStateException] {
+  ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+}
+
+// Rollback should release the instance allowing other threads to load 
new version
+db.rollback()
+ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+  db.load(1)
+  db.commit()
+}
+  }
+}
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") 
{
+val conf = new SparkConf().setAppName("test").setMaster("local")
+val sc = new SparkContext(conf)
+
+try {
+  RocksDBSuite.withSingletonDB {
+// Load a RocksDB instance, that is, get a lock inside a task and then 
fail
+quietly {
+  intercept[Exception] {
+sc.makeRDD[Int](1 to 1, 1).map { i =>
+  RocksDBSuite.singleton.load(0)
+  throw new Exception("fail this task to test lock release")
+}.count()
+  }
+}
+
+// Test whether you can load again, that is, will it successfully lock 
again
+RocksDBSuite.singleton.load(0)
+  }
+} finally {
+  sc.stop()
+}
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+quietly {
+  val numThreads = 20
+  val numUpdatesInEachThread = 20
+  val remoteDir = Utils.createTempDir().toString
+  @volatile var exception: Exception = null
+  val updatingThreads = Array.fill(numThreads) {
+new Thread() {
+  override def run(): Unit = {
+try {
+  for (version <- 0 to numUpdatesInEachThread) {
+withDB(

Review comment:
   Hm, what is this test used for? Each `RocksDB` in each thread uses the same remote root dir, so won't they conflict?
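   To make the concern concrete, here is a tiny stand-alone illustration (not the RocksDB checkpoint format; file names and contents are made up) of what can go wrong when independent writers publish into the same shared root dir: the last writer silently wins. Whether the suite's threads actually collide on the same version files is exactly what this question is asking.

```scala
import java.nio.file.{Files, StandardOpenOption}

// Two "writers" publishing the same version file under one shared root:
// the second write silently replaces the first.
object SharedDirConflict {
  def main(args: Array[String]): Unit = {
    val remoteRoot = Files.createTempDirectory("shared-remote-root")
    val versionFile = remoteRoot.resolve("1.zip")

    val writers = (1 to 2).map { id =>
      new Thread(() => {
        Files.write(versionFile, s"checkpoint from writer $id".getBytes,
          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())

    // Only the last writer's contents survive.
    println(new String(Files.readAllBytes(versionFile)))
  }
}
```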







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-01 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662563375



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##
@@ -253,6 +253,13 @@ class RocksDB(
 logInfo(s"Rolled back to $loadedVersion")
   }
 
+  def cleanup(): Unit = {

Review comment:
   When will we call this method?
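   Independent of where it ends up being called (the question above), a `cleanup()` of this kind typically means a version-retention pass: each committed version has its own files, and cleanup deletes versions outside the retained window. Below is a minimal self-contained sketch of that idea, assuming a layout with one numerically named directory per version; this is not the Spark `RocksDB` implementation and all names are illustrative.

```scala
import java.io.File

// Illustrative version-retention cleanup over a directory layout where each
// committed version lives in its own numerically named subdirectory.
class VersionedFileStore(root: File, versionsToRetain: Int) {

  def cleanup(): Unit = {
    val versionDirs = Option(root.listFiles()).getOrElse(Array.empty[File])
      .filter(d => d.isDirectory && d.getName.forall(_.isDigit))
      .sortBy(_.getName.toLong)
    // Delete everything older than the retained window.
    versionDirs.dropRight(versionsToRetain).foreach(deleteRecursively)
  }

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }
}
```

   Where exactly such a pass should run (after each commit, on a maintenance thread, or when the instance is closed) is the open question being asked here.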







[GitHub] [spark] viirya commented on a change in pull request #32933: [SPARK-35785][SS] Cleanup support for RocksDB instance

2021-07-01 Thread GitBox


viirya commented on a change in pull request #32933:
URL: https://github.com/apache/spark/pull/32933#discussion_r662564040



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
 }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {

Review comment:
   This test doesn't seem related to the cleanup change here? It looks more related to the RocksDB instance PR.



