Repository: spark
Updated Branches:
  refs/heads/master 0e00f12d3 -> 37326079d


[SPARK-6614] OutputCommitCoordinator should clear authorized committer only 
after authorized committer fails, not after any failure

In OutputCommitCoordinator, there is some logic to clear the authorized 
committer's lock on committing in case that task fails.  However, it looks like 
the current code also clears this lock if other non-authorized tasks fail, 
which is an obvious bug.

In theory, it's possible that this could allow a new committer to start, run to 
completion, and commit output before the authorized committer finished, but 
it's unlikely that this race occurs often in practice due to the complex 
combination of failure and timing conditions that would be required to expose 
it.

This patch addresses this issue and adds a regression test.

Thanks to aarondav for spotting this issue.

Author: Josh Rosen <joshro...@databricks.com>

Closes #5276 from JoshRosen/SPARK-6614 and squashes the following commits:

d532ba7 [Josh Rosen] Check whether failed task was authorized committer
cbb3784 [Josh Rosen] Add regression test for SPARK-6614


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37326079
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37326079
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37326079

Branch: refs/heads/master
Commit: 37326079d818fdb140415a65653767d997613dac
Parents: 0e00f12
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Mar 31 16:18:39 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Mar 31 16:18:39 2015 -0700

----------------------------------------------------------------------
 .../scheduler/OutputCommitCoordinator.scala     |  8 ++++---
 .../OutputCommitCoordinatorSuite.scala          | 25 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37326079/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 17055e2..9e29fd1 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -113,9 +113,11 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf) extends Logging {
         logInfo(
           s"Task was denied committing, stage: $stage, partition: $partition, 
attempt: $attempt")
       case otherReason =>
-        logDebug(s"Authorized committer $attempt (stage=$stage, 
partition=$partition) failed;" +
-          s" clearing lock")
-        authorizedCommitters.remove(partition)
+        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
+          logDebug(s"Authorized committer $attempt (stage=$stage, 
partition=$partition) failed;" +
+            s" clearing lock")
+          authorizedCommitters.remove(partition)
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37326079/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c8c9578..cf97707 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -161,6 +161,31 @@ class OutputCommitCoordinatorSuite extends FunSuite with 
BeforeAndAfter {
     }
     assert(tempDir.list().size === 0)
   }
+
+  test("Only authorized committer failures can clear the authorized committer 
lock (SPARK-6614)") {
+    val stage: Int = 1
+    val partition: Long = 2
+    val authorizedCommitter: Long = 3
+    val nonAuthorizedCommitter: Long = 100
+    outputCommitCoordinator.stageStart(stage)
+    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = 
authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter))
+    // The non-authorized committer fails
+    outputCommitCoordinator.taskCompleted(
+      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+    // New tasks should still not be able to commit because the authorized 
committer has not failed
+    assert(
+      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 1))
+    // The authorized committer now fails, clearing the lock
+    outputCommitCoordinator.taskCompleted(
+      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+    // A new task should now be allowed to become the authorized committer
+    assert(
+      outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(
+      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 3))
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to