squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER] : 
Optimize Spark Scheduler to dequeue speculative tasks…
URL: https://github.com/apache/spark/pull/23677#discussion_r304567188
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -1691,79 +1692,74 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     // > 0ms, so advance the clock by 1ms here.
     clock.advance(1)
     assert(manager.checkSpeculatableTasks(0))
-    assert(sched.speculativeTasks.toSet === Set(2, 3))
-    assert(manager.copiesRunning(2) === 1)
+    assert(sched.speculativeTasks.toSet === Set(1, 3))
+    assert(manager.copiesRunning(1) === 1)
     assert(manager.copiesRunning(3) === 1)
 
     // Offer resource to start the speculative attempt for the running task. 
We offer more
     // resources, and ensure that speculative tasks get scheduled 
appropriately -- only one extra
     // copy per speculatable task
     val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)
-    val taskOption3 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)
     assert(taskOption2.isDefined)
     val task2 = taskOption2.get
+    // Ensure that task index 3 is launched on host1 and task index 4 on host2
     assert(task2.index === 3)
     assert(task2.taskId === 4)
     assert(task2.executorId === "exec1")
     assert(task2.attemptNumber === 1)
     assert(taskOption3.isDefined)
     val task3 = taskOption3.get
-    assert(task3.index === 2)
+    assert(task3.index === 1)
     assert(task3.taskId === 5)
-    assert(task3.executorId === "exec1")
+    assert(task3.executorId === "exec2")
     assert(task3.attemptNumber === 1)
     clock.advance(1)
     // Running checkSpeculatableTasks again should return false
     assert(!manager.checkSpeculatableTasks(0))
-    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(1) === 2)
     assert(manager.copiesRunning(3) === 2)
     // Offering additional resources should not lead to any speculative tasks 
being respawned
     assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
     assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
     assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
-  }
 
-  test("SPARK-26755 Ensure that a speculative task obeys the original locality 
preferences") {
-    sc = new SparkContext("local", "test")
+    // Launch a new set of tasks with locality preferences
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
       ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4"))
-    // Create 3 tasks with locality preferences
-    val taskSet = FakeTask.createTaskSet(3,
+    val taskSet2 = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1"), TaskLocation("host3")),
       Seq(TaskLocation("host2")),
       Seq(TaskLocation("host3")))
-    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
-    sc.conf.set(config.SPECULATION_ENABLED, true)
-    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
-    val clock = new ManualClock()
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
-    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+    val clock2 = new ManualClock()
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, 
clock = clock2)
+    val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet2.tasks.map { task =>
       task.metrics.internalAccums
     }
+
     // Offer resources for 3 tasks to start
     Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { 
case (exec, host) =>
-      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      val taskOption = manager2.resourceOffer(exec, host, NO_PREF)
       assert(taskOption.isDefined)
       assert(taskOption.get.executorId === exec)
     }
     assert(sched.startedTasks.toSet === Set(0, 1, 2))
-    clock.advance(1)
+    clock2.advance(1)
     // Finish one task and mark the others as speculatable
-    manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
+    manager2.handleSuccessfulTask(2, createTaskResult(2, 
accumUpdatesByTask2(2)))
     assert(sched.endedTasks(2) === Success)
-    clock.advance(1)
-    assert(manager.checkSpeculatableTasks(0))
+    clock2.advance(1)
+    assert(manager2.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set(0, 1))
     // Ensure that the speculatable tasks obey the original locality 
preferences
-    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
-    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
-    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
-    assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
+    assert(manager2.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
+    assert(manager2.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
 
 Review comment:
   a comment here would be helpful -- e.g. "task 1 does have a node-local 
preference for host2 -- but we've already got a regular task running there, so 
we should not schedule a speculative task there as well."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to