Repository: spark
Updated Branches:
  refs/heads/master 5ae3516bf -> 433d9eb61


[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks

## What changes were proposed in this pull request?

Previously there was a possible race between a task failure and committing that
task's output. For example, the driver may mark a task attempt as failed due to
an executor heartbeat timeout (possibly caused by GC), but once the executor
recovers, that attempt may still coordinate with the OutputCommitCoordinator and
commit its result. Any retry attempt then fails because the output has already
been committed, even though the original attempt was marked as failed.

This change records failed task attempts so that a previously failed attempt can
never enter the commit protocol; its commit request is denied, allowing a later
retry attempt to commit instead.
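
For illustration only, here is a minimal, self-contained Scala sketch of the idea:
a coordinator that records failed attempts per partition and refuses to authorize
them for committing. All names (`CommitDenialSketch`, `Coordinator`, `taskFailed`,
`canCommit`) are hypothetical and simplified; the actual OutputCommitCoordinator
changes are in the diff below.

```scala
import scala.collection.mutable

object CommitDenialSketch {
  final val NoCommitter = -1

  class Coordinator(numPartitions: Int) {
    // Attempt currently authorized to commit each partition, if any.
    private val authorized = Array.fill(numPartitions)(NoCommitter)
    // Attempts the driver has already marked as failed, per partition.
    private val failures = mutable.Map[Int, mutable.Set[Int]]()

    // Driver side: record a failure (e.g. heartbeat timeout) and clear any lock
    // the failed attempt may have held.
    def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
      failures.getOrElseUpdate(partition, mutable.Set()) += attempt
      if (authorized(partition) == attempt) authorized(partition) = NoCommitter
    }

    // Executor side: a previously failed attempt is always denied; otherwise only
    // one attempt per partition is granted the commit lock.
    def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
      if (failures.get(partition).exists(_.contains(attempt))) false
      else if (authorized(partition) == NoCommitter) { authorized(partition) = attempt; true }
      else authorized(partition) == attempt
    }
  }

  def main(args: Array[String]): Unit = {
    val coord = new Coordinator(numPartitions = 2)
    coord.taskFailed(partition = 0, attempt = 0)         // driver marks attempt 0 failed
    assert(!coord.canCommit(partition = 0, attempt = 0)) // late attempt 0 is denied
    assert(coord.canCommit(partition = 0, attempt = 1))  // retry attempt 1 may commit
    println("failed attempt denied; retry attempt authorized")
  }
}
```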

## How was this patch tested?

Added a unit test to OutputCommitCoordinatorSuite (included in the diff below).

Author: Patrick Woody <pwo...@palantir.com>

Closes #16959 from pwoody/pw/recordFailuresForCommitter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/433d9eb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/433d9eb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/433d9eb6

Branch: refs/heads/master
Commit: 433d9eb6151a547af967cc1ac983a789bed60704
Parents: 5ae3516
Author: Patrick Woody <pwo...@palantir.com>
Authored: Thu Mar 2 15:55:32 2017 -0800
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Thu Mar 2 15:55:32 2017 -0800

----------------------------------------------------------------------
 .../scheduler/OutputCommitCoordinator.scala     | 59 ++++++++++++--------
 .../OutputCommitCoordinatorSuite.scala          | 11 ++++
 2 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/433d9eb6/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 08d220b..83d87b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private type StageId = Int
   private type PartitionId = Int
   private type TaskAttemptNumber = Int
-
   private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+  private case class StageState(numPartitions: Int) {
+    val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
+    val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+  }
 
   /**
-   * Map from active stages's id => partition id => task attempt with exclusive lock on committing
-   * output for that partition.
+   * Map from active stages's id => authorized task attempts for each partition id, which hold an
+   * exclusive lock on committing task output for that partition, as well as any known failed
+   * attempts in the stage.
    *
    * Entries are added to the top-level map when stages start and are removed they finish
    * (either successfully or unsuccessfully).
    *
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
-  private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()
+  private val stageStates = mutable.Map[StageId, StageState]()
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
    */
   def isEmpty: Boolean = {
-    authorizedCommittersByStage.isEmpty
+    stageStates.isEmpty
   }
 
   /**
@@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+    stageStates(stage) = new StageState(maxPartitionId + 1)
   }
 
   // Called by DAGScheduler
   private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage.remove(stage)
+    stageStates.remove(stage)
   }
 
   // Called by DAGScheduler
@@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: PartitionId,
       attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
-    val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
+    val stageState = stageStates.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
       return
     })
@@ -137,10 +135,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
           s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters(partition) == attemptNumber) {
+        // Mark the attempt as failed to blacklist from future commit protocol
+        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
+        if (stageState.authorizedCommitters(partition) == attemptNumber) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, 
stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+          stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
         }
     }
   }
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     if (isDriver) {
       coordinatorRef.foreach(_ send StopCoordinator)
       coordinatorRef = None
-      authorizedCommittersByStage.clear()
+      stageStates.clear()
     }
   }
 
@@ -158,13 +158,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       stage: StageId,
       partition: PartitionId,
       attemptNumber: TaskAttemptNumber): Boolean = synchronized {
-    authorizedCommittersByStage.get(stage) match {
-      case Some(authorizedCommitters) =>
-        authorizedCommitters(partition) match {
+    stageStates.get(stage) match {
+      case Some(state) if attemptFailed(state, partition, attemptNumber) =>
+        logInfo(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage," +
+          s" partition=$partition as task attempt $attemptNumber has already 
failed.")
+        false
+      case Some(state) =>
+        state.authorizedCommitters(partition) match {
           case NO_AUTHORIZED_COMMITTER =>
             logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
               s"partition=$partition")
-            authorizedCommitters(partition) = attemptNumber
+            state.authorizedCommitters(partition) = attemptNumber
             true
           case existingCommitter =>
             // Coordinator should be idempotent when receiving AskPermissionToCommit.
@@ -181,11 +185,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
             }
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing attempt number 
$attemptNumber of" +
-          s"partition $partition to commit")
+        logDebug(s"Stage $stage has completed, so not allowing" +
+          s" attempt number $attemptNumber of partition $partition to commit")
         false
     }
   }
+
+  private def attemptFailed(
+      stageState: StageState,
+      partition: PartitionId,
+      attempt: TaskAttemptNumber): Boolean = synchronized {
+    stageState.failures.get(partition).exists(_.contains(attempt))
+  }
 }
 
 private[spark] object OutputCommitCoordinator {

http://git-wip-us.apache.org/repos/asf/spark/blob/433d9eb6/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 0c362b8..83ed127 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -195,6 +195,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
        0 until rdd.partitions.size)
   }
+
+  test("SPARK-19631: Do not allow failed attempts to be authorized for 
committing") {
+    val stage: Int = 1
+    val partition: Int = 1
+    val failedAttempt: Int = 0
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
+      reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
