Repository: spark Updated Branches: refs/heads/master 0cf600280 -> 6cf335d79
Added a TaskSetManager unit test. This test ensures that when there are no alive executors that satisfy a particular locality level, the TaskSetManager doesn't ever use that as the maximum allowed locality level (this optimization ensures that a job doesn't wait extra time in an attempt to satisfy a scheduling locality level that is impossible). @mateiz and @lirui-intel this unit test illustrates an issue with #892 (it fails with that patch). Author: Kay Ousterhout <kayousterh...@gmail.com> Closes #1024 from kayousterhout/scheduler_unit_test and squashes the following commits: de6a08f [Kay Ousterhout] Added a TaskSetManager unit test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cf335d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cf335d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cf335d7 Branch: refs/heads/master Commit: 6cf335d79a2f69ecd9a139dd0a03acff60585be4 Parents: 0cf6002 Author: Kay Ousterhout <kayousterh...@gmail.com> Authored: Mon Jun 9 13:13:53 2014 -0700 Committer: Kay Ousterhout <kayousterh...@gmail.com> Committed: Mon Jun 9 13:13:53 2014 -0700 ---------------------------------------------------------------------- .../spark/scheduler/TaskSetManagerSuite.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6cf335d7/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c92b6dc..6f1fd25 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.finishedManagers.contains(manager)) } + test("skip unsatisfiable locality levels") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2")) + val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + // An executor that is not NODE_LOCAL should be rejected. + assert(manager.resourceOffer("execC", "host2", ANY) === None) + + // Because there are no alive PROCESS_LOCAL executors, the base locality level should be + // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before + // any of the locality wait timers expire. + assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0) + } + test("basic delay scheduling") { sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))