Repository: spark
Updated Branches:
  refs/heads/branch-2.0 666eb0118 -> 80b49be91


[SPARK-14915][CORE] Don't re-queue a task if another attempt has already 
succeeded

## What changes were proposed in this pull request?

Don't re-queue a task if another attempt has already succeeded.  This 
currently happens when a speculative task is denied permission to commit its 
result because another copy of the same task has already succeeded.
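
For context, this failure mode only arises with speculative execution 
enabled.  A minimal configuration sketch (these are standard Spark settings, 
not introduced by this patch; the quantile and multiplier shown are the 
defaults):

    import org.apache.spark.SparkConf

    // Settings under which duplicate (speculative) attempts can occur, and
    // hence a losing copy can be denied the commit.
    val conf = new SparkConf()
      .set("spark.speculation", "true")           // off by default
      .set("spark.speculation.quantile", "0.75")  // speculate once 75% of tasks finish
      .set("spark.speculation.multiplier", "1.5") // ...for tasks slower than 1.5x the median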

## How was this patch tested?

I ran a job with enough skew in per-task processing time for speculation to 
trigger on the last quarter of tasks (default settings), causing many commit 
denied exceptions to be thrown.  Previously, these tasks were retried over 
and over until the stage completed, if it ever did, wasting compute resources 
on the superfluous attempts.  With this change (applied to the 1.6 branch), 
they are no longer retried and the stage completes successfully without the 
extra task attempts.
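
As a rough illustration, a job shaped like the following (a hypothetical 
sketch, assuming the speculation settings above) reproduces the scenario: a 
few partitions run much longer than the rest, so speculative copies launch in 
the final quartile, and whichever copy loses the commit race fails with a 
commit-denied error.

    import org.apache.spark.{SparkConf, SparkContext}

    object Spark14915Repro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SPARK-14915 repro"))
        sc.parallelize(0 until 200, 200)
          .map { i =>
            if (i % 20 == 0) Thread.sleep(30000) // artificial skew on a few tasks
            i
          }
          .saveAsTextFile(args(0)) // file output commits are coordinated, so the
                                   // losing speculative attempt is denied
        sc.stop()
      }
    }

Before this patch, each denied attempt was re-queued and retried; with it, 
the failed duplicate is dropped because its partition has already succeeded.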

Author: Jason Moore <jasonmoor...@outlook.com>

Closes #12751 from jasonmoore2k/SPARK-14915.

(cherry picked from commit 77361a433adce109c2b752b11dda25b56eca0352)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80b49be9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80b49be9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80b49be9

Branch: refs/heads/branch-2.0
Commit: 80b49be91a38e7d007f28996861e807e01d69d27
Parents: 666eb01
Author: Jason Moore <jasonmoor...@outlook.com>
Authored: Thu May 5 11:02:35 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu May 5 11:02:44 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSetManager.scala      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/80b49be9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index bfa1e86..08d33f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -716,7 +716,16 @@ private[spark] class TaskSetManager(
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
       put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
-    addPendingTask(index)
+
+    if (successful(index)) {
+      logInfo(
+        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+        "but another instance of the task has already succeeded, " +
+        "so not re-queuing the task to be re-executed.")
+    } else {
+      addPendingTask(index)
+    }
+
     if (!isZombie && state != TaskState.KILLED
         && reason.isInstanceOf[TaskFailedReason]
         && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {

