Repository: spark
Updated Branches:
  refs/heads/branch-1.5 997be78c3 -> 2bbcbc659


[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in 
OutputCommitCoordinator

When speculative execution is enabled, consider a scenario where the authorized 
committer of a particular output partition fails during the 
OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is 
supposed to release that committer's exclusive lock on committing once that 
task fails. However, due to a unit mismatch (we used task attempt number in one 
place and task attempt id in another) the lock will not be released, causing 
Spark to go into an infinite retry loop.

This bug was masked by the fact that the OutputCommitCoordinator does not have 
enough end-to-end tests (the current tests use many mocks). Other factors 
contributing to this bug are the fact that we have many similarly-named 
identifiers that have different semantics but the same data types (e.g. 
attemptNumber and taskAttemptId, with inconsistent variable naming which makes 
them difficult to distinguish).

This patch adds a regression test and fixes this bug by always using task 
attempt numbers throughout this code.

Author: Josh Rosen <joshro...@databricks.com>

Closes #8544 from JoshRosen/SPARK-10381.

(cherry picked from commit 38700ea40cb1dd0805cc926a9e629f93c99527ad)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bbcbc65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bbcbc65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bbcbc65

Branch: refs/heads/branch-1.5
Commit: 2bbcbc65917fd5f0eba23d0cb000eb9c26c0165b
Parents: 997be78
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Sep 15 17:11:21 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Sep 15 17:20:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  3 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  7 +-
 .../spark/executor/CommitDeniedException.scala  |  4 +-
 .../spark/mapred/SparkHadoopMapRedUtil.scala    | 20 ++----
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 +-
 .../scheduler/OutputCommitCoordinator.scala     | 48 +++++++-------
 .../org/apache/spark/scheduler/TaskInfo.scala   |  7 +-
 .../spark/status/api/v1/AllStagesResource.scala |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  4 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 ...utputCommitCoordinatorIntegrationSuite.scala | 68 ++++++++++++++++++++
 .../OutputCommitCoordinatorSuite.scala          | 24 ++++---
 .../apache/spark/util/JsonProtocolSuite.scala   |  2 +-
 project/MimaExcludes.scala                      | 19 +++++-
 .../execution/datasources/WriterContainer.scala |  3 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  4 +-
 .../spark/sql/hive/hiveWriterContainers.scala   |  2 +-
 17 files changed, 157 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36c..c4b3215 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    SparkHadoopMapRedUtil.commitTask(
-      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), 
jobID, splitID)
   }
 
   def commitJob() {

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala 
b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00d..7a690df 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -191,9 +191,12 @@ case object TaskKilled extends TaskFailedReason {
  * Task requested the driver to commit, but was denied.
  */
 @DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) 
extends TaskFailedReason {
+case class TaskCommitDenied(
+    jobID: Int,
+    partitionID: Int,
+    attemptNumber: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task 
commit)" +
-    s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala 
b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef..7d84889 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
     msg: String,
     jobID: Int,
     splitID: Int,
-    attemptID: Int)
+    attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, 
attemptID)
+  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, 
attemptNumber)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f405b73..f7298e8 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int,
-      attemptId: Int): Unit = {
+      splitId: Int): Unit = {
 
     val mrTaskAttemptID = 
SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, 
attemptId)
+        val taskAttemptNumber = TaskContext.get().attemptNumber()
+        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, 
taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new 
attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, attemptId)
+          throw new CommitDeniedException(message, jobId, splitId, 
taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the 
commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(s"No need to commit output of task because 
needsTaskCommit=false: $mrTaskAttemptID")
     }
   }
