GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r227062956
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
    +    "executor can be acquired") {
    +    // set the abort timer to fail immediately
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding 
a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and 
there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to kick in immediately
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Without sleep the test exits 
before the timer is
    +    // triggered.
    +    eventually(timeout(500.milliseconds)) {
    +      assert(tsm.isZombie)
    +    }
    +  }
    +
    +  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding 
a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and 
there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to expire if no new executors could be acquired. We kill the 
existing idle blacklisted
    +    // executor and try to acquire a new one.
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    assert(!tsm.isZombie)
    +
    +    // Offer a new executor which should be accepted
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor1", "host0", 1)
    +    )).flatten.size === 1)
    --- End diff --
    
    You need this to happen within 10 milliseconds of the offer just before it, 
right?  If so, this is going to make the test flaky, because occasionally there 
are long pauses (from GC, etc.) unrelated to the test.  I think you need a 
manual clock in `TaskSchedulerImpl` for tests, so the test can advance time 
explicitly past the abort timeout instead of depending on wall-clock timing.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to