This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b939acd9072 [SPARK-45002][SS] Avoid uncaught exception from state 
store maintenance task thread on error to prevent executor being killed
b939acd9072 is described below

commit b939acd9072ed3a1ba60940942df6fc72b97e5e8
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Wed Aug 30 10:22:45 2023 +0900

    [SPARK-45002][SS] Avoid uncaught exception from state store maintenance 
task thread on error to prevent executor being killed
    
    ### What changes were proposed in this pull request?
    Avoid uncaught exception from state store maintenance task thread on error 
to prevent executor being killed
    
    ### Why are the changes needed?
    Without this change, the uncaught exception handler is triggered 
on the executor, leading to the executor being killed.
    
    ```
    /**
     * The default uncaught exception handler for Spark daemons. It terminates 
the whole process for
     * any Errors, and also terminates the process for Exceptions when the 
exitOnException flag is true.
     *
     * param exitOnUncaughtException Whether to exit the process on 
UncaughtException.
     */
    private[spark] class SparkUncaughtExceptionHandler(val 
exitOnUncaughtException: Boolean = true)
    ```
    
    Ideally we just want to note the exception which will eventually be picked 
up by the maintenance task thread and force the reset, instead of causing the 
executor JVM process to be killed. This change fixes this issue.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    Before:
    ```
    [info] Run completed in 2 seconds, 398 milliseconds.
    [info] Total number of tests run: 1
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
    [info] *** 1 TEST FAILED ***
    [error] Failed tests:
    [error]         
org.apache.spark.sql.execution.streaming.state.StateStoreSuite
    ```
    
    After:
    ```
    [info] Run completed in 2 seconds, 339 milliseconds.
    [info] Total number of tests run: 1
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42716 from anishshri-db/task/SPARK-45002.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala   |  1 -
 .../execution/streaming/state/StateStoreSuite.scala  | 20 ++++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index a1d4f7f40a7..6a3a30c7efb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -713,7 +713,6 @@ object StateStore extends Logging {
             case NonFatal(e) =>
               logWarning(s"Error managing $provider, stopping management 
thread", e)
               threadPoolException.set(e)
-              throw e
           } finally {
             val duration = System.currentTimeMillis() - startTime
             val logMsg = s"Finished maintenance task for provider=$id" +
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6c4e259bac5..093f4262011 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
 import java.net.URI
 import java.util
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -48,8 +49,15 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
+  import FakeStateStoreProviderWithMaintenanceError._
   private var id: StateStoreId = null
 
+  private val exceptionHandler = new Thread.UncaughtExceptionHandler() {
+    override def uncaughtException(t: Thread, e: Throwable): Unit = {
+      errorOnMaintenance.set(true)
+    }
+  }
+
   override def init(
       stateStoreId: StateStoreId,
       keySchema: StructType,
@@ -67,10 +75,15 @@ class FakeStateStoreProviderWithMaintenanceError extends 
StateStoreProvider {
   override def getStore(version: Long): StateStore = null
 
   override def doMaintenance(): Unit = {
+    Thread.currentThread.setUncaughtExceptionHandler(exceptionHandler)
     throw new RuntimeException("Intentional maintenance failure")
   }
 }
 
+private object FakeStateStoreProviderWithMaintenanceError {
+  val errorOnMaintenance = new AtomicBoolean(false)
+}
+
 @ExtendedSQLTest
 class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   with BeforeAndAfter {
@@ -1364,6 +1377,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { _ =>
+          
FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.set(false)
           val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1), 
UUID.randomUUID)
           val storeConf = StateStoreConf(sqlConf)
 
@@ -1373,6 +1387,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
           eventually(timeout(30.seconds)) {
             assert(!StateStore.isMaintenanceRunning)
           }
+
+          // SPARK-45002: The maintenance task thread failure should not 
invoke the
+          // SparkUncaughtExceptionHandler which could lead to the executor 
process
+          // getting killed.
+          
assert(!FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.get)
+
           StateStore.stop()
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to