Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7bfefc928 -> 751b00820


[SPARK-24589][CORE] Correctly identify tasks in output commit coordinator.

When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909cd498b67b121fa920ceee7631c652dac38)
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/751b0082
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/751b0082
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/751b0082

Branch: refs/heads/branch-2.2
Commit: 751b008204a847e26d79b1996ce3f3dbe96a5acf
Parents: 7bfefc9
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Jun 21 13:25:15 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Jun 21 14:10:16 2018 -0500

----------------------------------------------------------------------
 .../spark/mapred/SparkHadoopMapRedUtil.scala    |   8 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  23 ++--
 .../scheduler/OutputCommitCoordinator.scala     | 128 ++++++++++---------
 .../OutputCommitCoordinatorSuite.scala          | 116 ++++++++++++-----
 4 files changed, 173 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/751b0082/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 764735d..db8aff9 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {
 
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val stageId = TaskContext.get().stageId()
-        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, 
taskAttemptNumber)
+        val ctx = TaskContext.get()
+        val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), 
ctx.stageAttemptNumber(),
+          splitId, ctx.attemptNumber())
 
         if (canCommit) {
           performCommit()
@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new 
attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, stageId, splitId, 
taskAttemptNumber)
+          throw new CommitDeniedException(message, ctx.stageId(), splitId, 
ctx.attemptNumber())
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the 
commit coordination

