Repository: spark
Updated Branches:
  refs/heads/master 190ff274f -> c2f0cb4f6


[SPARK-15714][CORE] Fix flaky o.a.s.scheduler.BlacklistIntegrationSuite

## What changes were proposed in this pull request?

BlacklistIntegrationSuite (introduced by SPARK-10372) is a bit flaky because of 
some race conditions:
1. Failed jobs might have non-empty results, because the resultHandler will be 
invoked for successful tasks (if there are task successes before failures)
2. taskScheduler.taskIdToTaskSetManager must be protected by a lock on 
taskScheduler

(1) has failed a handful of jenkins builds recently.  I don't think I've seen 
(2) in jenkins, but I've run into with some uncommitted tests I'm working on 
where there are lots more tasks.

While I was in there, I also made an unrelated fix to `runningTasks`in the test 
framework -- there was a pointless `O(n)` operation to remove completed tasks, 
could be `O(1)`.

## How was this patch tested?

I modified the o.a.s.scheduler.BlacklistIntegrationSuite to have it run the 
tests 1k times on my laptop.  It failed 11 times before this change, and none 
with it.  (Pretty sure all the failures were problem (1), though I didn't check 
all of them).

Also the full suite of tests via jenkins.

Author: Imran Rashid <iras...@cloudera.com>

Closes #13454 from squito/SPARK-15714.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2f0cb4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2f0cb4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2f0cb4f

Branch: refs/heads/master
Commit: c2f0cb4f6380c500f9ba37b2429503b762204973
Parents: 190ff27
Author: Imran Rashid <iras...@cloudera.com>
Authored: Fri Jun 3 11:49:33 2016 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Jun 3 11:49:33 2016 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  1 +
 .../scheduler/BlacklistIntegrationSuite.scala   | 10 ++-
 .../scheduler/SchedulerIntegrationSuite.scala   | 68 ++++++++++++++------
 3 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2f0cb4f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5cb1af9..c3adc28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -83,6 +83,7 @@ private[spark] class TaskSchedulerImpl(
   // on this class.
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, 
TaskSetManager]]
 
+  // Protected by `this`
   private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, 
TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c2f0cb4f/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 6c9d4fb..3a4b7af 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -30,12 +30,12 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
    * all tasks.
    */
   def badHostBackend(): Unit = {
-    val task = backend.beginTask()
-    val host = backend.executorIdToExecutor(task.executorId).host
+    val (taskDescription, _) = backend.beginTask()
+    val host = backend.executorIdToExecutor(taskDescription.executorId).host
     if (host == badHost) {
-      backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
+      backend.taskFailed(taskDescription, new RuntimeException("I'm a bad 
host!"))
     } else {
-      backend.taskSuccess(task, 42)
+      backend.taskSuccess(taskDescription, 42)
     }
   }
 
@@ -48,7 +48,6 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
       val duration = Duration(1, SECONDS)
       Await.ready(jobFuture, duration)
     }
-    assert(results.isEmpty)
     assertDataStructuresEmpty(noFailure = false)
   }
 
@@ -68,7 +67,6 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
       val duration = Duration(3, SECONDS)
       Await.ready(jobFuture, duration)
     }
-    assert(results.isEmpty)
     assertDataStructuresEmpty(noFailure = false)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c2f0cb4f/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 02aa5ca..92bd765 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -89,7 +89,26 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
     }
   }
 
+  /**
+   * A map from partition -> results for all tasks of a job when you call this 
test framework's
+   * [[submit]] method.  Two important considerations:
+   *
+   * 1. If there is a job failure, results may or may not be empty.  If any 
tasks succeed before
+   * the job has failed, they will get included in `results`.  Instead, check 
for job failure by
+   * checking [[failure]].  (Also see [[assertDataStructuresEmpty()]])
+   *
+   * 2. This only gets cleared between tests.  So you'll need to do special 
handling if you submit
+   * more than one job in one test.
+   */
   val results = new HashMap[Int, Any]()
