Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589729876


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##
@@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-48105: state store unload/close happens during the maintenance") {
+    tryWithProviderResource(
+      newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) {
+      provider =>
+        val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+        val keys = (1 to 20).map(i => ("a" + i))
+        keys.foreach(put(store, _, 0, 0))
+        // commit state store with 20 keys.
+        store.commit()
+        // get the state store iterator: mimic the case which the iterator is hold in the
+        // maintenance thread.
+        val storeIterator = store.iterator()
+        // If the provider is loaded in another executor, it will be unloaded and closed in
+        // current executor.
+        provider.close()
+        // the store iterator should still be valid as the maintenance thread may have already
+        // hold it and is doing snapshotting even thought the state store is unloaded.
+        val outputKeys = new mutable.ArrayBuffer[String]
+        while (storeIterator.hasNext) {

Review Comment:
   It should be, I assume? If we want to reproduce the issue more closely: create 
the iterator, read part of the data, then call close() and try to read the 
remaining data.
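For reference, the shape the reviewer suggests can be sketched outside Spark with plain collections. This is a minimal, self-contained illustration, not Spark's actual classes: `loadedMaps` and `versionMap` here are stand-ins for the provider's internal cache and one committed version map. The point is that the fixed `close()` only drops the cache's references, so an iterator obtained before `close()` can still read the remaining data afterwards.

```scala
import scala.collection.mutable

object PartialReadCloseSketch {
  // Returns (keys read before close, keys read after close).
  def run(): (Int, Int) = {
    // Stand-ins for the provider's version cache and one committed version map.
    val loadedMaps = new java.util.TreeMap[Long, mutable.HashMap[String, Int]]()
    val versionMap = mutable.HashMap((1 to 20).map(i => ("a" + i) -> i): _*)
    loadedMaps.put(0L, versionMap)

    // The "maintenance thread" obtains an iterator and reads part of the data.
    val storeIterator = versionMap.iterator
    var readBefore = 0
    while (readBefore < 10 && storeIterator.hasNext) {
      storeIterator.next()
      readBefore += 1
    }

    // close() with the fixed semantics: drop the cache entries, but do not
    // mutate the value maps themselves (the old code called _.clear() on each).
    loadedMaps.clear()

    // The in-flight iterator can still read the remaining data.
    var readAfter = 0
    while (storeIterator.hasNext) {
      storeIterator.next()
      readAfter += 1
    }
    (readBefore, readAfter)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    assert(before == 10 && after == 10)
  }
}
```

With the old `close()` semantics (clearing each value map in place), the reads after `close()` would observe an emptied map instead.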



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


anishshri-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589729603


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+    synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }

Review Comment:
   Yeah, I think that's true - we could probably just clear `loadedMaps` entirely? 
Just to confirm: it won't somehow cause `clear` to be called on the values, 
right?






Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589729057


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##
@@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-48105: state store unload/close happens during the maintenance") {
+    tryWithProviderResource(
+      newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) {
+      provider =>
+        val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+        val keys = (1 to 20).map(i => ("a" + i))
+        keys.foreach(put(store, _, 0, 0))
+        // commit state store with 20 keys.
+        store.commit()
+        // get the state store iterator: mimic the case which the iterator is hold in the
+        // maintenance thread.
+        val storeIterator = store.iterator()
+        // If the provider is loaded in another executor, it will be unloaded and closed in
+        // current executor.
+        provider.close()
+        // the store iterator should still be valid as the maintenance thread may have already
+        // hold it and is doing snapshotting even thought the state store is unloaded.

Review Comment:
   nit: `thought` → `though` (remove the trailing `t`)






Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589718949


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+    synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }

Review Comment:
   On the flip side, we could also resolve this issue by explicitly copying the 
map when snapshotting. That way seems simpler to understand, but I see the 
proposal as more optimized, so it's OK for me.
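A minimal sketch of the copy-when-snapshotting alternative, using plain Java collections as stand-ins for the provider's internals (the names here are illustrative, not Spark's actual code): the snapshot works from a defensive copy taken under the lock, so even the old `close()` semantics (clearing every value map in place) cannot corrupt an in-flight snapshot.

```scala
object CopyOnSnapshotSketch {
  // Returns the size the snapshot sees after close() ran with the old semantics.
  def run(): Int = {
    // Stand-ins for the provider's version cache and one committed version map.
    val loadedMaps = new java.util.TreeMap[Long, java.util.HashMap[String, Int]]()
    val versionMap = new java.util.HashMap[String, Int]()
    versionMap.put("a", 1)
    versionMap.put("b", 2)
    loadedMaps.put(0L, versionMap)

    // Take a defensive copy under the lock before snapshotting...
    val snapshotCopy = loadedMaps.synchronized {
      new java.util.HashMap[String, Int](loadedMaps.get(0L))
    }

    // ...so even the old close() (clearing each value map) cannot corrupt it.
    val it = loadedMaps.values().iterator()
    while (it.hasNext) it.next().clear()

    snapshotCopy.size
  }

  def main(args: Array[String]): Unit = assert(run() == 2)
}
```

The trade-off the comment alludes to: the copy costs time and memory proportional to the map size on every snapshot, whereas the merged fix only changes what `close()` drops.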



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589714412


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }

Review Comment:
   Wow... this was totally unsafe... It blindly assumes that after close() is 
called, nothing will read from loadedMaps or its entries.
   
   If we were ever going to do this, we also had to make sure these entries were 
removed from loadedMaps (and/or explicitly invalidate this instance so that 
loadedMaps and its entries can never be accessed afterwards).









Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-03 Thread via GitHub


HeartSaVioR commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1589713513


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+    synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }

Review Comment:
   +1
   
   Btw, do we have any reason to remove entries one by one from loadedMaps? 
This is now the same as loadedMaps.clear(), unless you see some difference 
w.r.t. behavior under concurrent operations.
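To illustrate the question, a small stand-alone sketch with stand-in types (not the real provider): both per-key removal and a wholesale `clear()` on the cache drop only the cache's references and leave a previously obtained value map untouched for any reader still holding it.

```scala
object RemoveVsClearSketch {
  private def freshCache(): java.util.TreeMap[Long, java.util.HashMap[String, Int]] = {
    val cache = new java.util.TreeMap[Long, java.util.HashMap[String, Int]]()
    val versionMap = new java.util.HashMap[String, Int]()
    versionMap.put("a", 1)
    cache.put(0L, versionMap)
    cache
  }

  // Returns the sizes a held reference sees after each unload variant.
  def run(): (Int, Int) = {
    // Variant 1: remove the keys one by one (as in the PR).
    val cache1 = freshCache()
    val held1 = cache1.get(0L)
    val keys = new java.util.ArrayList[Long](cache1.keySet())
    val kit = keys.iterator()
    while (kit.hasNext) cache1.remove(kit.next())

    // Variant 2: clear the cache wholesale.
    val cache2 = freshCache()
    val held2 = cache2.get(0L)
    cache2.clear()

    // Neither variant mutates the value maps; both only drop the cache's
    // references, so a reader holding a reference still sees its data.
    assert(cache1.isEmpty && cache2.isEmpty)
    (held1.size, held2.size)
  }

  def main(args: Array[String]): Unit = {
    val (afterRemove, afterClear) = run()
    assert(afterRemove == 1 && afterClear == 1)
  }
}
```

In other words, absent a concurrency difference, the two forms are equivalent for the fix; neither calls `clear` on the values.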






Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-02 Thread via GitHub


anishshri-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1588759416


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##
@@ -388,6 +388,33 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-48105: state store unload/close happens during the maintenance") {
+    tryWithProviderResource(
+      newStoreProvider(opId = Random.nextInt(), partition = 0, minDeltasForSnapshot = 1)) {
+      provider =>
+        val store = provider.getStore(0).asInstanceOf[provider.HDFSBackedStateStore]
+        val keys = (1 to 20).map(i => ("a" + i))
+        keys.foreach(put(store, _, 0, 0))
+        // commit state store with 20 keys.
+        store.commit()
+        // get the state store iterator: mimic the case which the iterator is hold in the
+        // maintenance thread.
+        val storeIterator = store.iterator()
+        // If the provider is loaded in another executor, it will be unloaded and closed in
+        // current executor.
+        provider.close()
+        // the store iterator should still be valid as the maintenance thread may have already
+        // hold it and is doing snapshotting even thought the state store is unloaded.
+        val outputKeys = new mutable.ArrayBuffer[String]
+        while (storeIterator.hasNext) {

Review Comment:
   Does this test fail before the change? And does it fail predictably?






Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-02 Thread via GitHub


anishshri-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1588758558


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##
@@ -584,7 +584,21 @@ object StateStore extends Logging {
     }
 
   def stop(): Unit = {
-    threadPool.shutdown()
+    logInfo("Shutting down MaintenanceThreadPool")
+    threadPool.shutdown() // Disable new tasks from being submitted
+
+    // Wait a while for existing tasks to terminate
+    if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {

Review Comment:
   Why not just call `awaitTermination` once with a large timeout value, maybe 
5 minutes?
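For context, the split wait matches the two-phase shutdown pattern from the `ExecutorService` javadoc: with a single long `awaitTermination`, `shutdownNow()` is never reached, so a task stuck in I/O would block the whole timeout without ever being interrupted. A hedged sketch of that pattern (a generic pool here, not Spark's actual `MaintenanceThreadPool`):

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

object ShutdownSketch {
  // Two-phase shutdown: stop intake, wait a bounded time, then interrupt
  // stragglers with shutdownNow() and wait once more. Returns true if the
  // pool terminated.
  def stop(pool: ExecutorService, waitSecs: Long = 60): Boolean = {
    pool.shutdown() // no new tasks accepted
    if (pool.awaitTermination(waitSecs, TimeUnit.SECONDS)) {
      true
    } else {
      pool.shutdownNow() // interrupt still-running tasks
      pool.awaitTermination(waitSecs, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    (1 to 4).foreach(_ => pool.submit(new Runnable { def run(): Unit = () }))
    assert(stop(pool, 5))
  }
}
```

The first, shorter wait bounds how long well-behaved tasks get to finish cleanly; the second bounds how long interrupted tasks get to unwind.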






Re: [PR] [SPARK-48105] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-02 Thread via GitHub


anishshri-db commented on code in PR #46351:
URL: https://github.com/apache/spark/pull/46351#discussion_r1588757230


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -351,7 +351,7 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   }
 
   override def close(): Unit = {
-synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
+synchronized { 
loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }

Review Comment:
   Should we add a comment here explaining that the underlying value needs to 
remain alive and cannot be cleared, since there could be other readers holding 
shallow references?


