[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751956#comment-15751956 ]

Imran Rashid edited comment on SPARK-18886 at 12/15/16 5:32 PM:
----------------------------------------------------------------

Here's a failing test case (you can check it out directly here:
https://github.com/squito/spark/tree/delay_sched-SPARK-18886):

{code}
  test("Delay scheduling checks utilization at each locality level") {
    // Create a cluster with 100 executors, and submit 500 tasks, but each task would prefer to
    // be on the same node in the cluster.  We should not wait to schedule each task on the one
    // executor.
    sc = new SparkContext("local", "test")
    val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", s"host$x") }
    val sched = new FakeTaskScheduler(sc, execs: _*)
    val tasks = FakeTask.createTaskSet(500, (1 to 500).map { _ =>
      Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0"))}: _*)
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
    logInfo("initial locality levels = " + manager.myLocalityLevels.mkString(","))
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    // initially, the locality preferences should lead us to only schedule tasks on one executor
    logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
    val firstScheduledTask = execs.flatMap { case (exec, host) =>
      val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
      assert(schedTaskOpt.isDefined === (exec == "exec0"))
      schedTaskOpt
    }.head

    // without advancing the clock, no matter how many times we make offers on the *other*
    // executors, nothing should get scheduled
    (0 until 50).foreach { _ =>
      execs.foreach { case (exec, host) =>
        if (exec != "exec0") {
          assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
        }
      }
    }

    // now we advance the clock till just *before* the locality delay is up, and we finish
    // the first task
    val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", "3s")
    val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
    clock.advance(processWait + nodeWait - 1)
    logInfo(s"finishing first task at ${clock.getTimeMillis()}")
    manager.handleSuccessfulTask(firstScheduledTask.taskId,
      createTaskResult(firstScheduledTask.index))
    // if we offer all the resources again, still we should only schedule on one executor
    logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
    val secondScheduledTask = execs.flatMap { case (exec, host) =>
      val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
      assert(schedTaskOpt.isDefined === (exec == "exec0"))
      schedTaskOpt
    }.head

    // Now let's advance the clock further, so that all of our other executors have been
    // sitting idle for longer than the locality wait time.  We have managed to schedule
    // *something* at a lower locality level within the time, but regardless, we *should*
    // still schedule on all the other resources by this point
    clock.advance(10)
    // this would pass if we advanced the clock by this much instead
//    clock.advance(processWait + nodeWait + 10)
    logInfo(s"trying to schedule everything at ${clock.getTimeMillis()}")
    execs.foreach { case (exec, host) =>
      if (exec != "exec0") {
        withClue(s"trying to schedule on $exec:$host at time ${clock.getTimeMillis()}") {
          assert(manager.resourceOffer(execId = exec, host = host, ANY).isDefined)
        }
      }
    }
  }
{code}
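
For reference, the underutilization in the issue's example is simple arithmetic. A quick sketch (plain Scala, not Spark API; numbers taken from the issue description):

{code}
// 500 tasks at 5 seconds each, all funneled onto the one preferred executor:
val numTasks = 500
val taskTimeSec = 5
val numExecutors = 100
val serialSec = numTasks * taskTimeSec                    // 2500 seconds
// the same taskset spread evenly across all 100 executors instead:
val parallelSec = (numTasks / numExecutors) * taskTimeSec // 25 seconds
val slowdown = serialSec / parallelSec                    // 100x
{code}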


was (Author: irashid):
Here's a failing test case (you can check it out directly here:
https://github.com/squito/spark/tree/delay_sched-SPARK-18886):

{code}
  test("Delay scheduling checks utilization at each locality level") {
    // Create a cluster with 100 executors, and submit 500 tasks, but each task would prefer to
    // be on the same node in the cluster.  We should not wait to schedule each task on the one
    // executor.
    val conf = new SparkConf().set("spark.locality.wait", "1s")
    sc = new SparkContext("local", "test", conf)
    val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", s"host$x") }
    val sched = new FakeTaskScheduler(sc, execs: _*)
    val tasks = FakeTask.createTaskSet(500, (1 to 500).map { _ =>
      Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0"))}: _*)
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
    logInfo("initial locality levels = " + manager.myLocalityLevels.mkString(","))
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    // initially, the locality preferences should lead us to only schedule tasks on one executor
    logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
    val firstScheduledTask = execs.flatMap { case (exec, host) =>
      val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
      assert(schedTaskOpt.isDefined === (exec == "exec0"))
      schedTaskOpt
    }.head

    // without advancing the clock, no matter how many times we make offers on the *other*
    // executors, nothing should get scheduled
    (0 until 50).foreach { _ =>
      execs.foreach { case (exec, host) =>
        if (exec != "exec0") {
          assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
        }
      }
    }

    // now we advance the clock till just *before* the locality delay is up, and we finish
    // the first task
    val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", "3s")
    val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
    clock.advance(processWait + nodeWait - 1)
    logInfo(s"finishing first task at ${clock.getTimeMillis()}")
    manager.handleSuccessfulTask(firstScheduledTask.taskId,
      createTaskResult(firstScheduledTask.index))
    // if we offer all the resources again, still we should only schedule on one executor
    logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
    val secondScheduledTask = execs.flatMap { case (exec, host) =>
      val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
      assert(schedTaskOpt.isDefined === (exec == "exec0"))
      schedTaskOpt
    }.head

    // Now let's advance the clock further, so that all of our other executors have been
    // sitting idle for longer than the locality wait time.  We have managed to schedule
    // *something* at a lower locality level within the time, but regardless, we *should*
    // still schedule on all the other resources by this point
    clock.advance(2000)
    // this would pass if we advanced the clock by this much instead
//    clock.advance(processWait + nodeWait + 10)
    logInfo(s"trying to schedule everything at ${clock.getTimeMillis()}")
    execs.foreach { case (exec, host) =>
      if (exec != "exec0") {
        withClue(s"trying to schedule on $exec:$host at time ${clock.getTimeMillis()}") {
          assert(manager.resourceOffer(execId = exec, host = host, ANY).isDefined)
        }
      }
    }
  }
{code}

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling.
> Instead of having *one* delay to wait for resources with better locality, 
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that it is shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way 
> would be to set "spark.locality.wait.process" to your desired wait interval, 
> and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to 
> "0".  For example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect -- with less delay scheduling, you may 
> not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
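> As a sketch, the configuration from workaround (1) might look like this (the 
> "1s" value is only illustrative for ~3 second tasks, not a recommendation):
> {code}
> val conf = new SparkConf()
>   .set("spark.locality.wait.process", "1s") // shorter than the ~3s task time
>   .set("spark.locality.wait.node", "0")     // don't wait for node locality
>   .set("spark.locality.wait.rack", "0")     // don't wait for rack locality
> val sc = new SparkContext(conf)
> {code}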
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
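> One hedged sketch of workaround (2), with a hypothetical input path: 
> {{repartition}} forces a shuffle, and the shuffled partitions no longer 
> carry the skewed locality preferences of the original input blocks.
> {code}
> val skewed = sc.textFile("hdfs:///some/skewed/input") // blocks all on one node
> // repartitioning shuffles the data across the cluster, so downstream
> // tasks no longer all prefer the single node holding the original blocks
> val spread = skewed.repartition(100)
> val lineLengths = spread.map(_.length).collect()
> {code}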



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
