Repository: spark
Updated Branches:
  refs/heads/master 4000f128b -> 5142e5d4e


[SPARK-20217][CORE] Executor should not fail stage if killed task throws 
non-interrupted exception

## What changes were proposed in this pull request?

If tasks throw non-interrupted exceptions on kill (e.g. 
java.nio.channels.ClosedByInterruptException), their death is reported back as 
TaskFailed instead of TaskKilled. This causes stage failure in some cases.

This is reproducible as follows. Run the following, and then use 
SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will 
fail since we threw a RuntimeException instead of InterruptedException.

```
spark.range(100).repartition(100).foreach { i =>
  try {
    Thread.sleep(10000000)
  } catch {
    case t: InterruptedException =>
      throw new RuntimeException(t)
  }
}
```
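
For completeness, one way to issue the kill from the driver is to pick up a task 
ID from a SparkListener and pass it to SparkContext.killTaskAttempt. This is only 
an illustrative sketch, assuming a SparkSession named `spark` and that 
SparkContext.killTaskAttempt is available (Spark 2.2+); the listener wiring is 
not part of the original reproduction and should be registered before running 
the job above.

```
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

val killedOne = new AtomicBoolean(false)

// Kill the first task attempt we see start. interruptThread = true delivers the
// interrupt that the job above catches and rethrows as a RuntimeException.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    if (killedOne.compareAndSet(false, true)) {
      spark.sparkContext.killTaskAttempt(
        taskStart.taskInfo.taskId, interruptThread = true, reason = "SPARK-20217 repro")
    }
  }
})
```
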
Based on the code in TaskSetManager, I think this also affects kills of 
speculative tasks. However, since the number of speculated tasks is small, and 
a task usually has to fail a few times before the stage is cancelled, it is 
unlikely this would be noticed in production unless both speculation was 
enabled and the number of allowed task failures was set to 1.

We should probably unconditionally return TaskKilled instead of TaskFailed if 
the task was killed by the driver, regardless of the actual exception thrown.
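
To make that rule concrete, here is a minimal, self-contained sketch (not the 
actual Executor code; Outcome and runAndReport are invented for illustration): 
whenever a kill reason has been recorded, any non-fatal exception thrown by the 
task body is reported as a kill rather than a failure.

```
import scala.util.control.NonFatal

// Invented status type for the example; Spark's real states live in
// TaskState / TaskEndReason.
sealed trait Outcome
case object Succeeded extends Outcome
case object Killed extends Outcome
case object Failed extends Outcome

// reasonIfKilled is set on the driver's request, analogous to Task.reasonIfKilled.
def runAndReport(body: () => Unit, reasonIfKilled: () => Option[String]): Outcome =
  try { body(); Succeeded }
  catch {
    // A pending kill wins regardless of which exception the task actually threw,
    // e.g. an InterruptedException wrapped in a RuntimeException.
    case NonFatal(_) if reasonIfKilled().isDefined => Killed
    case NonFatal(_) => Failed
  }

// The reproduction above would now be reported as Killed instead of Failed:
assert(runAndReport(
  () => throw new RuntimeException(new InterruptedException()),
  () => Some("killed via SparkContext.killTaskAttempt")) == Killed)
```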

## How was this patch tested?

Unit test. The test fails without the change to Executor.scala.

cc JoshRosen

Author: Eric Liang <e...@databricks.com>

Closes #17531 from ericl/fix-task-interrupt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5142e5d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5142e5d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5142e5d4

Branch: refs/heads/master
Commit: 5142e5d4e09c7cb36cf1d792934a21c5305c6d42
Parents: 4000f12
Author: Eric Liang <e...@databricks.com>
Authored: Wed Apr 5 19:37:21 2017 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Apr 5 19:37:21 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 99b1608..83469c5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,7 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
 
-        case _: InterruptedException if task.reasonIfKilled.isDefined =>
+        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()

http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 2c94755..735f445 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -572,7 +572,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         // first attempt will hang
         if (!SparkContextSuite.isTaskStarted) {
           SparkContextSuite.isTaskStarted = true
-          Thread.sleep(9999999)
+          try {
+            Thread.sleep(9999999)
+          } catch {
+            case t: Throwable =>
+              // SPARK-20217 should not fail stage if task throws non-interrupted exception
+              throw new RuntimeException("killed")
+          }
         }
         // second attempt succeeds immediately
       }

