Repository: spark
Updated Branches:
  refs/heads/master 0e80ecae3 -> 96d58f285


[SPARK-21219][CORE] Task retry occurs on same executor due to race condition 
with blacklisting

## What changes were proposed in this pull request?

There is a race condition in the current TaskSetManager: a failed task is added 
back to the pending lists for retry (addPendingTask) and can asynchronously be 
assigned to an executor *before* the blacklist state is updated 
(updateBlacklistForFailedTask). As a result, the retry may execute on the same 
executor on which it just failed. This is particularly problematic if that 
executor is shutting down, since the retry immediately becomes a lost task 
(ExecutorLostFailure). Another side effect is that the actual failure reason 
gets obscured by the retry, which never actually executed. Sample logs showing 
the issue are attached to https://issues.apache.org/jira/browse/SPARK-21219.
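
To make the window concrete, here is a toy, single-threaded model of the 
interleaving; the pendingTasks queue, blacklistedExecs set, and resourceOffer 
helper below are illustrative stand-ins, not Spark's actual data structures:

```scala
import scala.collection.mutable

// Toy model only: demonstrates why re-queueing before blacklisting is unsafe.
object BlacklistRaceSketch extends App {
  val pendingTasks = mutable.Queue.empty[Int]        // tasks waiting to be scheduled
  val blacklistedExecs = mutable.Set.empty[String]   // executors this task must avoid

  // Hand the next pending task to an executor unless that executor is blacklisted.
  def resourceOffer(exec: String): Option[Int] =
    if (pendingTasks.nonEmpty && !blacklistedExecs.contains(exec)) Some(pendingTasks.dequeue())
    else None

  // Pre-fix ordering: re-queue first, blacklist second. An offer arriving in
  // between puts the retry right back on the executor where it just failed.
  pendingTasks.enqueue(0)                             // addPendingTask(0)
  val retry = resourceOffer("executor1")              // scheduler offer sneaks in here
  blacklistedExecs += "executor1"                     // updateBlacklistForFailedTask(...)
  println(s"pre-fix:  retry landed on executor1? ${retry.isDefined}")   // true

  // Post-fix ordering: blacklist first, then re-queue. The same offer is rejected.
  pendingTasks.clear(); blacklistedExecs.clear()
  blacklistedExecs += "executor1"
  pendingTasks.enqueue(0)
  println(s"post-fix: retry landed on executor1? ${resourceOffer("executor1").isDefined}") // false
}
```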

The fix is to swap the order of the addPendingTask and 
updateBlacklistForFailedTask calls in TaskSetManager.handleFailedTask, so that 
the blacklist is updated before the task becomes schedulable again.
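
After the change, handleFailedTask roughly follows the shape below (a 
simplified excerpt with the surrounding bookkeeping elided; the full change is 
in the diff further down):

```scala
// Simplified excerpt of the post-fix ordering in TaskSetManager.handleFailedTask.
if (!isZombie && reason.countTowardsTaskFailures) {
  // Record the failure against the executor/host for this task index first.
  taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
    info.host, info.executorId, index))
  // ... failure counting, and the task set may be aborted here ...
}

if (successful(index)) {
  // Another copy already succeeded (or the stage will be re-run): no retry.
} else {
  // Only now is the task re-queued, so any subsequent resourceOffer already
  // sees the updated blacklist state.
  addPendingTask(index)
}
```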

## How was this patch tested?

Implemented a unit test that verifies the task is blacklisted before it is 
added back to the pending task list. The unit test fails without the fix and 
passes with it.


Author: Eric Vandenberg <ericvandenb...@fb.com>

Closes #18427 from ericvandenbergfb/blacklistFix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96d58f28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96d58f28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96d58f28

Branch: refs/heads/master
Commit: 96d58f285bc98d4c2484150eefe7447db4784a86
Parents: 0e80eca
Author: Eric Vandenberg <ericvandenb...@fb.com>
Authored: Mon Jul 10 14:40:20 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jul 10 14:40:20 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 21 +++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 44 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96d58f28/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 02d374d..3968fb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
-  private def addPendingTask(index: Int) {
+  private[spark] def addPendingTask(index: Int) {
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
@@ -832,15 +832,6 @@ private[spark] class TaskSetManager(
 
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
-    if (successful(index)) {
-      logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but 
the task will not" +
-        s" be re-executed (either because the task failed with a shuffle data 
fetch failure," +
-        s" so the previous stage needs to be re-run, or because a different 
copy of the task" +
-        s" has already succeeded).")
-    } else {
-      addPendingTask(index)
-    }
-
     if (!isZombie && reason.countTowardsTaskFailures) {
       taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
         info.host, info.executorId, index))
@@ -854,6 +845,16 @@ private[spark] class TaskSetManager(
         return
       }
     }
+
+    if (successful(index)) {
+      logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but 
the task will not" +
+        s" be re-executed (either because the task failed with a shuffle data 
fetch failure," +
+        s" so the previous stage needs to be re-run, or because a different 
copy of the task" +
+        s" has already succeeded).")
+    } else {
+      addPendingTask(index)
+    }
+
     maybeFinishTaskSet()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96d58f28/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 80fb674..e46900e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.mockito.Matchers.{any, anyInt, anyString}
-import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -1172,6 +1172,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(blacklistTracker.isNodeBlacklisted("host1"))
   }
 
+  test("update blacklist before adding pending task to avoid race condition") {
+    // When a task fails, the blacklist policy should be applied before the
+    // task is re-queued for retry; otherwise a race condition allows the
+    // retry to run on the same executor it should have been blacklisted from.
+    val conf = new SparkConf().
+      set(config.BLACKLIST_ENABLED, true)
+
+    // Create a scheduler with two executors and a task set with a single task.
+    sc = new SparkContext("local", "test", conf)
+    val exec = "executor1"
+    val host = "host1"
+    val exec2 = "executor2"
+    val host2 = "host2"
+    sched = new FakeTaskScheduler(sc, (exec, host), (exec2, host2))
+    val taskSet = FakeTask.createTaskSet(1)
+
+    val clock = new ManualClock
+    val mockListenerBus = mock(classOf[LiveListenerBus])
+    val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
+    val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
+    val taskSetManagerSpy = spy(taskSetManager)
+
+    val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
+
+    // Assert the task has been blacklisted on the executor it was last executed on.
+    when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
+      new Answer[Unit] {
+        override def answer(invocationOnMock: InvocationOnMock): Unit = {
+          val task = invocationOnMock.getArgumentAt(0, classOf[Int])
+          assert(taskSetManager.taskSetBlacklistHelperOpt.get.
+            isExecutorBlacklistedForTask(exec, task))
+        }
+      }
+    )
+
+    // Simulate a task failure with a fake exception.
+    val e = new ExceptionFailure("a", "b", Array(), "c", None)
+    taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)
+
+    verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