http://git-wip-us.apache.org/repos/asf/spark/blob/751b0082/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 87e407f..099bc2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1151,6 +1151,7 @@ class DAGScheduler(
 
     outputCommitCoordinator.taskCompleted(
       stageId,
+      task.stageAttemptId,
       task.partitionId,
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
@@ -1309,23 +1310,24 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
+          failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
+          val shouldAbortStage =
+            failedStage.fetchFailedAttemptIds.size >= 
maxConsecutiveStageAttempts ||
+            disallowStageRetryForTest
+
           // It is likely that we receive multiple FetchFailed for a single 
stage (because we have
           // multiple tasks running concurrently on different executors). In 
that case, it is
           // possible the fetch failure has already been handled by the 
scheduler.
           if (runningStages.contains(failedStage)) {
             logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
               s"due to a fetch failure from $mapStage (${mapStage.name})")
-            markStageAsFinished(failedStage, Some(failureMessage))
+            markStageAsFinished(failedStage, errorMessage = 
Some(failureMessage),
+              willRetry = !shouldAbortStage)
           } else {
             logDebug(s"Received fetch failure from $task, but its from 
$failedStage which is no " +
               s"longer running")
           }
 
-          failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
-          val shouldAbortStage =
-            failedStage.fetchFailedAttemptIds.size >= 
maxConsecutiveStageAttempts ||
-            disallowStageRetryForTest
-
           if (shouldAbortStage) {
             val abortMessage = if (disallowStageRetryForTest) {
               "Fetch failure will not retry stage due to testing config"
@@ -1471,7 +1473,10 @@ class DAGScheduler(
   /**
    * Marks a stage as finished and removes it from the list of running stages.
    */
-  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = 
None): Unit = {
+  private def markStageAsFinished(
+      stage: Stage,
+      errorMessage: Option[String] = None,
+      willRetry: Boolean = false): Unit = {
     val serviceTime = stage.latestInfo.submissionTime match {
       case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
       case _ => "Unknown"
@@ -1490,7 +1495,9 @@ class DAGScheduler(
       logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to 
${errorMessage.get}")
     }
 
-    outputCommitCoordinator.stageEnd(stage.id)
+    if (!willRetry) {
+      outputCommitCoordinator.stageEnd(stage.id)
+    }
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/751b0082/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 83d87b5..b382d62 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -27,7 +27,11 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, partition: Int, 
attemptNumber: Int)
+private case class AskPermissionToCommitOutput(
+    stage: Int,
+    stageAttempt: Int,
+    partition: Int,
+    attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a 
"first committer wins"
@@ -45,13 +49,15 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
 
-  private type StageId = Int
-  private type PartitionId = Int
-  private type TaskAttemptNumber = Int
-  private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+  // Class used to identify a committer. The task ID for a committer is 
implicitly defined by
+  // the partition being processed, but the coordinator needs to keep track of 
both the stage
+  // attempt and the task attempt, because in some situations the same task 
may be running
+  // concurrently in two different attempts of the same stage.
+  private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)
+
   private case class StageState(numPartitions: Int) {
-    val authorizedCommitters = 
Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
-    val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+    val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
+    val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
   }
 
   /**
@@ -64,7 +70,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
    *
    * Access to this map should be guarded by synchronizing on the 
OutputCommitCoordinator instance.
    */
-  private val stageStates = mutable.Map[StageId, StageState]()
+  private val stageStates = mutable.Map[Int, StageState]()
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures 
are all empty.
@@ -87,10 +93,11 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, 
attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
@@ -103,26 +110,35 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   }
 
   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's 
state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this 
stage's tasks (i.e.
    *                       the maximum possible value of 
`context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit 
= synchronized {
-    stageStates(stage) = new StageState(maxPartitionId + 1)
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = 
synchronized {
+    stageStates.get(stage) match {
+      case Some(state) =>
+        require(state.authorizedCommitters.length == maxPartitionId + 1)
+        logInfo(s"Reusing state from previous attempt of stage $stage.")
+
+      case _ =>
+        stageStates(stage) = new StageState(maxPartitionId + 1)
+    }
   }
 
   // Called by DAGScheduler
-  private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
+  private[scheduler] def stageEnd(stage: Int): Unit = synchronized {
     stageStates.remove(stage)
   }
 
   // Called by DAGScheduler
   private[scheduler] def taskCompleted(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber,
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int,
       reason: TaskEndReason): Unit = synchronized {
     val stageState = stageStates.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -131,16 +147,17 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
     reason match {
       case Success =>
       // The task output has been committed successfully
-      case denied: TaskCommitDenied =>
-        logInfo(s"Task was denied committing, stage: $stage, partition: 
$partition, " +
-          s"attempt: $attemptNumber")
-      case otherReason =>
+      case _: TaskCommitDenied =>
+        logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
+          s"partition: $partition, attempt: $attemptNumber")
+      case _ =>
         // Mark the attempt as failed to blacklist from future commit protocol
-        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += 
attemptNumber
-        if (stageState.authorizedCommitters(partition) == attemptNumber) {
+        val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
+        if (stageState.authorizedCommitters(partition) == taskId) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, 
stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+          stageState.authorizedCommitters(partition) = null
         }
     }
   }
@@ -155,47 +172,41 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
 
   // Marked private[scheduler] instead of private so this can be mocked in 
tests
   private[scheduler] def handleAskPermissionToCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptFailed(state, partition, attemptNumber) =>
-        logInfo(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage," +
-          s" partition=$partition as task attempt $attemptNumber has already 
failed.")
+      case Some(state) if attemptFailed(state, stageAttempt, partition, 
attemptNumber) =>
+        logInfo(s"Commit denied for stage=$stage.$stageAttempt, 
partition=$partition: " +
+          s"task attempt $attemptNumber already marked as failed.")
         false
       case Some(state) =>
-        state.authorizedCommitters(partition) match {
-          case NO_AUTHORIZED_COMMITTER =>
-            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
-              s"partition=$partition")
-            state.authorizedCommitters(partition) = attemptNumber
-            true
-          case existingCommitter =>
-            // Coordinator should be idempotent when receiving 
AskPermissionToCommit.
-            if (existingCommitter == attemptNumber) {
-              logWarning(s"Authorizing duplicate request to commit for " +
-                s"attemptNumber=$attemptNumber to commit for stage=$stage," +
-                s" partition=$partition; existingCommitter = 
$existingCommitter." +
-                s" This can indicate dropped network traffic.")
-              true
-            } else {
-              logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
-                s"partition=$partition; existingCommitter = 
$existingCommitter")
-              false
-            }
+        val existing = state.authorizedCommitters(partition)
+        if (existing == null) {
+          logDebug(s"Commit allowed for stage=$stage.$stageAttempt, 
partition=$partition, " +
+            s"task attempt $attemptNumber")
+          state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, 
attemptNumber)
+          true
+        } else {
+          logDebug(s"Commit denied for stage=$stage.$stageAttempt, 
partition=$partition: " +
+            s"already committed by $existing")
+          false
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing" +
-          s" attempt number $attemptNumber of partition $partition to commit")
+        logDebug(s"Commit denied for stage=$stage.$stageAttempt, 
partition=$partition: " +
+          "stage already marked as completed.")
         false
     }
   }
 
   private def attemptFailed(
       stageState: StageState,
-      partition: PartitionId,
-      attempt: TaskAttemptNumber): Boolean = synchronized {
-    stageState.failures.get(partition).exists(_.contains(attempt))
+      stageAttempt: Int,
+      partition: Int,
+      attempt: Int): Boolean = synchronized {
+    val failInfo = TaskIdentifier(stageAttempt, attempt)
+    stageState.failures.get(partition).exists(_.contains(failInfo))
   }
 }
 
@@ -215,9 +226,10 @@ private[spark] object OutputCommitCoordinator {
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
+      case AskPermissionToCommitOutput(stage, stageAttempt, partition, 
attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, 
partition, attemptNumber))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, 
stageAttempt, partition,
+            attemptNumber))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/751b0082/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e51e6a0..742b841 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark._
 import org.apache.spark.internal.io.SparkHadoopWriter
 import org.apache.spark.rdd.{FakeOutputCommitter, RDD}
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -151,7 +152,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite 
with BeforeAndAfter {
   test("Job should not complete if all commits are denied") {
     // Create a mock OutputCommitCoordinator that denies all attempts to commit
     doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit(
-      Matchers.any(), Matchers.any(), Matchers.any())
+      Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any())
     val rdd: RDD[Int] = sc.parallelize(Seq(1), 1)
     def resultHandler(x: Int, y: Unit): Unit = {}
     val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, 
Unit](rdd,
@@ -167,45 +168,106 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite 
with BeforeAndAfter {
 
   test("Only authorized committer failures can clear the authorized committer 
lock (SPARK-6614)") {
     val stage: Int = 1
+    val stageAttempt: Int = 1
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)
 
-    assert(outputCommitCoordinator.canCommit(stage, partition, 
authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, 
authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter))
     // The non-authorized committer fails
-    outputCommitCoordinator.taskCompleted(
-      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = 
TaskKilled("test"))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test"))
     // New tasks should still not be able to commit because the authorized 
committer has not failed
-    assert(
-      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 1))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
-    outputCommitCoordinator.taskCompleted(
-      stage, partition, attemptNumber = authorizedCommitter, reason = 
TaskKilled("test"))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
     // A new task should now be allowed to become the authorized committer
-    assert(
-      outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 2))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
-    assert(
-      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 3))
-  }
-
-  test("Duplicate calls to canCommit from the authorized committer gets 
idempotent responses.") {
-    val rdd = sc.parallelize(Seq(1), 1)
-    sc.runJob(rdd, 
OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
-       0 until rdd.partitions.size)
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for 
committing") {
     val stage: Int = 1
+    val stageAttempt: Int = 1
     val partition: Int = 1
     val failedAttempt: Int = 0
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
-    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = 
failedAttempt,
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = failedAttempt,
       reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
-    assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 
1))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, 
failedAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, 
failedAttempt + 1))
+  }
+
+  test("SPARK-24589: Differentiate tasks from different stage attempts") {
+    var stage = 1
+    val taskAttempt = 1
+    val partition = 1
+
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+    assert(!outputCommitCoordinator.canCommit(stage, 2, partition, 
taskAttempt))
+
+    // Fail the task in the first attempt, the task in the second attempt 
should succeed.
+    stage += 1
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
+      ExecutorLostFailure("0", exitCausedByApp = true, None))
+    assert(!outputCommitCoordinator.canCommit(stage, 1, partition, 
taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
+
+    // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt 
cannot commit,
+    // then fail the 1st attempt and make sure the 4th one can commit again.
+    stage += 1
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+    outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt,
+      ExecutorLostFailure("0", exitCausedByApp = true, None))
+    assert(!outputCommitCoordinator.canCommit(stage, 3, partition, 
taskAttempt))
+    outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
+      ExecutorLostFailure("0", exitCausedByApp = true, None))
+    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+  }
+
+  test("SPARK-24589: Make sure stage state is cleaned up") {
+    // Normal application without stage failures.
+    sc.parallelize(1 to 100, 100)
+      .map { i => (i % 10, i) }
+      .reduceByKey(_ + _)
+      .collect()
+
+    assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
+
+    // Force failures in a few tasks so that a stage is retried. Collect the 
ID of the failing
+    // stage so that we can check the state of the output committer.
+    val retriedStage = sc.parallelize(1 to 100, 10)
+      .map { i => (i % 10, i) }
+      .reduceByKey { case (_, _) =>
+        val ctx = TaskContext.get()
+        if (ctx.stageAttemptNumber() == 0) {
+          throw new 
FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 1, 1, 1,
+            new Exception("Failure for test."))
+        } else {
+          ctx.stageId()
+        }
+      }
+      .collect()
+      .map { case (k, v) => v }
+      .toSet
+
+    assert(retriedStage.size === 1)
+    assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
+    verify(sc.env.outputCommitCoordinator, times(2))
+      .stageStart(Matchers.eq(retriedStage.head), Matchers.any())
+    
verify(sc.env.outputCommitCoordinator).stageEnd(Matchers.eq(retriedStage.head))
   }
 }
 
@@ -239,16 +301,6 @@ private case class OutputCommitFunctions(tempDirPath: 
String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else 
successfulOutputCommitter)
   }
 
-  // Receiver should be idempotent for AskPermissionToCommitOutput
-  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
-    val ctx = TaskContext.get()
-    val canCommit1 = SparkEnv.get.outputCommitCoordinator
-      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
-    val canCommit2 = SparkEnv.get.outputCommitCoordinator
-      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
-    assert(canCommit1 && canCommit2)
-  }
-
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to