[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931009#comment-15931009
 ] 

Kay Ousterhout commented on SPARK-18886:
----------------------------------------

Sorry for the slow response here!  I realized this is the same issue as 
SPARK-11460 (although that JIRA proposed a slightly different solution), which 
stalled for reasons that are completely my fault (I neglected it because I 
couldn't think of a practical way of solving it).

Imran, unfortunately I don't think your latest idea will quite work.  Delay 
scheduling was originally intended for situations where the number of slots 
that a particular job could use was limited by a fairness policy.  In that 
case, it can be better to wait a bit for a "better" slot (i.e., one that 
satisfies locality preferences).  In particular, if you never wait, you end up 
with this "sticky slot" issue where tasks for a job keep finishing up in a 
"bad" slot (one with no locality preferences), and then they'll be re-offered 
to the same job, which will again accept the bad slot.  If the job just waited 
a bit, it could get a better slot (e.g., as a result of tasks from another job 
finishing). [1]
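
To make the mechanism concrete, here's a rough sketch of the per-task-set delay 
timer (made-up names and heavily simplified -- this is not the actual 
TaskSetManager code, just the shape of it):

{code:scala}
// Rough sketch only: the task set insists on slots that satisfy its locality
// preferences, and falls back to non-local slots only after it has gone a full
// localityWaitMs without launching any task.
class DelayTimerSketch(localityWaitMs: Long) {
  private var lastLaunchTimeMs: Long = System.currentTimeMillis()

  /** Decide whether to accept an offered slot. */
  def acceptOffer(offerSatisfiesLocality: Boolean, nowMs: Long): Boolean = {
    val waitedLongEnough = nowMs - lastLaunchTimeMs >= localityWaitMs
    val accept = offerSatisfiesLocality || waitedLongEnough
    if (accept) {
      // Launching *any* task resets the timer.  This reset is what produces the
      // indefinite delay in this JIRA when tasks finish faster than the wait.
      lastLaunchTimeMs = nowMs
    }
    accept
  }
}
{code}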

This relates to your idea because of the following situation: suppose you have 
a cluster with 10 machines, the job has locality preferences for 5 of them 
(with ids 1, 2, 3, 4, 5), and fairness dictates that the job can only use 3 
slots at a time (e.g., it's sharing equally with 2 other jobs).  Suppose that 
for a long time, the job has been running tasks on slots 1, 2, and 3 (so local 
slots).  At this point, the times for machines 6, 7, 8, 9, and 10 will have 
expired, because the job has been running for a while.  But if the job is now 
offered a slot on one of those non-local machines (e.g., 6), the job hasn't 
really been waiting for resources at all: until this point, it's been running 
its full share of 3 slots at a time, and it's been doing so on machines that 
satisfy locality preferences.  So, we shouldn't accept that slot on machine 6 
-- we should wait a bit to see if we can get a slot on 1, 2, 3, 4, or 5.

The solution I proposed (in a long PR comment) for the other JIRA is: if the 
task set is using fewer than the number of slots it could be using (where “# 
slots it could be using” is all of the slots in the cluster if the job is 
running alone, or the job’s fair share, if it’s not) for some period of time, 
increase the locality level.   The problem with that solution is that I thought 
it was completely impractical to determine the number of slots a TSM "should" 
be allowed to use.
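
Roughly, that rule would look something like the sketch below (made-up names, 
and it assumes we can somehow compute the allowed share, which is exactly the 
impractical part):

{code:scala}
// Sketch of the "relax locality when under-utilized" rule.  allowedSlots is the
// whole cluster if the job runs alone, or its fair share otherwise -- computing
// that number is the hard part.
class UnderutilizationTimerSketch(localityWaitMs: Long) {
  private var belowShareSinceMs: Option[Long] = None

  def shouldRelaxLocality(runningTasks: Int, allowedSlots: Int, nowMs: Long): Boolean = {
    if (runningTasks >= allowedSlots) {
      belowShareSinceMs = None          // at (or above) its share: no reason to relax
      false
    } else {
      // Start the clock the first time the task set drops below its allowed share.
      val since = belowShareSinceMs.getOrElse {
        belowShareSinceMs = Some(nowMs)
        nowMs
      }
      nowMs - since >= localityWaitMs   // under its share for a full wait period
    }
  }
}
{code}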

However, after thinking about this more today, I think we might be able to do 
this in a practical way:
- First, I thought that we could use information about when offers are rejected 
to determine this (e.g., if you've been rejecting offers for a while, then 
you're not using your fair share).  But the problem here is that it's not easy 
to determine when you *are* using your fair / allowed share: accepting a single 
offer doesn't necessarily mean that you're now using the allowed share.  This 
is precisely the problem with the current approach, hence this JIRA.
- v1: one possible proxy for this is whether there are slots currently 
available that haven't been accepted by any job.  The TaskSchedulerImpl could 
feasibly pass this information to each TaskSetManager, and the TSM could use it 
to update its delay timer: something like only reset the delay timer to 0 if 
(a) the TSM accepts an offer and (b) the flag passed by the TaskSchedulerImpl 
indicates that there are no other unused slots in the cluster (see the sketch 
after this list).  This fixes the problem described in the JIRA: in that case, 
the flag would indicate that there *were* other unused slots, even though a 
task got successfully scheduled with this offer, so the delay timer wouldn't be 
reset, and would eventually correctly expire.
- v2: The problem with v1 is that it doesn't correctly handle situations where 
e.g., you have two jobs A and B with equal shares.  B is "greedy" and will 
accept any slot (e.g., it's a reduce stage), and A is doing delay scheduling.  
In this case, A might have much less than its share, but the flag from the 
TaskSchedulerImpl would indicate that there were no other free slots in the 
cluster, so the delay timer wouldn't ever expire.  I suspect we could handle 
this (e.g., with some logic in the TaskSchedulerImpl to detect when a 
particular TSM is getting starved: when it keeps rejecting offers that are 
later accepted by someone else) but before thinking about this further, I 
wanted to run the general idea by you to see what your thoughts are.
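
To make v1 concrete, here's roughly the shape of the reset condition (made-up 
names; the real TaskSchedulerImpl / TaskSetManager plumbing would obviously 
differ):

{code:scala}
// Sketch of the v1 idea: after each round of offers, the TaskSchedulerImpl
// tells the task set whether any slots went unused; the task set only resets
// its delay timer when it launched a task *and* no other slots were left idle.
class DelayTimerV1Sketch(localityWaitMs: Long) {
  private var lastFullyUsedTimeMs: Long = System.currentTimeMillis()

  def recordOfferRound(launchedTask: Boolean, otherSlotsWentUnused: Boolean, nowMs: Long): Unit = {
    if (launchedTask && !otherSlotsWentUnused) {
      lastFullyUsedTimeMs = nowMs   // plausibly using everything it can get
    }
    // Otherwise keep the old timestamp: accepting one offer while other slots
    // sit idle (the case in this JIRA) should not restart the wait.
  }

  def shouldRelaxLocality(nowMs: Long): Boolean =
    nowMs - lastFullyUsedTimeMs >= localityWaitMs

  // v2 caveat: if a greedy task set is soaking up every free slot, there may
  // never be unused slots even though this one is starved -- handling that
  // needs the extra starvation-detection logic discussed above.
}
{code}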

[1] There's a whole side question / discussion of how often this is useful for 
Spark at all.  It can be useful if you're running in a shared cluster where 
e.g. Yarn might be assigning you more slots over time, and it's also useful 
when a single Spark context is being shared across many jobs.  But often for 
Spark, you have one job running alone, in which case delay scheduling should 
arguably be turned off altogether, as you suggested earlier, Imran.  But let's 
separate that discussion from this one, of how to make it work better.

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality, 
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that it is shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way to 
> be to set "spark.locality.wait.process" to your desired wait interval, and 
> set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".  
> For example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".  *NOTE*: due to SPARK-18967, avoid 
> setting the {{spark.locality.wait=0}} -- instead, use 
> {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you may 
> not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially 
> if you do a repartition starting from a small number of partitions.  (Shuffle 
> locality preference is assigned if any node has more than 20% of the shuffle 
> input data -- by chance, you may have one node just above that threshold, and 
> all other nodes just below it.)  In this case, you can turn off locality 
> preference for shuffle data by setting 
> {{spark.shuffle.reduceLocality.enabled=false}}
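
For concreteness, the workarounds above amount to configuration along these 
lines (a sketch only -- the wait values are illustrative and should be tuned to 
your actual task duration):

{code:scala}
import org.apache.spark.SparkConf

// Workaround (1): shorten the locality waits so they're below typical task
// duration (e.g. ~1s process wait if tasks take ~3s).
val conf = new SparkConf()
  .set("spark.locality.wait.process", "1s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
// Per SPARK-18967, if you lower spark.locality.wait itself, use "1ms", not "0".

// Workaround (2a): disable shuffle reduce-locality preferences entirely.
// conf.set("spark.shuffle.reduceLocality.enabled", "false")
{code}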



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
