Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227080534 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(tsm).abort(anyString(), anyObject()) } + test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " + + "executor can be acquired") { + // set the abort timer to fail immediately + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to kick in immediately + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + // Wait for the abort timer to kick in. Without sleep the test exits before the timer is + // triggered. + eventually(timeout(500.milliseconds)) { + assert(tsm.isZombie) + } + } + + test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted + // executor and try to acquire a new one. + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + assert(!tsm.isZombie) + + // Offer a new executor which should be accepted + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) + )).flatten.size === 1) --- End diff -- We specify the config in `seconds`. The expectation here is that timer should expire in `10 seconds`, which I think is sufficient to account for gc time. However, we could just remove this as the default is 120s.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org