+
+  /**
+   * If a call to [[submit]] results in a job failure, this will hold the 
exception, else it will
+   * be null.
+   *
+   * As with [[results]], this only gets cleared between tests, so care must 
be taken if you are
+   * submitting more than one job in one test.
+   */
   var failure: Throwable = _
 
   /**
@@ -113,6 +132,11 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
     }
   }
 
+  /**
+   * Helper to run a few common asserts after a job has completed, in 
particular some internal
+   * datastructures for bookkeeping.  This only does a very minimal check for 
whether the job
+   * failed or succeeded -- often you will want extra asserts on [[results]] 
or [[failure]].
+   */
   protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = {
     if (noFailure) {
       if (failure != null) {
@@ -133,6 +157,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
       // when the job succeeds
       assert(taskScheduler.runningTaskSets.isEmpty)
       assert(!backend.hasTasks)
+    } else {
+      assert(failure != null)
     }
     assert(scheduler.activeJobs.isEmpty)
   }
@@ -217,10 +243,10 @@ private[spark] abstract class MockBackend(
    * Test backends should call this to get a task that has been assigned to 
them by the scheduler.
    * Each task should be responded to with either [[taskSuccess]] or 
[[taskFailed]].
    */
-  def beginTask(): TaskDescription = {
+  def beginTask(): (TaskDescription, Task[_]) = {
     synchronized {
       val toRun = 
assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
-      runningTasks += toRun
+      runningTasks += toRun._1.taskId
       toRun
     }
   }
@@ -255,7 +281,7 @@ private[spark] abstract class MockBackend(
     taskScheduler.statusUpdate(task.taskId, state, resultBytes)
     if (TaskState.isFinished(state)) {
       synchronized {
-        runningTasks -= task
+        runningTasks -= task.taskId
         executorIdToExecutor(task.executorId).freeCores += 
taskScheduler.CPUS_PER_TASK
         freeCores += taskScheduler.CPUS_PER_TASK
       }
@@ -264,9 +290,9 @@ private[spark] abstract class MockBackend(
   }
 
   // protected by this
-  private val assignedTasksWaitingToRun = new 
ArrayBuffer[TaskDescription](10000)
+  private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, 
Task[_])](10000)
   // protected by this
-  private val runningTasks = ArrayBuffer[TaskDescription]()
+  private val runningTasks = HashSet[Long]()
 
   def hasTasks: Boolean = synchronized {
     assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty
@@ -307,10 +333,19 @@ private[spark] abstract class MockBackend(
    */
   override def reviveOffers(): Unit = {
     val offers: Seq[WorkerOffer] = generateOffers()
-    val newTasks = taskScheduler.resourceOffers(offers).flatten
+    val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+    // get the task now, since that requires a lock on TaskSchedulerImpl, to 
prevent individual
+    // tests from introducing a race if they need it
+    val newTasks = taskScheduler.synchronized {
+      newTaskDescriptions.map { taskDescription =>
+        val taskSet = 
taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+        val task = taskSet.tasks(taskDescription.index)
+        (taskDescription, task)
+      }
+    }
     synchronized {
-      newTasks.foreach { task =>
-        executorIdToExecutor(task.executorId).freeCores -= 
taskScheduler.CPUS_PER_TASK
+      newTasks.foreach { case (taskDescription, _) =>
+        executorIdToExecutor(taskDescription.executorId).freeCores -= 
taskScheduler.CPUS_PER_TASK
       }
       freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
       assignedTasksWaitingToRun ++= newTasks
@@ -437,8 +472,8 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
    */
   testScheduler("super simple job") {
     def runBackend(): Unit = {
-      val task = backend.beginTask()
-      backend.taskSuccess(task, 42)
+      val (taskDescripition, _) = backend.beginTask()
+      backend.taskSuccess(taskDescripition, 42)
     }
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -473,9 +508,7 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
     val d = join(30, b, c)
 
     def runBackend(): Unit = {
-      val taskDescription = backend.beginTask()
-      val taskSet = 
taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
-      val task = taskSet.tasks(taskDescription.index)
+      val (taskDescription, task) = backend.beginTask()
 
       // make sure the required map output is available
       task.stageId match {
@@ -515,9 +548,7 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
     val stageToAttempts = new HashMap[Int, HashSet[Int]]()
 
     def runBackend(): Unit = {
-      val taskDescription = backend.beginTask()
-      val taskSet = 
taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
-      val task = taskSet.tasks(taskDescription.index)
+      val (taskDescription, task) = backend.beginTask()
       stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += 
task.stageAttemptId
 
       // make sure the required map output is available
@@ -549,8 +580,8 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
 
   testScheduler("job failure after 4 attempts") {
     def runBackend(): Unit = {
-      val task = backend.beginTask()
-      backend.taskFailed(task, new RuntimeException("test task failure"))
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskFailed(taskDescription, new RuntimeException("test task 
failure"))
     }
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -558,7 +589,6 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
       Await.ready(jobFuture, duration)
       failure.getMessage.contains("test task failure")
     }
-    assert(results.isEmpty)
     assertDataStructuresEmpty(noFailure = false)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to