Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20998#discussion_r180614873
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -880,6 +880,59 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
         assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
       }
     
    +  test("speculative task should not run on a given host where another 
attempt " +
    +    "is already running on") {
    +    sc = new SparkContext("local", "test")
    +    sched = new FakeTaskScheduler(
    +      sc, ("execA", "host1"), ("execB", "host2"))
    +    val taskSet = FakeTask.createTaskSet(1,
    +      Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB")))
    +    val clock = new ManualClock
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
    +
    +    // let task0.0 run on host1
    +    assert(manager.resourceOffer("execA", "host1", 
PROCESS_LOCAL).get.index == 0)
    +    val info1 = manager.taskAttempts(0)(0)
    +    assert(info1.running === true)
    +    assert(info1.host === "host1")
    +
    +    // long time elapse, and task0.0 is still running,
    +    // so we launch a speculative task0.1 on host2
    +    clock.advance(1000)
    +    manager.speculatableTasks += 0
    +    assert(manager.resourceOffer("execB", "host2", 
PROCESS_LOCAL).get.index === 0)
    +    val info2 = manager.taskAttempts(0)(0)
    +    assert(info2.running === true)
    +    assert(info2.host === "host2")
    +    assert(manager.speculatableTasks.size === 0)
    +
    +    // now, task0 has two copies running on host1, host2 separately,
    +    // so we can not launch a speculative task on any hosts.
    +    manager.speculatableTasks += 0
    +    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None)
    +    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
    +    assert(manager.speculatableTasks.size === 1)
    +
    +    // after a long long time, task0.0 failed, and task0.0 can not re-run 
since
    +    // there's already a running copy.
    +    clock.advance(1000)
    +    info1.finishTime = clock.getTimeMillis()
    --- End diff --
    
    nice suggestion.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to