squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks…
URL: https://github.com/apache/spark/pull/23677#discussion_r304138587
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -1723,4 +1722,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
     assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
   }
+
+  test("SPARK-26755 Ensure that a speculative task obeys the original locality preferences") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4"))
+    // Create 3 tasks with locality preferences
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(TaskLocation("host1"), TaskLocation("host3")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3")))
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 3 tasks to start
+    Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) =>
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === exec)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2))
+    clock.advance(1)
+    // Finish one task and mark the others as speculatable
+    manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
+    assert(sched.endedTasks(2) === Success)
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(0, 1))
+    // Ensure that the speculatable tasks obey the original locality preferences
+    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
+    assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
 
 Review comment:
   Any particular reason to pull this out into a separate test case? It seems like it could be combined with the test above. It's fine if there is a good reason, but I don't like a proliferation of test cases that all do more or less the same thing. The only thing you aren't doing here, but are doing above, seems to be checking the taskId etc. of the speculative tasks.
   
   Another thing is missing from both tests: there is no check that we do not schedule a speculative task on the same host as the original task, even when locality preferences favor that host.
   
   (I realize some of these tests were missing before, but this logic is getting a little trickier now, and maybe those tests should always have been there.)
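
   To make the missing check concrete, here is a minimal, self-contained sketch of the rule in question. It is a toy model, not Spark's `TaskSetManager`: `SpeculationSketch`, `SpecTask`, `dequeueSpeculative`, and `pending` are hypothetical names invented for illustration, and the state in `pending` assumes the initial offers in the test placed task 0 on host1 and task 1 on host2.

```scala
// Toy model only: every name here is hypothetical and none of this is Spark's API.
object SpeculationSketch {
  sealed trait Locality
  case object NodeLocal extends Locality
  case object AnyLocality extends Locality

  // preferredHosts: the task's original locality preferences.
  // runningOn: hosts that already run an attempt of this task.
  final case class SpecTask(index: Int, preferredHosts: Set[String], runningOn: Set[String])

  // Picks a speculatable task for an offer on `host`, or None if nothing qualifies.
  def dequeueSpeculative(
      tasks: Seq[SpecTask],
      host: String,
      maxLocality: Locality): Option[Int] = {
    // Never place a copy on a host that already runs an attempt of the same task,
    // even if the task's locality preferences name that host.
    val eligible = tasks.filterNot(_.runningOn.contains(host))
    // Prefer a task whose original preferences include the offered host.
    val nodeLocal = eligible.find(_.preferredHosts.contains(host))
    maxLocality match {
      case NodeLocal   => nodeLocal.map(_.index)
      case AnyLocality => nodeLocal.orElse(eligible.headOption).map(_.index)
    }
  }

  // Assumed state after task 2 finishes: task 0 prefers host1/host3 and runs on
  // host1; task 1 prefers host2 and runs on host2.
  val pending: Seq[SpecTask] = Seq(
    SpecTask(0, Set("host1", "host3"), Set("host1")),
    SpecTask(1, Set("host2"), Set("host2")))
}
```

   In this model an offer on host1 at `NodeLocal` returns nothing, because task 0 prefers host1 but is already running there. That is the case neither test currently asserts. An offer on host3 at `NodeLocal` returns task 0, matching the existing assertion for `exec3`.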
