[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751956#comment-15751956 ]
Imran Rashid commented on SPARK-18886:
--------------------------------------

Here's a failing test case (you can check it out directly here: https://github.com/squito/spark/tree/delay_sched-SPARK-18886):

{code}
test("Delay scheduling checks utilization at each locality level") {
  // Create a cluster with 100 executors, and submit 500 tasks, but each task would prefer to
  // be on the same node in the cluster. We should not wait to schedule each task on the one
  // executor.
  val conf = new SparkConf().set("spark.locality.wait", "1s")
  sc = new SparkContext("local", "test", conf)
  val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", s"host$x") }
  val sched = new FakeTaskScheduler(sc, execs: _*)
  val tasks = FakeTask.createTaskSet(500, (1 to 500).map { _ =>
    Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0")) }: _*)
  val clock = new ManualClock
  val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
  logInfo("initial locality levels = " + manager.myLocalityLevels.mkString(","))
  assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))

  // initially, the locality preferences should lead us to only schedule tasks on one executor
  logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
  val firstScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // without advancing the clock, no matter how many times we make offers on the *other*
  // executors, nothing should get scheduled
  (0 until 50).foreach { _ =>
    execs.foreach { case (exec, host) =>
      if (exec != "exec0") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
      }
    }
  }

  // now we advance the clock till just *before* the locality delay is up, and we finish the
  // first task
  val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", "3s")
  val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
  clock.advance(processWait + nodeWait - 1)
  logInfo(s"finishing first task at ${clock.getTimeMillis()}")
  manager.handleSuccessfulTask(firstScheduledTask.taskId, createTaskResult(firstScheduledTask.index))

  // if we offer all the resources again, still we should only schedule on one executor
  logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
  val secondScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // Now let's advance the clock further, so that all of our other executors have been sitting
  // idle for longer than the locality wait time. We have managed to schedule *something* at a
  // lower locality level within the time, but regardless, we *should* still schedule on all
  // the other resources by this point
  clock.advance(2000)
  // this would pass if we advanced the clock by this much instead
  // clock.advance(processWait + nodeWait + 10)
  logInfo(s"trying to schedule everything at ${clock.getTimeMillis()}")
  execs.foreach { case (exec, host) =>
    if (exec != "exec0") {
      withClue(s"trying to schedule on $exec:$host at time ${clock.getTimeMillis()}") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isDefined)
      }
    }
  }
}
{code}

> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2.
> Tasks finish in less time than the delay-scheduling wait.
>
> Instead of having *one* delay to wait for resources with better locality,
> Spark waits indefinitely.
>
> As an example, consider a cluster with 100 executors, and a taskset with 500
> tasks. Say all tasks have a preference for one executor, which is by itself
> on one host. Given the default locality wait of 3s per level, we end up with
> a 6s delay before we schedule on other hosts (process wait + node wait).
> If each task takes 5 seconds (under the 6-second delay), then _all 500_ tasks
> get scheduled on _only one_ executor. This means you're only using 1% of
> your cluster, and you get a ~100x slowdown. You'd actually be better off if
> tasks took 7 seconds.
>
> *WORKAROUNDS*:
>
> (1) You can change the locality wait times so that they are shorter than the
> task execution time. You need to take into account the sum of all wait times
> to use all the resources on your cluster. For example, if you have resources
> on different racks, this will include the sum of
> "spark.locality.wait.process" + "spark.locality.wait.node" +
> "spark.locality.wait.rack". Those each default to "3s". The simplest way
> would be to set "spark.locality.wait.process" to your desired wait interval,
> and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to
> "0". For example, if your tasks take ~3 seconds on average, you might set
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect -- with less delay scheduling, you
> may not get as good resource locality. After this issue is fixed, you'd most
> likely want to undo these configuration changes.
>
> (2) The worst case here will only happen if your tasks have extreme skew in
> their locality preferences. Users may be able to modify their job to
> control the distribution of the original input data.
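To make the arithmetic in the quoted example concrete, here is a quick back-of-the-envelope sketch. This is plain Scala with no Spark dependencies; the executor count, task count, and task/wait durations simply restate the numbers from the description, nothing here is measured:

```scala
// Numbers restated from the example in the issue description.
val numExecutors = 100
val numTasks = 500
val taskTimeSec = 5.0 // each task runs for 5s, just under the 6s combined wait

// Every task finishes before the 6s (process + node) locality wait expires,
// which keeps resetting the delay timer, so all 500 tasks run back-to-back
// on the single preferred executor.
val serializedSec = numTasks * taskTimeSec

// If the wait were allowed to expire even once, the tasks would spread across
// all 100 executors, running in waves of 100.
val wavesSec = math.ceil(numTasks.toDouble / numExecutors) * taskTimeSec

println(f"one executor: $serializedSec%.0fs, full cluster: $wavesSec%.0fs, " +
  f"slowdown: ${serializedSec / wavesSec}%.0fx")
```

This is also why the description says you'd be better off with 7-second tasks: the first task would outlive the 6s wait, the delay timer would expire, and the taskset would spread into 5 waves across the cluster instead of serializing on one executor.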
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org