Repository: spark Updated Branches: refs/heads/master 24367f23f -> e16e8c7ad
[SPARK-21146][CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException ## What changes were proposed in this pull request? Adds a default UncaughtExceptionHandler to the Master and the Worker. ## How was this patch tested? I verified it manually: when any Worker thread throws an uncaught exception, the default UncaughtExceptionHandler now handles it. Author: Devaraj K <deva...@apache.org> Closes #18357 from devaraj-kavali/SPARK-21146. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e16e8c7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e16e8c7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e16e8c7a Branch: refs/heads/master Commit: e16e8c7ad31762aaca5e2bc874de1540af9cc4b7 Parents: 24367f2 Author: Devaraj K <deva...@apache.org> Authored: Wed Jul 12 00:14:58 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jul 12 00:14:58 2017 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/deploy/master/Master.scala | 4 +++- .../scala/org/apache/spark/deploy/worker/Worker.scala | 4 +++- .../main/scala/org/apache/spark/executor/Executor.scala | 2 +- .../spark/util/SparkUncaughtExceptionHandler.scala | 11 ++++++----- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 +++- .../spark/deploy/mesos/MesosClusterDispatcher.scala | 2 +- 6 files changed, 17 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0dee25f..4cc580e 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, Serializer} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} private[deploy] class Master( override val rpcEnv: RpcEnv, @@ -1045,6 +1045,8 @@ private[deploy] object Master extends Logging { val ENDPOINT_NAME = "Master" def main(argStrings: Array[String]) { + Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler( + exitOnUncaughtException = false)) Utils.initDaemon(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bed4745..f6d3876 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} private[deploy] class Worker( override val rpcEnv: RpcEnv, @@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging { val ENDPOINT_NAME = "Worker" def main(argStrings: Array[String]) { + Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler( + 
exitOnUncaughtException = false)) Utils.initDaemon(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 19e7eb0..21f0db1 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -56,7 +56,7 @@ private[spark] class Executor( env: SparkEnv, userClassPath: Seq[URL] = Nil, isLocal: Boolean = false, - uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler) + uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler) extends Logging { logInfo(s"Starting executor ID $executorId on host $executorHostname") http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index 95bf3f5..e0f5af5 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -20,11 +20,12 @@ package org.apache.spark.util import org.apache.spark.internal.Logging /** - * The default uncaught exception handler for Executors terminates the whole process, to avoid - * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better - * to fail fast when things go wrong. + * The default uncaught exception handler for Spark daemons. 
It terminates the whole process for + * an OutOfMemoryError, and for any other Throwable only when exitOnUncaughtException is true. + * + * @param exitOnUncaughtException Whether to exit the process on UncaughtException. */ -private[spark] object SparkUncaughtExceptionHandler +private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true) extends Thread.UncaughtExceptionHandler with Logging { override def uncaughtException(thread: Thread, exception: Throwable) { @@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler if (!ShutdownHookManager.inShutdown()) { if (exception.isInstanceOf[OutOfMemoryError]) { System.exit(SparkExitCode.OOM) - } else { + } else if (exitOnUncaughtException) { System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) } } http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b4caf68..584337a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -76,6 +76,8 @@ private[spark] object CallSite { private[spark] object Utils extends Logging { val random = new Random() + private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler + /** * Define a default value for driver memory here since this value is referenced across the code * base and nearly all files already use Utils.scala @@ -1274,7 +1276,7 @@ private[spark] object Utils extends Logging { block } catch { case e: ControlThrowable => throw e - case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t) + case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t) } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 38b082a..aa378c9 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher with CommandLineUtils { override def main(args: Array[String]) { - Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler) + Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler) Utils.initDaemon(log) val conf = new SparkConf val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org