Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19287#discussion_r141674264
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -744,6 +744,100 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
         assert(resubmittedTasks === 0)
       }
     
    +
    +  test("[SPARK-22074] Task killed by other attempt task should not be 
resubmitted") {
    +    val conf = new SparkConf().set("spark.speculation", "true")
    +    sc = new SparkContext("local", "test", conf)
    +    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    +    sc.conf.set("spark.speculation.multiplier", "0.0")
    +    sc.conf.set("spark.speculation.quantile", "0.5")
    +    sc.conf.set("spark.speculation", "true")
    +
    +    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
    +      ("exec2", "host2"), ("exec3", "host3"))
    +    sched.initialize(new FakeSchedulerBackend() {
    +      override def killTask(
    +          taskId: Long,
    +          executorId: String,
    +          interruptThread: Boolean,
    +          reason: String): Unit = {}
    +    })
    +
    +    // Keep track of the number of tasks that are resubmitted,
    +    // so that the test can check that no tasks were resubmitted.
    +    var resubmittedTasks = 0
    +    val dagScheduler = new FakeDAGScheduler(sc, sched) {
    +      override def taskEnded(
    +          task: Task[_],
    +          reason: TaskEndReason,
    +          result: Any,
    +          accumUpdates: Seq[AccumulatorV2[_, _]],
    +          taskInfo: TaskInfo): Unit = {
    +        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
    +        reason match {
    +          case Resubmitted => resubmittedTasks += 1
    +          case _ =>
    +        }
    +      }
    +    }
    +    sched.setDAGScheduler(dagScheduler)
    +
    +    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host3", "exec3")),
    +      Seq(TaskLocation("host2", "exec2")))
    +
    +    val clock = new ManualClock()
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
    +    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
    +      task.metrics.internalAccums
    +    }
    +    // Offer resources for 4 tasks to start
    +    for ((k, v) <- List(
    +      "exec1" -> "host1",
    +      "exec1" -> "host1",
    +      "exec3" -> "host3",
    +      "exec2" -> "host2")) {
    +      val taskOption = manager.resourceOffer(k, v, NO_PREF)
    +      assert(taskOption.isDefined)
    +      val task = taskOption.get
    +      assert(task.executorId === k)
    +    }
    +    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    +    clock.advance(1)
    +    // Complete 2 of the tasks and leave the other 2 running
    +    for (id <- Set(0, 1)) {
    +      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
    +      assert(sched.endedTasks(id) === Success)
    +    }
    +
    +    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    +    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    +    // > 0ms, so advance the clock by 1ms here.
    +    clock.advance(1)
    +    assert(manager.checkSpeculatableTasks(0))
    +    assert(sched.speculativeTasks.toSet === Set(2, 3))
    +
    +    // Offer a resource to start the speculative attempt for the running task 2.0
    +    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
    +    assert(taskOption.isDefined)
    +    val task4 = taskOption.get
    +    assert(task4.index === 2)
    +    assert(task4.taskId === 4)
    +    assert(task4.executorId === "exec2")
    +    assert(task4.attemptNumber === 1)
    +    sched.backend = mock(classOf[SchedulerBackend])
    --- End diff ---
    
    it's really weird to switch `sched.backend` in the middle of the test.  I 
    worry that in the future the test will stop working as expected when 
    something else changes in the scheduler.
    
    instead, can you just override `killTask` in your subclass of 
    `FakeSchedulerBackend` to track the calls you care about?
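    
    For illustration, a minimal sketch of that suggestion against the test's 
    existing setup (the `killedTasks` buffer is a hypothetical addition for 
    tracking, not an existing field of `FakeSchedulerBackend`):
    
        // Records every taskId the scheduler asks the backend to kill
        val killedTasks = new scala.collection.mutable.ArrayBuffer[Long]()
        sched.initialize(new FakeSchedulerBackend() {
          override def killTask(
              taskId: Long,
              executorId: String,
              interruptThread: Boolean,
              reason: String): Unit = {
            killedTasks += taskId
          }
        })
    
        // ... rest of the test unchanged, with no reassignment of sched.backend ...
    
        // then assert directly on the recorded kills, e.g. that the original
        // attempt of task 2 (taskId 2) was killed once speculative taskId 4 succeeded
        assert(killedTasks.contains(2))
    
    That way the assertion runs against the same backend the whole test used, 
    instead of verifying against a mock swapped in partway through.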