-
-  def commitTask(
-      committer: MapReduceOutputCommitter,
-      mrTaskContext: MapReduceTaskAttemptContext,
-      sparkTaskContext: TaskContext): Unit = {
-    commitTask(
-      committer,
-      mrTaskContext,
-      sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index de69911..cb2494d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -970,8 +970,11 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
 
-    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
-      event.taskInfo.attempt, event.reason)
+    outputCommitCoordinator.taskCompleted(
+      stageId,
+      task.partitionId,
+      event.taskInfo.attemptNumber, // this is a task attempt number
+      event.reason)
 
     // The success case is dealt with separately below, since we need to 
compute accumulator
     // updates before posting.

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 5d92637..add0ded 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, 
RpcEnv, RpcEndpoint
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, 
taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, 
attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a 
"first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
   var coordinatorRef: Option[RpcEndpointRef] = None
 
   private type StageId = Int
-  private type PartitionId = Long
-  private type TaskAttemptId = Long
+  private type PartitionId = Int
+  private type TaskAttemptNumber = Int
 
   /**
    * Map from active stages's id => partition id => task attempt with 
exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
    * Access to this map should be guarded by synchronizing on the 
OutputCommitCoordinator instance.
    */
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap = mutable.Map[StageId, 
mutable.Map[PartitionId, TaskAttemptId]]
+  private type CommittersByStageMap =
+    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures 
are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
    *
    * @param stage the stage number
    * @param partition the partition number
-   * @param attempt a unique identifier for this task attempt
+   * @param attemptNumber how many times this task has been attempted
+   *                      (see [[TaskContext.attemptNumber()]])
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+      attemptNumber: TaskAttemptNumber): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
 
   // Called by DAGScheduler
   private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, 
TaskAttemptId]()
+    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, 
TaskAttemptNumber]()
   }
 
   // Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private[scheduler] def taskCompleted(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId,
+      attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
     val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
       case Success =>
       // The task output has been committed successfully
       case denied: TaskCommitDenied =>
-        logInfo(
-          s"Task was denied committing, stage: $stage, partition: $partition, 
attempt: $attempt")
+        logInfo(s"Task was denied committing, stage: $stage, partition: 
$partition, " +
+          s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
-          logDebug(s"Authorized committer $attempt (stage=$stage, 
partition=$partition) failed;" +
-            s" clearing lock")
+        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, 
stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
           authorizedCommitters.remove(partition)
         }
     }
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private[scheduler] def handleAskPermissionToCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = synchronized {
+      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
         authorizedCommitters.get(partition) match {
           case Some(existingCommitter) =>
-            logDebug(s"Denying $attempt to commit for stage=$stage, 
partition=$partition; " +
-              s"existingCommitter = $existingCommitter")
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
             false
           case None =>
-            logDebug(s"Authorizing $attempt to commit for stage=$stage, 
partition=$partition")
-            authorizedCommitters(partition) = attempt
+            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+              s"partition=$partition")
+            authorizedCommitters(partition) = attemptNumber
             true
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing task attempt 
$attempt to commit")
+        logDebug(s"Stage $stage has completed, so not allowing attempt number 
$attemptNumber of" +
+          s"partition $partition to commit")
         false
     }
   }
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, 
partition, taskAttempt))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, 
partition, attemptNumber))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ce..f113c2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attemptNumber: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
     }
   }
 
