[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91671465
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
--- End diff --

I thought about this and it seems like there are only two possibilities 
here:

1. We're running in local mode, in which case we don't actually want to 
throw an exception to kill the JVM, and even if we did throw, the JVM would 
keep running because there's no uncaught exception handler installed here.
2. We're running in a separate JVM, in which case any exception thrown in 
this thread and not caught will cause the JVM to exit. The only place in the 
body of this code that might actually throw unexpected exceptions is the 
taskThreadDump, which is already in a `try-catch` block to prevent exceptions 
from bubbling up.

Thus the only purpose of a finally block would be to detect whether it was 
reached via an exception path and to log a warning stating that task kill 
progress will no longer be monitored. Basically, I'm not sure what the finally 
block buys us in terms of actionable / useful logs, and it only adds complexity 
because we then need to be careful not to throw from the finally block in case 
it was entered via an exception, etc.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-08 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91497545
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
--- End diff --

Exactly - for testing.
If not unit tests, at least functional tests should be done.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91382001
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
--- End diff --

Added a test.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91379426
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
--- End diff --

One corner-case that I just thought of: what should happen if the first 
call to `killTask` sets `interruptThread = false` and the second call sets it 
to true? If we used a loading cache naively then we would miss the second call. 
Instead, it might make sense to have the loading cache key be a tuple of `(tid, 
interrupt)`.
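
For illustration, a rough sketch of what a `(tid, interrupt)`-keyed cache could look like (the `taskReaperPool` used below is a hypothetical dedicated thread pool, not an existing Executor field):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Sketch only: keying on (taskId, interruptThread) means that a later kill request
// with interruptThread = true still creates and runs a new reaper, even if a
// non-interrupting reaper already exists for the same task.
val taskReapers: LoadingCache[(Long, Boolean), TaskReaper] =
  CacheBuilder.newBuilder().build(
    new CacheLoader[(Long, Boolean), TaskReaper] {
      override def load(key: (Long, Boolean)): TaskReaper = {
        val (taskId, interrupt) = key
        val reaper = new TaskReaper(runningTasks.get(taskId), interruptThread = interrupt)
        taskReaperPool.execute(reaper)  // hypothetical dedicated pool for reapers
        reaper
      }
    })
```

A kill request would then go through `taskReapers.get((taskId, interruptThread))`, so repeated requests with the same flag are de-duplicated while an escalation from `false` to `true` still spawns a reaper that interrupts.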





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91364564
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
--- End diff --

Or access TaskReapers via something like a guava LoadingCache.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91347493
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
--- End diff --

I don't think that using `Clock` offers a huge improvement here unless 
we're going to use it to mock out time in unit tests, which I think may be 
difficult to do given the structure of this code and the difficulty of 
dependency injection here.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91337695
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
--- End diff --

Yeah, I meant to use `wait()` (was prototyping this quickly late at night, 
hence this oversight). Will change now.
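
For reference, a minimal standalone sketch of the wait/notify pattern being discussed (stand-in names, not the Executor code): the watcher waits on a monitor with a timeout and is woken early when the worker finishes, instead of always sleeping for the full polling interval.

```scala
object WaitNotifyDemo {
  @volatile private var finished = false
  private val lock = new Object

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500) // simulate the task body
        lock.synchronized {
          finished = true
          lock.notifyAll() // wake the watcher as soon as the "task" completes
        }
      }
    })
    worker.start()

    lock.synchronized {
      // Loop on the condition to guard against spurious wakeups.
      while (!finished) {
        lock.wait(10000) // returns roughly 500 ms in, not after the full 10 s
      }
    }
    println("watcher released as soon as the task finished")
  }
}
```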





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91337585
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
--- End diff --

Which finally block are you referring to here? The uncaught exception 
handler in the executor JVM?





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91259169
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
--- End diff --

Use Clock instead ?
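
For context, a sketch of what the `Clock`-based variant might look like (the extra constructor parameter is hypothetical); a `ManualClock` could then be injected in tests and advanced deterministically instead of relying on wall-clock time:

```scala
import org.apache.spark.util.{Clock, SystemClock}

// Sketch only: thread a Clock through the reaper instead of calling
// System.currentTimeMillis() directly.
private class TaskReaper(
    taskRunner: TaskRunner,
    interruptThread: Boolean,
    clock: Clock = new SystemClock) extends Runnable {

  override def run(): Unit = {
    val startTimeMs = clock.getTimeMillis()
    def elapsedTimeMs = clock.getTimeMillis() - startTimeMs
    // ... same polling / escalation loop as in the diff above, driven by elapsedTimeMs ...
  }
}
```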





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91259805
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
--- End diff --

Why not move the logging into a finally block so that we still get the status 
even if the method throws exceptions?
(It makes sense to keep the throw here, of course.)





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91258960
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
--- End diff --

This is expensive just to get a single thread's dump.
Why not keep track of the thread id, dump only that single thread (instead 
of all of them), and validate that the threadName matches what's expected?
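
For illustration, a sketch of the cheaper single-thread variant (it assumes the TaskRunner records its thread's id when the task starts; the helper name is made up):

```scala
import java.lang.management.ManagementFactory

// Sketch only: dump just the task's own thread via its id rather than walking every
// thread in the JVM, and double-check the name in case the pool thread has since
// been recycled for a different task.
def singleThreadStack(threadId: Long, expectedName: String): Option[String] = {
  val info = ManagementFactory.getThreadMXBean.getThreadInfo(threadId, Int.MaxValue)
  if (info != null && info.getThreadName == expectedName) {
    Some(info.getStackTrace.mkString("\n\tat "))
  } else {
    None // thread exited or was renamed; nothing trustworthy to dump
  }
}
```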





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91257339
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
--- End diff --

Why is the sleep inside the synchronized block?
Also, it would be good to use wait/notify instead of sleep so that the 
TaskReapers are released proactively as soon as the task finishes and the 
number of threads in the cached pool does not grow very large.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91235127
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
--- End diff --

My goal here was to let users set this to `-1` to disable killing of the 
executor JVM. I'll add a test to make sure that this flag actually behaves that 
way.
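
As a usage sketch (relying on the `killTimeoutMs > 0` guard in the diff above):

```scala
import org.apache.spark.SparkConf

// Sketch only: a non-positive timeout never satisfies the `killTimeoutMs > 0` guard,
// so the reaper would never escalate to killing the executor JVM.
val conf = new SparkConf().set("spark.task.killTimeout", "-1")
```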





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91235062
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
--- End diff --

I'm on the fence about documenting these publicly, but I'm willing to do so 
and would appreciate naming suggestions.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91235005
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
--- End diff --

A careful reviewer will notice that it's possible for `killTask` to be 
called twice for the same task, either via multiple calls to `killTask` here or 
via a call to `killTask` followed by a later `killAllTasks` call. I think this 
should technically be okay with the code in this first draft of the patch, 
since having multiple TaskReapers for the same task should be fine, but I can 
also appreciate how this could cause resource exhaustion issues in the 
pathological case where killTask is spammed continuously. If we think it's 
important to avoid multiple reapers in this case then a simple solution would 
be to add a `synchronized` method on `TaskRunner` which submits a `TaskReaper` 
on the first kill request and is a no-op on subsequent requests.
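
To make that concrete, a minimal standalone sketch of the "submit the reaper once" idea (the class and pool below are stand-ins for the real `TaskRunner` and executor pools):

```scala
import java.util.concurrent.ExecutorService

// Sketch only: idempotent kill -- repeated killTask / killAllTasks calls can set the
// kill flag each time but spawn at most one reaper per task.
class RunnerStub(reaperPool: ExecutorService) {
  private var reaperLaunched = false

  def kill(interruptThread: Boolean): Unit = synchronized {
    if (!reaperLaunched) {
      reaperLaunched = true
      reaperPool.execute(new Runnable {
        override def run(): Unit = {
          // poll the task until it finishes or times out, as the TaskReaper does
        }
      })
    }
    // the existing kill logic (setting the killed flag and optionally interrupting
    // the task thread) would follow here
  }
}
```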





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91234781
  
--- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
@@ -209,6 +209,41 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(jobB.get() === 100)
   }
 
+  test("task reaper kills JVM if killed tasks keep running for too long") {
+    val conf = new SparkConf().set("spark.task.killTimeout", "5s")
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    // jobA is the one to be cancelled.
+    val jobA = Future {
+      sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+      sc.parallelize(1 to 1, 2).map { i =>
+        while (true) { }
+      }.count()
+    }
+
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+    // Small delay to ensure tasks actually start executing the task body
+    Thread.sleep(1000)
--- End diff --

This is slightly ugly but it's needed to avoid a race where this regression 
test can spuriously pass (and thereby fail to test anything) in case we cancel 
a task right after it has launched on the executor but before the UDF in the 
task has actually run.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91234733
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +435,57 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it finishes.
+   */
+  private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
+
+    private[this] val killPollingFrequencyMs: Long =
+      conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+    private[this] val takeThreadDump: Boolean =
+      conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+    override def run(): Unit = {
+      val startTimeMs = System.currentTimeMillis()
+      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+
+      while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
+        taskRunner.kill(interruptThread = interruptThread)
+        taskRunner.synchronized {
+          Thread.sleep(killPollingFrequencyMs)
+        }
+        if (!taskRunner.isFinished) {
+          logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
+          if (takeThreadDump) {
+            try {
+              val threads = Utils.getThreadDump()
+              threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
+                logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
+              }
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Exception thrown while obtaining thread dump: ", e)
+            }
+          }
+        }
+      }
+      if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
+        if (isLocal) {
+          logError(s"Killed task ${taskRunner.taskId} could not be stopped within " +
--- End diff --

Even if we did throw an exception here, it wouldn't exit the JVM in local 
mode because we don't set an uncaught exception handler in local mode (see code 
further up in this file).
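
(For reference, the code being referred to looks roughly like this, sketched from earlier in Executor.scala:)

```scala
// Non-local executors install a handler that kills the JVM on uncaught exceptions;
// in local mode no such handler is installed, so an exception thrown from the reaper
// thread would only terminate that thread.
if (!isLocal) {
  Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
}
```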





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91234634
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -229,9 +230,11 @@ private[spark] class Executor(
       // ClosedByInterruptException during execBackend.statusUpdate which causes
       // Executor to crash
       Thread.interrupted()
+      notifyAll()
     }
 
     override def run(): Unit = {
+      Thread.currentThread().setName(threadName)
--- End diff --

Task ids should be unique, so this thread name should also be unique. 
Hence, I don't think it's super important to reset the thread's name when 
returning it to the task thread pool: the thread will be renamed as soon as 
it's recycled for a new task, and if the task has already exited then it'll be 
clear from the thread state / context that this is just a completed task's 
thread that's been returned to the pool.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91234685
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -192,13 +189,17 @@ private[spark] class Executor(
       serializedTask: ByteBuffer)
     extends Runnable {
 
+    val threadName = s"Executor task launch worker for task $taskId"
--- End diff --

This naming scheme was intentionally chosen to match the pattern that we 
use for sorting threads in the executor thread dump page. I'll manually verify 
that this worked as expected there.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-06 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/16189

[SPARK-18761][CORE][WIP] Introduce "task reaper" to oversee task killing in 
executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" 
because some tasks may not be interruptible or may not respond to their 
"killed" flags being set. If a significant fraction of a cluster's task slots 
are occupied by tasks that have been marked as killed but remain running then 
this can lead to a situation where new jobs and tasks are starved of resources 
that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism 
to executors. At a high-level, task killing now launches a new thread which 
attempts to kill the task and then watches the task and periodically checks 
whether it has been killed. The TaskReaper will periodically re-attempt to call 
`TaskRunner.kill()` and will log warnings if the task keeps running. I modified 
TaskRunner to rename its thread at the start of the task, allowing TaskReaper 
to take a thread dump and filter it in order to log stacktraces from tasks that 
we are waiting to finish. After a configurable timeout, if the task has not 
been killed then the TaskReaper will throw an exception to trigger executor JVM 
death, thereby forcibly freeing any resources consumed by the zombie tasks.

There are some aspects of the design that I'd like to think about a bit 
more, but I've opened this as `[WIP]` now in order to solicit early feedback. 
I'll comment on some of my thoughts directly on the diff.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark cancellation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16189


commit 2c28594b980845bda1d4db7ae866a91caaad4fff
Author: Josh Rosen 
Date:   2016-12-07T06:17:38Z

Add failing regression test.

commit a46f9c2436d533ff838674cb63e397d1007e34de
Author: Josh Rosen 
Date:   2016-12-07T06:18:43Z

Add TaskReaper to executor.



