Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r230682694 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -503,6 +507,182 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(tsm).abort(anyString(), anyObject()) } + test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " + + "executor can be acquired") { + // set the abort timer to fail immediately + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to kick in immediately + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a + // slight delay as the abort timer is launched in a separate thread. + eventually(timeout(500.milliseconds)) { + assert(tsm.isZombie) + } + } + + test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.head + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted + // executor and try to acquire a new one. + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm)) + assert(!tsm.isZombie) + + // Offer a new executor which should be accepted + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) + )).flatten.size === 1) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) + assert(!tsm.isZombie) + } + + // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring + // a new executor we don't want the abort timer for the second taskSet to expire and abort the job + test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + + // We have 2 taskSets with 1 task remaining in each with 1 executor completely blacklisted + val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet1) + val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet2) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) + + // Fail the running task + val failedTask = firstTaskAttempts.head + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer. We will schedule the task from the second taskSet. Since a task was scheduled + // we do not kick off the abort timer for taskSet1 + val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) + + val tsm2 = stageToMockTaskSetManager(1) + val failedTask2 = secondTaskAttempts.head + taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason) + when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask2.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer for taskSet1 and taskSet2 + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm)) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm2)) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 2) + + // Offer a new executor which should be accepted + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host1", 1) + )).flatten.size === 1) + + // Check if all the taskSets are cleared + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) + + assert(!tsm.isZombie) + } + + // this test is to check that we don't abort a taskSet which is not being scheduled on other + // executors as it is waiting on locality timeout and not being aborted because it is still not + // completely blacklisted. + test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + + val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", "executor0")) + val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0, + preferredLocation) + taskScheduler.submitTasks(taskSet1) + + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + var taskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = taskAttempts.head + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) + )).flatten.isEmpty) --- End diff -- this is dependent on the system clock not advancing past the locality timeout. I've seen pauses on jenkins over 5 seconds in flaky tests -- either put in a manual clock or just increase the locality timeout in this test to avoid flakiness here
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org