[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762296#comment-15762296 ]

Imran Rashid commented on SPARK-18886:
--------------------------------------

Thanks [~mridul], that helps -- in particular, I was only thinking about bulk 
scheduling and had forgotten to take that into account.

After a closer look through the code, I think my earlier proposal makes sense 
-- rather than resetting the timeout as each task is scheduled, change it to 
start the timer as soon as there is an offer which goes unused due to the 
delay.  Once that timer is started, it is never reset (for that TSM).
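
To make that concrete, here is a minimal sketch of the bookkeeping I have in 
mind -- the class and method names are made up for illustration, this is not 
the existing TaskSetManager code:

{code:scala}
// Sketch only: hypothetical helper, not the current TaskSetManager internals.
class DelayTimerSketch(localityWaitMs: Long) {
  private var firstUnusedOfferTime: Option[Long] = None

  /** Call when an offer goes unused purely because of delay scheduling. */
  def recordUnusedOffer(nowMs: Long): Unit = {
    // Start the clock once; it is never reset for this task set, even if
    // later offers are accepted at better locality levels.
    if (firstUnusedOfferTime.isEmpty) {
      firstUnusedOfferTime = Some(nowMs)
    }
  }

  /** True once the locality wait has elapsed since the first unused offer. */
  def expired(nowMs: Long): Boolean =
    firstUnusedOfferTime.exists(start => nowMs - start >= localityWaitMs)
}
{code}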

I can think of one scenario where this would result in worse scheduling than 
what we currently have.  Suppose that initially, a TSM is offered one resource 
which only matches on rack_local.  But immediately after that, many 
process_local offers are made, which are all used up.  Some time later, more 
offers that are only rack_local come in.  They'll immediately get used, even 
though there may be plenty more offers that are process_local that are just 
about to come in (perhaps enough for all of the remaining tasks).

That wouldn't be great, but it's also not nearly as bad as letting most of your 
cluster sit idle.

Other alternatives I can think of:

a) Turn off delay scheduling by default, and change 
[{{TaskSchedulerImpl.resourceOffer}}|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L357-L360]
 to go through all task sets and then advance locality levels, rather than the 
other way around.  Perhaps we should invert those loops anyway, just for when 
users turn off delay scheduling (see the sketch after these alternatives).

b) Have TSM use some knowledge about all available executors to decide whether 
or not it is even possible for enough resources at the right locality level to 
appear.  E.g., in the original case, the TSM would realize there is only one 
executor which is process_local, so it doesn't make sense to wait to schedule 
all tasks on that executor.  However, I'm pretty skeptical about doing anything 
like this, as it may be a somewhat complicated thing inside the scheduler, and 
it could just turn into a mess of heuristics with lots of corner cases.
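
For (a), a crude sketch of the inverted loop order, using stand-in types rather 
than the real scheduler classes -- none of this is actual TaskSchedulerImpl 
code, it just shows the shape of the loops:

{code:scala}
// Sketch of alternative (a) with toy types.  The point is only the loop order:
// locality level outermost, task sets inner, instead of fully relaxing locality
// for one task set before moving on to the next.
object LoopOrderSketch {
  sealed trait Locality
  case object ProcessLocal extends Locality
  case object NodeLocal    extends Locality
  case object RackLocal    extends Locality
  case object AnyLocality  extends Locality

  private val levels = Seq(ProcessLocal, NodeLocal, RackLocal, AnyLocality)

  /** tryLaunch returns true while it can still place a task at the given level. */
  def offerAll(sortedTaskSets: Seq[String],
               tryLaunch: (String, Locality) => Boolean): Unit = {
    for (maxLocality <- levels; taskSet <- sortedTaskSets) {
      // Exhaust this task set at the current level, then try the next task set
      // at the *same* level before relaxing locality for anyone.
      while (tryLaunch(taskSet, maxLocality)) {}
    }
  }
}
{code}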

I think implementing my proposed solution should be relatively easy, so I'll 
take a stab at it, but I'd still appreciate more input on the right approach 
here.  Perhaps seeing an implementation will make it easier to discuss.

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality, 
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
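> A quick back-of-the-envelope check of that slowdown (assuming one core per 
> executor and ignoring the initial 6s wait; rough numbers, not measured data):
> {code:scala}
> // Roughly the arithmetic behind the ~100x claim above.
> val tasks = 500
> val taskTimeSec = 5.0
> val executors = 100
> val allOnOneExecutor  = tasks * taskTimeSec                        // 2500s
> val spreadOverCluster = (tasks.toDouble / executors) * taskTimeSec //   25s
> println(allOnOneExecutor / spreadOverCluster)                      // ~100x
> {code}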
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that it is shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way to 
> be to set "spark.locality.wait.process" to your desired wait interval, and 
> set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".  
> For example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect -- with less delay scheduling, you may 
> not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially 
> if you do a repartition starting from a small number of partitions.  (Shuffle 
> locality preference is assigned if any node has more than 20% of the shuffle 
> input data -- by chance, you may have one node just above that threshold, and 
> all other nodes just below it.)  In this case, you can turn off locality 
> preference for shuffle data by setting 
> {{spark.shuffle.reduceLocality.enabled=false}}.
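> For example, sketched programmatically (this only disables the shuffle reduce 
> locality preference; nothing else changes):
> {code:scala}
> // Sketch: same flag as above, set via SparkConf.
> val conf = new org.apache.spark.SparkConf()
>   .set("spark.shuffle.reduceLocality.enabled", "false")
> {code}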


