[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91671465

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {

--- End diff --

I thought about this and it seems like there are only two possibilities here:

1. We're running in local mode, in which case we don't actually want to throw an exception to kill the JVM, and even if we did throw, the JVM would keep on running because there's no uncaught exception handler here.
2. We're running in a separate JVM, in which case any exception thrown in this thread and not caught will cause the JVM to exit.

The only place in the body of this code that might actually throw unexpected exceptions is the thread-dump logic, which is already in a `try-catch` block to prevent exceptions from bubbling up. Thus the only purpose of a `finally` block would be to detect whether it was reached via an exception path and to log a warning stating that task kill progress will no longer be monitored.

Basically, I'm not sure what the `finally` block is buying us in terms of actionable / useful logs, and it's only going to add complexity because then we need to be careful not to throw from the `finally` block in case it was entered via an exception, etc.
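[Editor's note] A minimal sketch of the JVM-exit mechanism described in point 2, assuming a default uncaught exception handler along the lines of the one Spark installs in standalone executors. The object name, message strings, and exit code are illustrative, not Spark's actual code:

```scala
// Illustrates why an uncaught throw from a reaper thread kills a standalone
// executor JVM but is a no-op in local mode, where no handler is installed.
object UncaughtExitSketch {
  def main(args: Array[String]): Unit = {
    // In a standalone executor, a default handler like this is set at startup:
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        System.err.println(s"Uncaught exception in ${t.getName}: $e; halting JVM")
        System.exit(50) // hypothetical exit code
      }
    })

    // A reaper-like thread that gives up and throws:
    val reaper = new Thread(new Runnable {
      override def run(): Unit =
        throw new RuntimeException("task could not be stopped within the kill timeout")
    }, "task-reaper-sketch")
    reaper.start()
    reaper.join()
    // Without the handler above (the local-mode case), the exception is merely
    // printed by the thread's default behavior and the JVM keeps running.
  }
}
```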
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91497545

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs

--- End diff --

Exactly - for testing. If not unit tests, at least functional tests should be done.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91382001

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")

--- End diff --

Added a test.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91379426

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))

--- End diff --

One corner-case that I just thought of: what should happen if the first call to `killTask` sets `interruptThread = false` and a second call sets it to `true`? If we used a loading cache naively then we would miss the second call. Instead, it might make sense to have the loading cache key be a tuple of `(tid, interrupt)`.
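[Editor's note] A minimal sketch of the `(tid, interrupt)`-keyed cache idea using Guava's `LoadingCache`. The wiring here (class name, constructor parameters) is hypothetical, not the patch's code:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Keying on the (taskId, interruptThread) pair means a later kill request
// with interruptThread = true still creates a reaper even when an earlier
// interruptThread = false request already populated the cache.
class ReaperCacheSketch(newReaper: (Long, Boolean) => Runnable,
                        submit: Runnable => Unit) {

  private val reapers: LoadingCache[(Long, Boolean), Runnable] =
    CacheBuilder.newBuilder().build(
      new CacheLoader[(Long, Boolean), Runnable] {
        override def load(key: (Long, Boolean)): Runnable = {
          val reaper = newReaper(key._1, key._2)
          submit(reaper) // schedule monitoring exactly once per key
          reaper
        }
      })

  // Safe to call repeatedly: get() loads atomically per key, so duplicate
  // kill requests share a single reaper instead of spawning new ones.
  def killTask(tid: Long, interruptThread: Boolean): Unit = {
    reapers.get((tid, interruptThread))
  }
}
```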
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91364564

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))

--- End diff --

Or access TaskReapers via something like a Guava `LoadingCache`.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91347493

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs

--- End diff --

I don't think that using `Clock` offers a huge improvement here unless we're going to use it to mock out time in unit tests, which I think may be difficult to do given the structure of this code and the difficulty of dependency injection here.
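[Editor's note] For reference, a minimal sketch of what the suggested `Clock` injection might look like, assuming Spark's internal `org.apache.spark.util.Clock` utility (with `getTimeMillis` / `waitTillTime`) and its `SystemClock` / `ManualClock` implementations. The class shape and constructor parameters are hypothetical:

```scala
import org.apache.spark.util.{Clock, SystemClock}

// Thread a Clock through the constructor so a unit test can substitute a
// ManualClock and advance time deterministically instead of sleeping.
class ClockedReaperSketch(attemptKill: () => Unit,
                          isFinished: () => Boolean,
                          pollWaitMs: Long,
                          killTimeoutMs: Long,
                          clock: Clock = new SystemClock) extends Runnable {
  override def run(): Unit = {
    val startTimeMs = clock.getTimeMillis()
    def elapsedTimeMs = clock.getTimeMillis() - startTimeMs
    while (!isFinished() && elapsedTimeMs < killTimeoutMs) {
      attemptKill() // re-send the kill, as in the patch
      // Mockable wait: a ManualClock releases this as soon as a test
      // advances it, a SystemClock behaves like a real sleep.
      clock.waitTillTime(clock.getTimeMillis() + pollWaitMs)
    }
  }
}
```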
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91337695

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)

--- End diff --

Yeah, I meant to use `wait()` (I was prototyping this quickly late at night, hence the oversight). Will change now.
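[Editor's note] A minimal sketch of the `wait()`/`notifyAll()` handshake being agreed on here. It assumes the runner exposes an `isFinished` flag as in the diff; the other names are hypothetical:

```scala
// The runner wakes any blocked reaper the moment the task completes, so the
// reaper never sleeps out a full polling interval unnecessarily.
object WaitNotifySketch {

  class RunnerStub extends Runnable {
    @volatile var isFinished: Boolean = false
    override def run(): Unit = {
      try {
        // ... run the task body ...
      } finally {
        synchronized {
          isFinished = true
          notifyAll() // release any TaskReaper blocked in wait() below
        }
      }
    }
  }

  // Replaces `taskRunner.synchronized { Thread.sleep(pollMs) }` in the loop:
  def awaitFinishOrPollTimeout(runner: RunnerStub, pollMs: Long): Unit = {
    runner.synchronized {
      if (!runner.isFinished) {
        // Unlike sleep(), wait() releases the runner's monitor while blocked
        // and returns early when the runner calls notifyAll().
        runner.wait(pollMs)
      }
    }
  }
}
```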
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91337585

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {

--- End diff --

Which finally block are you referring to here? The uncaught exception handler in the executor JVM?
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91259169

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs

--- End diff --

Use `Clock` instead?
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91259805

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {

--- End diff --

Why not move the logging into a `finally` block, so that even if there are exceptions in the method we still have the status? (It makes sense to keep the throw here, of course.)
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91258960

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")

--- End diff --

This is expensive when all we want is a single thread's dump. Why not keep track of the thread id, dump only that one thread (instead of all of them), and validate that the threadName is the same as expected?
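[Editor's note] A minimal sketch of the single-thread dump being suggested, using the standard `java.lang.management` API. The surrounding object and method names are hypothetical:

```scala
import java.lang.management.ManagementFactory

// Dump only the task's own thread by id, rather than enumerating every
// thread in the JVM and filtering by name afterwards.
object SingleThreadDumpSketch {
  def dump(taskThreadId: Long, expectedName: String): Option[String] = {
    val info = ManagementFactory.getThreadMXBean.getThreadInfo(taskThreadId, Int.MaxValue)
    // getThreadInfo returns null if the thread has already exited.
    Option(info)
      .filter(_.getThreadName == expectedName) // guard against thread-id reuse
      .map(_.getStackTrace.map("\tat " + _).mkString("\n"))
  }
}
```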
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91257339

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)

--- End diff --

Why is the sleep inside synchronized? Also, it would be good to use wait/notify instead of sleep, so that the TaskReapers are released proactively as soon as the task finishes and the number of threads in the cached pool does not grow very high.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91235127

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")

--- End diff --

My goal here was to let users set this to `-1` to disable killing of the executor JVM. I'll add a test to make sure that this flag actually behaves that way.
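[Editor's note] For example, the intended opt-out would presumably look like this, assuming the negative-timeout semantics described above survive into the final patch:

```scala
import org.apache.spark.SparkConf

// Hypothetical usage: kill retries and warnings still happen, but the reaper
// never escalates to killing the executor JVM.
val conf = new SparkConf().set("spark.task.killTimeout", "-1")
```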
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91235062

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")

--- End diff --

I'm on the fence about documenting these publicly, but I'm willing to do so and would appreciate naming suggestions.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91235005

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))

--- End diff --

A careful reviewer will notice that it's possible for `killTask` to be called twice for the same task, either via multiple calls to `killTask` here or via a call to `killTask` followed by a later `killAllTasks` call. I think that this should technically be okay with the code in this first draft of the patch, since having multiple TaskReapers for the same task should be fine, but I can also appreciate how this could cause resource exhaustion issues in the pathological case where `killTask` is spammed continuously. If we think it's important to avoid multiple reapers in this case, then a simple solution would be to add a `synchronized` method on `TaskRunner` which submits a `TaskReaper` on the first kill request and is a no-op on subsequent requests.
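[Editor's note] A minimal sketch of that `synchronized` no-op-on-repeat idea. The class, field, and method names are hypothetical, not the patch's code:

```scala
// The runner remembers whether a reaper was already submitted, so killTask
// followed by killAllTasks spawns only one TaskReaper per task.
class GuardedRunnerSketch(submitReaper: () => Unit) {
  private var reaperSubmitted = false

  def maybeSubmitReaper(): Unit = synchronized {
    if (!reaperSubmitted) {
      reaperSubmitted = true
      submitReaper() // first kill request: schedule exactly one reaper
    }
    // subsequent kill requests fall through as no-ops
  }
}
```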
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91234781

--- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---

@@ -209,6 +209,41 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(jobB.get() === 100)
   }
 
+  test("task reaper kills JVM if killed tasks keep running for too long") {
+    val conf = new SparkConf().set("spark.task.killTimeout", "5s")
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    // jobA is the one to be cancelled.
+    val jobA = Future {
+      sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+      sc.parallelize(1 to 1, 2).map { i =>
+        while (true) { }
+      }.count()
+    }
+
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+    // Small delay to ensure tasks actually start executing the task body
+    Thread.sleep(1000)

--- End diff --

This is slightly ugly, but it's needed to avoid a race where this regression test can spuriously pass (and thereby fail to test anything) in case we cancel a task right after it has launched on the executor but before the UDF in the task has actually started running.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91234733

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
+  /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
+        if (isLocal) {
+          logError(s"Killed task ${taskRunner.taskId} could not be stopped within " +

--- End diff --

Even if we did throw an exception here, it wouldn't exit the JVM in local mode because we don't set an uncaught exception handler in local mode (see code further up in this file).
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91234634

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -229,9 +230,11 @@ private[spark] class Executor(
         // ClosedByInterruptException during execBackend.statusUpdate which causes
         // Executor to crash
         Thread.interrupted()
+        notifyAll()
       }
 
     override def run(): Unit = {
+      Thread.currentThread().setName(threadName)

--- End diff --

Task ids should be unique, so this thread name should be unique as well. Hence, I don't think it's super important to reset the thread's name when returning it to the task thread pool: the thread will be renamed as soon as it's recycled for a new task, and if the task has already exited then it'll be clear from the thread state / context that this is just a completed task's thread that has been returned to the pool.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91234685

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---

@@ -192,13 +189,17 @@ private[spark] class Executor(
       serializedTask: ByteBuffer)
     extends Runnable {
 
+    val threadName = s"Executor task launch worker for task $taskId"

--- End diff --

This naming scheme was intentionally chosen to match the pattern that we use for sorting threads on the executor thread dump page. I'll manually verify that this works as expected there.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/16189

[SPARK-18761][CORE][WIP] Introduce "task reaper" to oversee task killing in executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running, then this can lead to a situation where new jobs and tasks are starved of the resources being held by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high level, task killing now launches a new thread which attempts to kill the task and then monitors it, periodically checking whether it has actually finished. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from tasks that we are waiting to finish. After a configurable timeout, if the task has still not been killed, the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

There are some aspects of the design that I'd like to think about a bit more, but I've opened this as `[WIP]` now in order to solicit early feedback. I'll comment on some of my thoughts directly on the diff.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Commits:

commit 2c28594b980845bda1d4db7ae866a91caaad4fff
Author: Josh Rosen
Date: 2016-12-07T06:17:38Z

    Add failing regression test.

commit a46f9c2436d533ff838674cb63e397d1007e34de
Author: Josh Rosen
Date: 2016-12-07T06:18:43Z

    Add TaskReaper to executor.
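[Editor's note] As background for the "best effort" caveat above, a minimal sketch of what a cooperative, killable task body looks like from the user side, assuming the public `TaskContext.isInterrupted()` check; the function name and loop contents are arbitrary:

```scala
import org.apache.spark.TaskContext

// This task body IS killable by the existing mechanism because it polls the
// task's killed flag and bails out promptly. The TaskReaper exists for task
// bodies that never perform such a check, like the `while (true) { }` loop
// in the regression test quoted earlier in this thread.
object CooperativeTaskSketch {
  def cooperativeWork(i: Int): Long = {
    val ctx = TaskContext.get()
    var acc = 0L
    while (acc < Long.MaxValue) {
      if (ctx.isInterrupted()) {
        throw new InterruptedException(s"task killed while processing element $i")
      }
      acc += 1
    }
    acc
  }
}
```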