Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19287#discussion_r141953125
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -744,6 +744,112 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
         assert(resubmittedTasks === 0)
       }
     
    +
    +  test("[SPARK-22074] Task killed by other attempt task should not be 
resubmitted") {
    +    val conf = new SparkConf().set("spark.speculation", "true")
    +    sc = new SparkContext("local", "test", conf)
    +    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
    +    sc.conf.set("spark.speculation.multiplier", "0.0")
    +    sc.conf.set("spark.speculation.quantile", "0.5")
    +    sc.conf.set("spark.speculation", "true")
    +
    +    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
    +      ("exec2", "host2"), ("exec3", "host3"))
    +    sched.initialize(new FakeSchedulerBackend() {
    +      override def killTask(
    +       taskId: Long,
    +       executorId: String,
    +       interruptThread: Boolean,
    +       reason: String): Unit = {
    +        // Check that this is the only killTask event in this case,
    +        // triggered by task 2.1 completing.
    +        assert(taskId === 2)
    +        assert(executorId === "exec3")
    +        assert(interruptThread)
    +        assert(reason === "another attempt succeeded")
    +      }
    +    })
    +
    +    // Keep track of the number of tasks that are resubmitted,
    +    // so that the test can check that no tasks were resubmitted.
    +    var resubmittedTasks = 0
    +    val dagScheduler = new FakeDAGScheduler(sc, sched) {
    +      override def taskEnded(
    +          task: Task[_],
    +          reason: TaskEndReason,
    +          result: Any,
    +          accumUpdates: Seq[AccumulatorV2[_, _]],
    +          taskInfo: TaskInfo): Unit = {
    +        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
    +        reason match {
    +          case Resubmitted => resubmittedTasks += 1
    +          case _ =>
    +        }
    +      }
    +    }
    +    sched.setDAGScheduler(dagScheduler)
    +
    +    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host3", "exec3")),
    +      Seq(TaskLocation("host2", "exec2")))
    +
    +    val clock = new ManualClock()
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
    +    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
    +      task.metrics.internalAccums
    +    }
    +    // Offer resources for 4 tasks to start
    +    for ((exec, host) <- Seq(
    +      "exec1" -> "host1",
    +      "exec1" -> "host1",
    +      "exec3" -> "host3",
    +      "exec2" -> "host2")) {
    +      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
    +      assert(taskOption.isDefined)
    +      val task = taskOption.get
    +      assert(task.executorId === exec)
    +      // Add an extra assert to make sure task 2.0 is running on exec3
    +      if (task.index == 2) {
    +        assert(task.attemptNumber === 0)
    +        assert(task.executorId === "exec3")
    +      }
    +    }
    +    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    +    clock.advance(1)
    +    // Complete 2 of the tasks and leave the other 2 still running
    +    for (id <- Set(0, 1)) {
    +      manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
    +      assert(sched.endedTasks(id) === Success)
    +    }
    +
    +    // checkSpeculatableTasks checks that the task runtime is greater than 
the threshold for
    +    // speculating. Since we use a threshold of 0 for speculation, tasks 
need to be running for
    +    // > 0ms, so advance the clock by 1ms here.
    +    clock.advance(1)
    +    assert(manager.checkSpeculatableTasks(0))
    +    assert(sched.speculativeTasks.toSet === Set(2, 3))
    +
    +    // Offer a resource to start the speculative attempt for the running 
task 2.0
    +    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
    +    assert(taskOption.isDefined)
    +    val task4 = taskOption.get
    +    assert(task4.index === 2)
    +    assert(task4.taskId === 4)
    +    assert(task4.executorId === "exec2")
    +    assert(task4.attemptNumber === 1)
    +    // Complete the speculative attempt for the running task
    +    manager.handleSuccessfulTask(4, createTaskResult(2, 
accumUpdatesByTask(2)))
    +    // With this successful task end, the scheduler backend will kill the 
other running attempt;
    +    // verify the killTask(2, "exec3", true, "another attempt succeeded") 
request in the
    +    // FakeSchedulerBackend subclass
    --- End diff --
    
    this is *almost* as good as your previous code involving the mock, but 
there is one scenario where it's different -- you no longer make sure that 
`killTask` actually gets called at all.  I think you should have it set a flag 
which you check here.
    
    That additional assert also makes the comment unnecessary (or, if you want 
to keep it, it can be more succinct, e.g. "Make sure schedBackend.killTask(2, 
"exec3", true, "another attempt succeeded") gets called").


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to