Repository: spark Updated Branches: refs/heads/master 5a826c00c -> 63bdb1f41
SPARK-2294: fix locality inversion bug in TaskManager copied from original JIRA (https://issues.apache.org/jira/browse/SPARK-2294): If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there was another NODE_LOCAL task that could have been scheduled. This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the highest currently allowed level. Now, supposed NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks. ---- I added an additional parameter in resourceOffer and findTask, maxLocality, indicating when we should consider the tasks without locality preference Author: CodingCat <zhunans...@gmail.com> Closes #1313 from CodingCat/SPARK-2294 and squashes the following commits: bf3f13b [CodingCat] rollback some forgotten changes 89f9bc0 [CodingCat] address matei's comments 18cae02 [CodingCat] add test case for node-local tasks 2ba6195 [CodingCat] fix failed test cases 87dd09e [CodingCat] fix style 9b9432f [CodingCat] remove hasNodeLocalOnlyTasks fdd1573 [CodingCat] fix failed test cases 941a4fd [CodingCat] see my shocked face.......... f600085 [CodingCat] remove hasNodeLocalOnlyTasks checking 0b8a46b [CodingCat] test whether hasNodeLocalOnlyTasks affect the results 73ceda8 [CodingCat] style fix b3a430b [CodingCat] remove fine granularity tracking for node-local only tasks f9a2ad8 [CodingCat] simplify the logic in TaskSchedulerImpl c8c1de4 [CodingCat] simplify the patch be652ed [CodingCat] avoid unnecessary delay when we only have nopref tasks dee9e22 [CodingCat] fix locality inversion bug in TaskManager by moving nopref branch Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63bdb1f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63bdb1f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63bdb1f4 Branch: refs/heads/master Commit: 63bdb1f41b4895e3a9444f7938094438a94d3007 Parents: 5a826c0 Author: CodingCat <zhunans...@gmail.com> Authored: Tue Aug 5 23:02:58 2014 -0700 Committer: Matei Zaharia <ma...@databricks.com> Committed: Tue Aug 5 23:02:58 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/TaskLocality.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 7 +- .../apache/spark/scheduler/TaskSetManager.scala | 109 +++++----- .../spark/scheduler/TaskSetManagerSuite.scala | 205 +++++++++++++------ 4 files changed, 203 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index eb920ab..f176d09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi object TaskLocality extends Enumeration { // Process local is expected to be used ONLY within TaskSetManager for now. - val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value + val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value type TaskLocality = Value http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d2f764f..6c0d1b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl( // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host - private val executorsByHost = new HashMap[String, HashSet[String]] + protected val executorsByHost = new HashMap[String, HashSet[String]] protected val hostsByRack = new HashMap[String, HashSet[String]] - private val executorIdToHost = new HashMap[String, String] + protected val executorIdToHost = new HashMap[String, String] // Listener object to pass upcalls into var dagScheduler: DAGScheduler = null @@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl( // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. + // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { @@ -265,7 +266,7 @@ private[spark] class TaskSchedulerImpl( activeExecutorIds += execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK - assert (availableCpus(i) >= 0) + assert(availableCpus(i) >= 0) launchedTask = true } } http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b5e8cb..20a4bd1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -79,6 +79,7 @@ private[spark] class TaskSetManager( private val numFailures = new Array[Int](numTasks) // key is taskId, value is a Map of executor id to when it failed private val failedExecutors = new HashMap[Int, HashMap[String, Long]]() + val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) var tasksSuccessful = 0 @@ -179,26 +180,17 @@ private[spark] class TaskSetManager( } } - var hadAliveLocations = false for (loc <- tasks(index).preferredLocations) { for (execId <- loc.executorId) { addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) } - if (sched.hasExecutorsAliveOnHost(loc.host)) { - hadAliveLocations = true - } addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) - if(sched.hasHostAliveOnRack(rack)){ - hadAliveLocations = true - } } } - if (!hadAliveLocations) { - // Even though the task might've had preferred locations, all of those hosts or executors - // are dead; put it in the no-prefs list so we can schedule it elsewhere right away. + if (tasks(index).preferredLocations == Nil) { addTo(pendingTasksWithNoPrefs) } @@ -239,7 +231,6 @@ private[spark] class TaskSetManager( */ private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = { var indexOffset = list.size - while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) @@ -288,12 +279,12 @@ private[spark] class TaskSetManager( !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index) if (!speculatableTasks.isEmpty) { - // Check for process-local or preference-less tasks; note that tasks can be process-local + // Check for process-local tasks; note that tasks can be process-local // on multiple nodes when we replicate cached blocks, as in Spark Streaming for (index <- speculatableTasks if canRunOnHost(index)) { val prefs = tasks(index).preferredLocations val executors = prefs.flatMap(_.executorId) - if (prefs.size == 0 || executors.contains(execId)) { + if (executors.contains(execId)) { speculatableTasks -= index return Some((index, TaskLocality.PROCESS_LOCAL)) } @@ -310,6 +301,17 @@ private[spark] class TaskSetManager( } } + // Check for no-preference tasks + if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { + for (index <- speculatableTasks if canRunOnHost(index)) { + val locations = tasks(index).preferredLocations + if (locations.size == 0) { + speculatableTasks -= index + return Some((index, TaskLocality.PROCESS_LOCAL)) + } + } + } + // Check for rack-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for (rack <- sched.getRackForHost(host)) { @@ -341,20 +343,27 @@ private[spark] class TaskSetManager( * * @return An option containing (task index within the task set, locality, is speculative?) */ - private def findTask(execId: String, host: String, locality: TaskLocality.Value) + private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } - if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { + if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL, false)) } } - if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { + if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { + // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic + for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { + return Some((index, TaskLocality.PROCESS_LOCAL, false)) + } + } + + if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) index <- findTaskFromList(execId, getPendingTasksForRack(rack)) @@ -363,25 +372,27 @@ private[spark] class TaskSetManager( } } - // Look for no-pref tasks after rack-local tasks since they can run anywhere. - for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) - } - - if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { + if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { for (index <- findTaskFromList(execId, allPendingTasks)) { return Some((index, TaskLocality.ANY, false)) } } - // Finally, if all else has failed, find a speculative task - findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) => - (taskIndex, allowedLocality, true) - } + // find a speculative task if all others tasks have been scheduled + findSpeculativeTask(execId, host, maxLocality).map { + case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} } /** * Respond to an offer of a single executor from the scheduler by finding a task + * + * NOTE: this function is either called with a maxLocality which + * would be adjusted by delay scheduling algorithm or it will be with a special + * NO_PREF locality which will be not modified + * + * @param execId the executor Id of the offered resource + * @param host the host Id of the offered resource + * @param maxLocality the maximum locality we want to schedule the tasks at */ def resourceOffer( execId: String, @@ -392,9 +403,14 @@ private[spark] class TaskSetManager( if (!isZombie) { val curTime = clock.getTime() - var allowedLocality = getAllowedLocalityLevel(curTime) - if (allowedLocality > maxLocality) { - allowedLocality = maxLocality // We're not allowed to search for farther-away tasks + var allowedLocality = maxLocality + + if (maxLocality != TaskLocality.NO_PREF) { + allowedLocality = getAllowedLocalityLevel(curTime) + if (allowedLocality > maxLocality) { + // We're not allowed to search for farther-away tasks + allowedLocality = maxLocality + } } findTask(execId, host, allowedLocality) match { @@ -410,8 +426,11 @@ private[spark] class TaskSetManager( taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime + // NO_PREF will not affect the variables related to delay scheduling + if (maxLocality != TaskLocality.NO_PREF) { + currentLocalityIndex = getLocalityIndex(taskLocality) + lastLaunchTime = curTime + } // Serialize and return the task val startTime = clock.getTime() // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here @@ -639,8 +658,7 @@ private[spark] class TaskSetManager( override def executorLost(execId: String, host: String) { logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id) - // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a - // task that used to have locations on only this host might now go to the no-prefs list. Note + // Re-enqueue pending tasks for this host based on the status of the cluster. Note // that it's okay if we add a task to the same queue twice (if it had multiple preferred // locations), because findTaskFromList will skip already-running tasks. for (index <- getPendingTasksForExecutor(execId)) { @@ -671,6 +689,9 @@ private[spark] class TaskSetManager( for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure) } + // recalculate valid locality levels and waits when executor is lost + myLocalityLevels = computeValidLocalityLevels() + localityWaits = myLocalityLevels.map(getLocalityWait) } /** @@ -722,17 +743,17 @@ private[spark] class TaskSetManager( conf.get("spark.locality.wait.node", defaultWait).toLong case TaskLocality.RACK_LOCAL => conf.get("spark.locality.wait.rack", defaultWait).toLong - case TaskLocality.ANY => - 0L + case _ => 0L } } /** * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been * added to queues using addPendingTask. + * */ private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { - import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY} + import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { @@ -742,6 +763,9 @@ private[spark] class TaskSetManager( pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } + if (!pendingTasksWithNoPrefs.isEmpty) { + levels += NO_PREF + } if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 && pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL @@ -751,20 +775,7 @@ private[spark] class TaskSetManager( levels.toArray } - // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available def executorAdded() { - def newLocAvail(index: Int): Boolean = { - for (loc <- tasks(index).preferredLocations) { - if (sched.hasExecutorsAliveOnHost(loc.host) || - (sched.getRackForHost(loc.host).isDefined && - sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) { - return true - } - } - false - } - logInfo("Re-computing pending task lists.") - pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_)) myLocalityLevels = computeValidLocalityLevels() localityWaits = myLocalityLevels.map(getLocalityWait) } http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c52368b..ffd2338 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -85,14 +85,31 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex val finishedManagers = new ArrayBuffer[TaskSetManager] val taskSetsFailed = new ArrayBuffer[String] - val executors = new mutable.HashMap[String, String] ++ liveExecutors + val executors = new mutable.HashMap[String, String] + for ((execId, host) <- liveExecutors) { + addExecutor(execId, host) + } + for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) { hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host } dagScheduler = new FakeDAGScheduler(sc, this) - def removeExecutor(execId: String): Unit = executors -= execId + def removeExecutor(execId: String) { + executors -= execId + val host = executorIdToHost.get(execId) + assert(host != None) + val hostId = host.get + val executorsOnHost = executorsByHost(hostId) + executorsOnHost -= execId + for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) { + hosts -= hostId + if (hosts.isEmpty) { + hostsByRack -= rack + } + } + } override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager @@ -100,8 +117,15 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) + override def hasHostAliveOnRack(rack: String): Boolean = { + hostsByRack.get(rack) != None + } + def addExecutor(execId: String, host: String) { executors.put(execId, host) + val executorsOnHost = executorsByHost.getOrElseUpdate(host, new mutable.HashSet[String]) + executorsOnHost += execId + executorIdToHost += execId -> host for (rack <- getRackForHost(host)) { hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host } @@ -123,7 +147,7 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) { } class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { - import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL} + import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL} private val conf = new SparkConf @@ -134,18 +158,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - // Offer a host with process-local as the constraint; this should work because the TaskSet - // above won't have any locality preferences - val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) + // Offer a host with NO_PREF as the constraint, + // we should get a nopref task immediately since that's what we only have + var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption.isDefined) - val task = taskOption.get - assert(task.executorId === "exec1") - assert(sched.startedTasks.contains(0)) - - // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) // Tell it the task has finished manager.handleSuccessfulTask(0, createTaskResult(0)) @@ -161,7 +180,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // First three offers should all find tasks for (i <- 0 until 3) { - val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) + var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === "exec1") @@ -169,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.startedTasks.toSet === Set(0, 1, 2)) // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None) // Finish the first two tasks manager.handleSuccessfulTask(0, createTaskResult(0)) @@ -211,37 +230,40 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { ) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - // First offer host1, exec1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - - // Offer host1, exec1 again: the last task, which has no prefs, should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3) - - // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None) clock.advance(LOCALITY_WAIT) - - // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) - - // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2 + // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should + // get chosen before the noPref task assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) - // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None) - - // Offer host1, exec1 again, at ANY level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + // Offer host2, exec3 again, at NODE_LOCAL level: we should choose task 2 + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1) + // Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task + // after failing to find a node_Local task + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None) clock.advance(LOCALITY_WAIT) + assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3) + } - // Offer host1, exec1 again, at ANY level: task 1 should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) - - // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + test("we do not need to delay scheduling when we only have noPref tasks in the queue") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2")) + val taskSet = FakeTask.createTaskSet(3, + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host2", "exec3")), + Seq() // Last task has no locality prefs + ) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + // First offer host1, exec1: first task should be chosen + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0) + assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1) + assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None) + assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2) } test("delay scheduling with fallback") { @@ -298,20 +320,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // First offer host1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - // Offer host1 again: third task should be chosen immediately because host3 is not up - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) - - // After this, nothing should get chosen + // After this, nothing should get chosen, because we have separated tasks with unavailable preference + // from the noPrefPendingTasks assert(manager.resourceOffer("exec1", "host1", ANY) === None) // Now mark host2 as dead sched.removeExecutor("exec2") manager.executorLost("exec2", "host2") - // Task 1 should immediately be launched on host1 because its original host is gone + // nothing should be chosen + assert(manager.resourceOffer("exec1", "host1", ANY) === None) + + clock.advance(LOCALITY_WAIT * 2) + + // task 1 and 2 would be scheduled as nonLocal task assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) - // Now that all tasks have launched, nothing new should be launched anywhere else + // all finished assert(manager.resourceOffer("exec1", "host1", ANY) === None) assert(manager.resourceOffer("exec2", "host2", ANY) === None) } @@ -373,7 +399,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val manager = new TaskSetManager(sched, taskSet, 4, clock) { - val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -384,15 +410,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1 fails after failure 1 due to blacklist - assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) } // Run the task on exec1.1 - should work, and then fail it on exec1.1 { - val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL) + val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) @@ -404,12 +430,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist - assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty) } // Run the task on exec2 - should work, and then fail it on exec2 { - val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY) + val offerResult = manager.resourceOffer("exec2", "host2", ANY) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -420,20 +446,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec2 fails after failure 3 due to blacklist - assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) } // After reschedule delay, scheduling on exec1 should be possible. clock.advance(rescheduleDelay) { - val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) assert(offerResult.get.executorId === "exec1") - assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty) // Cause exec1 to fail : failure 4 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) @@ -443,7 +469,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.taskSetsFailed.contains(taskSet.id)) } - test("new executors get added") { + test("new executors get added and lost") { // Assign host2 to rack2 FakeRackUtil.cleanUp() FakeRackUtil.assignHostToRack("host2", "rack2") @@ -456,26 +482,25 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { Seq()) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - // All tasks added to no-pref list since no preferred location is available - assert(manager.pendingTasksWithNoPrefs.size === 4) // Only ANY is valid - assert(manager.myLocalityLevels.sameElements(Array(ANY))) + assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY))) // Add a new executor sched.addExecutor("execD", "host1") manager.executorAdded() - // Task 0 and 1 should be removed from no-pref list - assert(manager.pendingTasksWithNoPrefs.size === 2) // Valid locality should contain NODE_LOCAL and ANY - assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY))) + assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY))) // Add another executor sched.addExecutor("execC", "host2") manager.executorAdded() - // No-pref list now only contains task 3 - assert(manager.pendingTasksWithNoPrefs.size === 1) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY - assert(manager.myLocalityLevels.sameElements( - Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY))) - FakeRackUtil.cleanUp() + assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY))) + // test if the valid locality is recomputed when the executor is lost + sched.removeExecutor("execC") + manager.executorLost("execC", "host2") + assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY))) + sched.removeExecutor("execD") + manager.executorLost("execD", "host1") + assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY))) } test("test RACK_LOCAL tasks") { @@ -506,7 +531,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Offer host2 // Task 1 can be scheduled with RACK_LOCAL assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) - FakeRackUtil.cleanUp() } test("do not emit warning when serialized task is small") { @@ -536,6 +560,53 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.emittedTaskSizeWarning) } + test("speculative and noPref task should be scheduled after node-local") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) + val taskSet = FakeTask.createTaskSet(4, + Seq(TaskLocation("host1", "execA")), + Seq(TaskLocation("host2"), TaskLocation("host1")), + Seq(), + Seq(TaskLocation("host3", "execC"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) + assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) + + manager.speculatableTasks += 1 + clock.advance(LOCALITY_WAIT) + // schedule the nonPref task + assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) + // schedule the speculative task + assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1) + clock.advance(LOCALITY_WAIT * 3) + // schedule non-local tasks + assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) + } + + test("node-local tasks should be scheduled right away when there are only node-local and no-preference tasks") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) + val taskSet = FakeTask.createTaskSet(4, + Seq(TaskLocation("host1")), + Seq(TaskLocation("host2")), + Seq(), + Seq(TaskLocation("host3"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + // node-local tasks are scheduled without delay + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) + assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1) + assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3) + assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None) + + // schedule no-preference after node local ones + assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org