This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b939acd9072 [SPARK-45002][SS] Avoid uncaught exception from state store maintenance task thread on error to prevent executor being killed b939acd9072 is described below commit b939acd9072ed3a1ba60940942df6fc72b97e5e8 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Wed Aug 30 10:22:45 2023 +0900 [SPARK-45002][SS] Avoid uncaught exception from state store maintenance task thread on error to prevent executor being killed ### What changes were proposed in this pull request? Avoid uncaught exception from state store maintenance task thread on error to prevent executor being killed ### Why are the changes needed? Without this change, the uncaught exception handler was being triggered on the executor, leading to the executor being killed. ``` /** * The default uncaught exception handler for Spark daemons. It terminates the whole process for * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true. * * param exitOnUncaughtException Whether to exit the process on UncaughtException. */ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true) ``` Ideally we just want to note the exception which will eventually be picked up by the maintenance task thread and force the reset, instead of causing the executor JVM process to be killed. This change fixes this issue. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests Before: ``` [info] Run completed in 2 seconds, 398 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0 [info] *** 1 TEST FAILED *** [error] Failed tests: [error] org.apache.spark.sql.execution.streaming.state.StateStoreSuite ``` After: ``` [info] Run completed in 2 seconds, 339 milliseconds. 
[info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #42716 from anishshri-db/task/SPARK-45002. Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/StateStore.scala | 1 - .../execution/streaming/state/StateStoreSuite.scala | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index a1d4f7f40a7..6a3a30c7efb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -713,7 +713,6 @@ object StateStore extends Logging { case NonFatal(e) => logWarning(s"Error managing $provider, stopping management thread", e) threadPoolException.set(e) - throw e } finally { val duration = System.currentTimeMillis() - startTime val logMsg = s"Finished maintenance task for provider=$id" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 6c4e259bac5..093f4262011 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -21,6 +21,7 @@ import java.io.{File, IOException} import java.net.URI import java.util import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import 
scala.collection.mutable @@ -48,8 +49,15 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider { + import FakeStateStoreProviderWithMaintenanceError._ private var id: StateStoreId = null + private val exceptionHandler = new Thread.UncaughtExceptionHandler() { + override def uncaughtException(t: Thread, e: Throwable): Unit = { + errorOnMaintenance.set(true) + } + } + override def init( stateStoreId: StateStoreId, keySchema: StructType, @@ -67,10 +75,15 @@ class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider { override def getStore(version: Long): StateStore = null override def doMaintenance(): Unit = { + Thread.currentThread.setUncaughtExceptionHandler(exceptionHandler) throw new RuntimeException("Intentional maintenance failure") } } +private object FakeStateStoreProviderWithMaintenanceError { + val errorOnMaintenance = new AtomicBoolean(false) +} + @ExtendedSQLTest class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] with BeforeAndAfter { @@ -1364,6 +1377,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] quietly { withSpark(new SparkContext(conf)) { sc => withCoordinatorRef(sc) { _ => + FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.set(false) val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1), UUID.randomUUID) val storeConf = StateStoreConf(sqlConf) @@ -1373,6 +1387,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] eventually(timeout(30.seconds)) { assert(!StateStore.isMaintenanceRunning) } + + // SPARK-45002: The maintenance task thread failure should not invoke the + // SparkUncaughtExceptionHandler which could lead to the executor process + // getting killed. 
+ assert(!FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.get) + StateStore.stop() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org