-  def id: String = s"$index.$attempt"
+  @deprecated("Use attemptNumber", "1.6.0")
+  def attempt: Int = attemptNumber
+
+  def id: String = s"$index.$attemptNumber"
 
   def duration: Long = {
     if (!finished) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136..24a0b52 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
     new TaskData(
       taskId = uiData.taskInfo.taskId,
       index = uiData.taskInfo.index,
-      attempt = uiData.taskInfo.attempt,
+      attempt = uiData.taskInfo.attemptNumber,
       launchTime = new Date(uiData.taskInfo.launchTime),
       executorId = uiData.taskInfo.executorId,
       host = uiData.taskInfo.host,

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2b71f55..712782d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
           serializationTimeProportionPos + serializationTimeProportion
 
         val index = taskInfo.index
-        val attempt = taskInfo.attempt
+        val attempt = taskInfo.attemptNumber
 
         val svgTag =
           if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
     new TaskTableRowData(
       info.index,
       info.taskId,
-      info.attempt,
+      info.attemptNumber,
       info.speculative,
       info.status,
       info.taskLocality.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd..f742c39 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
-    ("Attempt" -> taskInfo.attempt) ~
+    ("Attempt" -> taskInfo.attemptNumber) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000..1ae5b03
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, 
SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with Timeouts {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .set("master", "local[2,4]")
+      .set("spark.speculation", "true")
+      .set("spark.hadoop.mapred.output.committer.class",
+        classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+    sc = new SparkContext("local[2, 4]", "test", conf)
+  }
+
+  test("exception thrown in OutputCommitter.commitTask()") {
+    // Regression test for SPARK-10381
+    failAfter(Span(60, Seconds)) {
+      val tempDir = Utils.createTempDir()
+      try {
+        sc.parallelize(1 to 4, 
2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends 
FileOutputCommitter {
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    val ctx = TaskContext.get()
+    if (ctx.attemptNumber < 1) {
+      throw new java.io.FileNotFoundException("Intentional exception")
+    }
+    super.commitTask(context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e5ecd4b..6d08d7c 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
  * was not in SparkHadoopWriter, the tests would still pass because only one 
of the
  * increments would be captured even though the commit in both tasks was 
executed
  * erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests 
that do
+ * not use mocks.
  */
 class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite 
with BeforeAndAfter {
 
   test("Only authorized committer failures can clear the authorized committer 
lock (SPARK-6614)") {
     val stage: Int = 1
-    val partition: Long = 2
-    val authorizedCommitter: Long = 3
-    val nonAuthorizedCommitter: Long = 100
+    val partition: Int = 2
+    val authorizedCommitter: Int = 3
+    val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage)
-    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = 
authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter))
+
+    assert(outputCommitCoordinator.canCommit(stage, partition, 
authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter))
     // The non-authorized committer fails
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = 
TaskKilled)
     // New tasks should still not be able to commit because the authorized 
committer has not failed
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 1))
+      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = authorizedCommitter, reason = 
TaskKilled)
     // A new task should now be allowed to become the authorized committer
     assert(
-      outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 2))
+      outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 3))
+      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 3))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a413..4bf6660 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -499,7 +499,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
-    assert(info1.attempt === info2.attempt)
+    assert(info1.attemptNumber === info2.attemptNumber)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 88745dc..28673dd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -192,6 +192,23 @@ object MimaExcludes {
             // SPARK-9704 Made ProbabilisticClassifier, Identifiable, 
VectorUDT public APIs
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.mllib.linalg.VectorUDT.serialize")
+          ) ++ Seq(
+            // SPARK-10381 Fix types / units in private 
AskPermissionToCommitOutput RPC message.
+            // This class is marked as `private` but MiMa still seems to be 
confused by the change.
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+               
"org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              
"org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              
"org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
           )
 
         case v if v.startsWith("1.4") =>
@@ -662,4 +679,4 @@ object MimaExcludes {
           
MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
         case _ => Seq()
       }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index d454144..c1599f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -194,8 +194,7 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, 
taskAttemptId.getId)
+    SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, 
jobId.getId, taskId.getId)
   }
 
   def abortTask(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 80d1e88..b3dd26e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -54,9 +54,9 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     details = ""
   )
 
-  private def createTaskInfo(taskId: Int, attempt: Int): TaskInfo = new 
TaskInfo(
+  private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new 
TaskInfo(
     taskId = taskId,
-    attempt = attempt,
+    attemptNumber = attemptNumber,
     // The following fields are not used in tests
     index = 0,
     launchTime = 0,

http://git-wip-us.apache.org/repos/asf/spark/blob/2bbcbc65/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8dc796b..f1a1ed7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -122,7 +122,7 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def commit() {
-    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, 
attemptID)
+    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to