Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200990424 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("SPARK-24755 Executor loss can cause task to not be resubmitted") { + val conf = new SparkConf().set("spark.speculation", "true") + sc = new SparkContext("local", "test", conf) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set("spark.speculation.multiplier", "0.0") + sc.conf.set("spark.speculation.quantile", "0.5") + sc.conf.set("spark.speculation", "true") + + var killTaskCalled = false + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) + sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = { + // Check the only one killTask event in this case, which triggered by + // task 2.1 completed. + assert(taskId === 2) + assert(executorId === "exec3") + assert(interruptThread) + assert(reason === "another attempt succeeded") + killTaskCalled = true + } + }) + + // Keep track of the index of tasks that are resubmitted, + // so that the test can check that task is resubmitted correctly + var resubmittedTasks = new mutable.HashSet[Int] + val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded(task: Task[_], + reason: TaskEndReason, --- End diff -- ditto
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org