[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2019-12-04 Thread Nicholas Brett Marcott (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987633#comment-16987633
 ] 

Nicholas Brett Marcott commented on SPARK-18886:


Thanks for mentioning the PRs here.

My proposed solution in the second [PR mentioned 
above|https://github.com/apache/spark/pull/26696] is what I believe Kay said 
was ideal in the comments of this [PR|https://github.com/apache/spark/pull/9433], 
but which she seemed to think was impractical. 

*The proposed solution:*

Currently, the time window that the locality wait times measure is the time 
since the last task was launched for a TSM. The proposed change is to instead 
measure the time since the TSM's available slots were last fully utilized.

The number of available slots for a TSM can be determined by dividing all slots 
among the TSMs according to the scheduling policy (FIFO vs FAIR).
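
Below is a minimal, hypothetical sketch (Scala, not the actual Spark code; all 
names such as {{SlotUtilizationDelay}} and {{allottedSlots}} are invented) of 
what "measure the time since the TSM's slots were fully utilized" could look like:

{code:scala}
// Sketch only: track how long a TSM has had allotted slots going unused.
class SlotUtilizationDelay(localityWaitMs: Long) {
  // Timestamp at which this TSM's allotted slots first went underutilized;
  // None means the TSM is currently using its full allotment.
  private var unusedSince: Option[Long] = None

  def update(runningTasks: Int, allottedSlots: Int, now: Long): Unit = {
    if (runningTasks >= allottedSlots) {
      unusedSince = None                 // fully utilized: no delay accrues
    } else if (unusedSince.isEmpty) {
      unusedSince = Some(now)            // slots just became underutilized
    }
  }

  // Relax the locality level only after slots have sat idle for the full wait.
  def localityWaitExpired(now: Long): Boolean =
    unusedSince.exists(now - _ >= localityWaitMs)
}
{code}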

*Other possible solutions and their issues:*
 # Never reset the timer: delay scheduling would likely only work on the first 
wave*
 # Per-slot timer: delay scheduling should apply per task/taskset; otherwise, 
timers started by one taskset could cause delay scheduling to be ignored for 
the next taskset, which might lead you to try approach #3
 # Per-slot, per-stage timer: tasks can be starved by being offered unique slots 
over a period of time, while a taskset or other job that doesn't care about 
locality possibly uses those resources. Also requires too many timers and too 
much bookkeeping
 # Per-task timer: you still need a way to distinguish between a task waiting 
for a slot to become available and a task that has slots available but is not 
utilizing them (which is what this PR does). Doing this right seems to require 
this PR plus more timers.

 

*wave = one round of running as many tasks as there are available slots for a 
taskset. For example, if you have 2 slots and 10 tasks, it would take 
10 / 2 = 5 waves to complete the taskset.

 

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> ---
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Imran Rashid
>Priority: Major
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling timeout.
> Instead of having *one* delay to wait for resources with better locality, 
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay until we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that they are shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way 
> would be to set "spark.locality.wait.process" to your desired wait interval, 
> and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to 
> "0".  For example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".  *NOTE*: due to SPARK-18967, avoid 
> setting {{spark.locality.wait=0}} -- instead, use 
> {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you 
> may not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially 
> if you do a repartition starting from a small number of partitions.  (Shuffle 
> locality preference is assigned if any node has more than 20% of the shuffle 
> input data -- by chance, you may have one node just above that threshold, and 
> all other nodes just below it.)  In this case, you can turn off locality 
> preference for shuffle data by setting 
> {{spark.shuffle.reduceLocality.enabled=false}}
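
For concreteness, workaround (1) from the description above might be applied 
roughly like this (a sketch only; the "1s" value assumes ~3 second tasks and 
should be tuned to your workload):

{code:scala}
import org.apache.spark.SparkConf

// Make the process-local wait shorter than the typical task runtime and
// zero out the node/rack waits, as described in workaround (1).
// Per the NOTE above, if you set the top-level spark.locality.wait instead,
// use "1ms" rather than "0" (SPARK-18967).
val conf = new SparkConf()
  .set("spark.locality.wait.process", "1s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
{code}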

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2019-12-03 Thread Thomas Graves (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987037#comment-16987037
 ] 

Thomas Graves commented on SPARK-18886:
---

Note there is discussion on this subject in these PRs:

[https://github.com/apache/spark/pull/26633] (hack to work around it for a 
particular RDD)

PR with a proposed solution - but really more of a discussion of the solution:

[https://github.com/apache/spark/pull/26696]

 

My proposal, I believe, is similar to Kay's, where we use slots and track the 
delay per slot.  I haven't looked at the code in specific detail, especially in 
the FairScheduler, where most of the issues in the conversations above were 
mentioned.  One way around this is to have different policies and allow users 
to configure them, or to have one for the FairScheduler and one for FIFO.




[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2017-10-30 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225412#comment-16225412
 ] 

Imran Rashid commented on SPARK-18886:
--

Hi [~willshen],

I haven't looked closely, but I would expect this to also exist in 1.6.0.  BTW, 
due to SPARK-18967 (which I assume is also in 1.6.0, but again I haven't looked 
closely), I would advise setting the wait to {{1ms}}, not {{0}}.
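
As a small illustration of that advice (a sketch, not project-endorsed 
configuration):

{code:scala}
// Keep the locality wait effectively minimal without triggering SPARK-18967:
// use a tiny non-zero wait instead of 0.
val conf = new org.apache.spark.SparkConf().set("spark.locality.wait", "1ms")
{code}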





[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2017-10-30 Thread William Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225335#comment-16225335
 ] 

William Shen commented on SPARK-18886:
--

[~imranr],

I came across this issue because SPARK-11460 is marked as a duplicate of it. 
SPARK-11460 lists affected versions 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 
1.2.1, 1.2.2, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, and 1.5.1, while this issue 
lists affected version 2.1.0. Do we know if this is also an issue for 1.6.0? I 
am observing similar behavior for our application on 1.6.0. We are also able to 
achieve better executor utilization and performance by setting the wait to 0.

Thank you




[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2017-03-19 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931893#comment-15931893
 ] 

Imran Rashid commented on SPARK-18886:
--

Thanks Kay for the full description (and for finding the old JIRA; sorry I 
didn't notice the duplicate).  Your explanation and alternative make sense.  
One detail from v1:

bq. flag passed by the TSM indicates that there are no other unused slots in 
the cluster

neither the TSM nor the TaskSchedulerImpl currently tracks this -- they know 
about executors, but not individual slots.  With bulk-scheduling calls to 
{{resourceOffer()}} that include the entire set of slots, that isn't a problem, 
but it is for single offers.  Anyway, it's still solvable, just with more 
bookkeeping and a more complex change.

bq. But often for Spark, you have one job running alone, in which case delay 
scheduling should arguably be turned off altogether, as you suggested earlier 
Imran. But let's separate that discussion from this one, of how to make it work 
better.

Yeah, you can see that earlier in the thread I was trying to figure out what 
the purpose of this was anyway ... I am going to be recommending that folks 
turn it off more often.  But even when you have just one job running at a time, 
this still matters for jobs with parallel stages in the DAG, e.g. a join.  
Fairness doesn't matter at all between the stages, but overall efficiency does.  
If you turn delay scheduling off entirely, then whichever taskset comes first 
will get all the resources, rather than giving both a shot at local resources.  
So I feel like the right recommendation is {{1ms}}.  There is probably 
something else to fix, and another JIRA here, though I don't have a clear idea 
around it yet.

I will keep thinking about your v2.  What you are proposing makes sense, but I 
worry that we continue to band-aid these situations where things are really 
bad, while we're still stuck with a system where the delay really has to be 
closely tuned to the task length, otherwise there is a lot of inefficiency.  
This wasted time isn't even tracked anywhere (it's not included in "scheduler 
delay"), so users have no idea they're hitting this.


[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2017-03-17 Thread Kay Ousterhout (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931009#comment-15931009
 ] 

Kay Ousterhout commented on SPARK-18886:


Sorry for the slow response here!  I realized this is the same issue as 
SPARK-11460 (although that JIRA proposed a slightly different solution), which 
stalled for reasons that are completely my fault (I neglected it because I 
couldn't think of a practical way of solving it).

Imran, unfortunately I don't think your latest idea will quite work.  Delay 
scheduling was originally intended for situations where the number of slots 
that a particular job could use was limited by a fairness policy.  In that 
case, it can be better to wait a bit for a "better" slot (i.e., one that 
satisfies locality preferences).  In particular, if you never wait, you end up 
with this "sticky slot" issue where tasks for a job keep finishing up in a 
"bad" slot (one with no locality preferences), and then they'll be re-offered 
to the same job, which will again accept the bad slot.  If the job just waited 
a bit, it could get a better slot (e.g., as a result of tasks from another job 
finishing). [1]

This relates to your idea because of the following situation: suppose you have 
a cluster with 10 machines, the job has locality preferences for 5 of them 
(with ids 1, 2, 3, 4, 5), and fairness dictates that the job can only use 3 
slots at a time (e.g., it's sharing equally with 2 other jobs).  Suppose that 
for a long time, the job has been running tasks on slots 1, 2, and 3 (so local 
slots).  At this point, the timers for machines 6, 7, 8, 9, and 10 will have 
expired, because the job has been running for a while.  But if the job is now 
offered a slot on one of those non-local machines (e.g., 6), the job hasn't 
been waiting long for non-local resources: until this point, it's been running 
its full share of 3 slots at a time, and it's been doing so on machines that 
satisfy locality preferences.  So, we shouldn't accept that slot on machine 6 
-- we should wait a bit to see if we can get a slot on 1, 2, 3, 4, or 5.

The solution I proposed (in a long PR comment) for the other JIRA is: if the 
task set is using fewer than the number of slots it could be using (where “# 
slots it could be using” is all of the slots in the cluster if the job is 
running alone, or the job’s fair share, if it’s not) for some period of time, 
increase the locality level.   The problem with that solution is that I thought 
it was completely impractical to determine the number of slots a TSM "should" 
be allowed to use.

However, after thinking about this more today, I think we might be able to do 
this in a practical way:
- First, I thought that we could use information about when offers are rejected 
to determine this (e.g., if you've been rejecting offers for a while, then 
you're not using your fair share).  But the problem here is that it's not easy 
to determine when you *are* using your fair / allowed share: accepting a single 
offer doesn't necessarily mean that you're now using the allowed share.  This 
is precisely the problem with the current approach, hence this JIRA.
- v1: one possible proxy for this is if there are slots that are currently 
available that haven't been accepted by any job.  The TaskSchedulerImpl could 
feasibly pass this information to each TaskSetManager, and the TSM could use it 
to update its delay timer: something like only reset the delay timer to 0 if 
(a) the TSM accepts an offer and (b) the flag passed by the TSM indicates that 
there are no other unused slots in the cluster (see the sketch after this 
list).  This fixes the problem described in the JIRA: in that case, the flag 
would indicate that there *were* other unused slots, even though a task got 
successfully scheduled with this offer, so the delay timer wouldn't be reset, 
and would eventually correctly expire.
- v2: The problem with v1 is that it doesn't correctly handle situations where 
e.g., you have two jobs A and B with equal shares.  B is "greedy" and will 
accept any slot (e.g., it's a reduce stage), and A is doing delay scheduling.  
In this case, A might have much less than its share, but the flag from the 
TaskSchedulerImpl would indicate that there were no other free slots in the 
cluster, so the delay timer wouldn't ever expire.  I suspect we could handle 
this (e.g., with some logic in the TaskSchedulerImpl to detect when a 
particular TSM is getting starved: when it keeps rejecting offers that are 
later accepted by someone else) but before thinking about this further, I 
wanted to run the general idea by you to see what your thoughts are.
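
A minimal sketch of the v1 reset rule (hypothetical Scala; the names are 
invented, not existing Spark APIs):

{code:scala}
// The TaskSchedulerImpl passes each TSM a flag saying whether any slot in
// the cluster went unused in this round of offers; the TSM resets its
// locality-wait timer only if it accepted an offer AND no slot sat idle.
def shouldResetDelayTimer(acceptedOffer: Boolean, otherUnusedSlots: Boolean): Boolean =
  acceptedOffer && !otherUnusedSlots

// In the scenario from this JIRA, a task is launched but many slots stay
// idle, so the timer is not reset and eventually (correctly) expires:
assert(!shouldResetDelayTimer(acceptedOffer = true, otherUnusedSlots = true))
{code}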

[1] There's a whole side question / discussion of how often this is useful for 
Spark at all.  It can be useful if you're running in a shared cluster where 
e.g. Yarn might be assigning you more slots over time, and it's also useful 
when a single Spark context is being shared across many 

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2017-01-19 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15830530#comment-15830530
 ] 

Imran Rashid commented on SPARK-18886:
--

I had another idea for how to fix this.  In addition to tracking the last time 
any task was launched, the TaskScheduler would also track the last time it 
didn't schedule anything due to locality constraints, on *each resource*.  
Then, when a new offer comes in, you are allowed to schedule if either the 
overall locality timer is up, or if the timer is up for that particular 
resource.
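
A rough sketch of that idea (hypothetical Scala; the names are invented, not 
existing Spark APIs):

{code:scala}
import scala.collection.mutable

// Sketch only: remember when an offer on each resource first went unused
// due to locality; an offer may be taken at a lower locality level if
// either the overall timer or that resource's own timer has expired.
class PerResourceDelay(waitMs: Long) {
  private val unusedSince = mutable.Map.empty[String, Long]

  def recordUnusedOffer(resourceId: String, now: Long): Unit =
    if (!unusedSince.contains(resourceId)) unusedSince(resourceId) = now

  def canRelaxLocality(resourceId: String, overallTimerExpired: Boolean, now: Long): Boolean =
    overallTimerExpired || unusedSince.get(resourceId).exists(now - _ >= waitMs)
}
{code}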

On the plus side -- I think this keeps all the properties we want.  You avoid 
an indefinite delay just because *one* resource is local;  but you also keep 
the delay if those resources get used up by another task set.

The downside -- it is significantly more complex.  It adds to the memory usage 
of the TaskScheduler (though, in the scheme of things, a pretty nominal 
increase), but it will also make the code significantly more complicated.

Aside: there is also a weird relationship between taskset priority and locality 
scheduling.  Assuming all tasksets have cleared their locality wait timeouts, 
we favor taskset priority over locality.  But if the tasksets haven't cleared 
those timeouts, then things get strange.  It really depends on what the current 
locality levels are in each taskset.  In the simple case, you end up favoring 
locality, by limiting the max Locality of each taskset.  A very low priority 
taskset can easily "steal" the resources from a high priority one if it doesn't 
have locality preferences.  We should probably figure out what the desired 
behavior is so we can make it a little more consistent (or at least document 
it).


[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-20 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15765322#comment-15765322
 ] 

Imran Rashid commented on SPARK-18886:
--

You understand correctly -- that is precisely what I'm proposing.

The scenario with multiple waves is a good example of why I think this is a 
*good* change.  If only 1% of your cluster can take advantage of locality, then 
99% of your cluster goes unused across all those waves.  That may be an extreme 
case (though one I have actually seen in practice on large clusters).  Even if 
it's 50%, then you have 50% of your cluster going unused.  Unless local tasks 
are more than 2x faster, it would make more sense to make the change I'm 
proposing.  What's the worst case after this change?  All but one executor are 
local -- the result is that you have one task running slower.  But the more 
waves there are, the smaller the downside.  E.g., you complete 10 waves on the 
local executors, and only 8 waves on the non-local one.

The worst case is if there is only one wave, there is a huge gap (multiple Xs) 
in runtime between local and non-local execution, and moments after you 
schedule on non-local resources, some local resource becomes available.  I 
think this situation is not very common -- in particular, there normally isn't 
*such* an enormous gap between local and non-local that users would prefer 
their non-local resources sit idle indefinitely.  I'd argue that if such a use 
case is important, we should add a special conf for that in particular.




[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-20 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764876#comment-15764876
 ] 

Mridul Muralidharan commented on SPARK-18886:
-

I am not sure that what is described will work as expected, [~imranr].
Consider a taskset whose number of tasks is many multiples of the number of 
executors (a fairly common scenario).
In this case, if the timer is never reset, you will effectively make the delay 
0 once it expires, across all waves for the taskset.
(I am assuming I understood the proposal right.)

[~kayousterhout] and [~markhamstra] might have more comments, though, in case I 
am missing something here.





[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-20 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764795#comment-15764795
 ] 

Apache Spark commented on SPARK-18886:
--

User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/16354




[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-19 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762296#comment-15762296
 ] 

Imran Rashid commented on SPARK-18886:
--

Thanks [~mridul], that helps -- in particular, I was only thinking about bulk 
scheduling; I had forgotten to take that into account.

After a closer look through the code, I think my earlier proposal makes sense 
-- rather than resetting the timeout as each task is scheduled, change it to 
start the timer as soon as there is an offer which goes unused due to the 
delay.  Once that timer is started, it is never reset (for that TSM).
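
A minimal sketch of that proposal (hypothetical Scala; the names are invented, 
not existing Spark APIs):

{code:scala}
// Sketch only: start one timer per TSM the first time an offer goes unused
// because of delay scheduling, and never reset it afterwards.
class TaskSetDelay(waitMs: Long) {
  private var firstUnusedOfferAt: Option[Long] = None

  def offerUnusedDueToLocality(now: Long): Unit =
    if (firstUnusedOfferAt.isEmpty) firstUnusedOfferAt = Some(now)

  def delayExpired(now: Long): Boolean =
    firstUnusedOfferAt.exists(now - _ >= waitMs)
}
{code}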

I can think of one scenario where this would result in worse scheduling than 
what we currently have.  Suppose that initially, a TSM is offered one resource 
which only matches on rack_local.  But immediately after that, many 
process_local offers are made, which are all used up.  Some time later, more 
offers that are only rack_local come in.  They'll immediately get used, even 
though there may be plenty more offers that are process_local that are just 
about to come in (perhaps enough for all of the remaining tasks).

That wouldn't be great, but it's also not nearly as bad as letting most of your 
cluster sit idle.

Other alternatives I can think of:

a) Turn off delay scheduling by default, and change 
[{{TaskSchedulerImpl.resourceOffer}}|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L357-L360]
 to go through all task sets, then advance locality levels, rather than the 
other way around.  Perhaps we should invert those loops anyway, just for when 
users turn off delay scheduling.

b) Have the TSM use some knowledge about all available executors to decide 
whether or not it is even possible for enough resources at the right locality 
level to appear.  E.g., in the original case, the TSM would realize there is 
only one executor which is process_local, so it doesn't make sense to wait to 
schedule all tasks on that executor.  However, I'm pretty skeptical about doing 
anything like this, as it may be a somewhat complicated thing inside the 
scheduler, and it could just turn into a mess of heuristics with lots of corner 
cases.

I think implementing my proposed solution should be relatively easy, so I'll 
take a stab at it, but I'd still appreciate more input on the right approach 
here.  Perhaps seeing an implementation will make it easier to discuss.


[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15756179#comment-15756179
 ] 

Mridul Muralidharan commented on SPARK-18886:
-


- Delaying 'using up' all resources: another task/taskset which has a better 
locality preference might be available for that executor (also, see the impact 
on speculative execution).
- A delay can allow a better locality preference to become available for the 
task. A suboptimal schedule has a cascading effect on the rest of the 
executors, the application, and the cluster.
- Note that not all executors are available at the same time in resourceOffer: 
you have periodic bulk reschedules, sporadic reschedules when tasks finish, and 
periodic bulk speculative schedule updates.





[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752722#comment-15752722
 ] 

Imran Rashid commented on SPARK-18886:
--

[~mridul] sorry if I am being slow here, but do you mind spelling it out for me 
in more detail?  I'm *not* asking about the benefits of using locality 
preferences -- I get that part.  I'm asking about why the *delay*.  There has 
to be something happening during the delay which we want to wait for.

One possibility is that you've got multiple tasksets running concurrently, with 
different locality preferences.  You wouldn't want the first taskset to use all 
the resources, you'd rather take both tasksets into account.  This is 
accomplished with delay scheduling, but you don't actually *need* the delay.

Another possibility is that there is such a huge gap in runtime that you expect 
your preferred locations will finish *all* tasks in the taskset before that 
delay is up, by having some executors run multiple tasks.

The reason I'm asking is to figure out whether there is a sensible fix here 
(and what the smallest possible fix would be).  If this is it, then the fix I 
suggested above to Mark should handle this case, while still working as 
intended in other cases.

> Delay scheduling should not delay some executors indefinitely if one task is 
> scheduled before delay timeout
> ---
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of 
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling.
> Instead of having *one* delay to wait for resources with better locality, 
> spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500 
> tasks.  Say all tasks have a preference for one executor, which is by itself 
> on one host.  Given the default locality wait of 3s per level, we end up with 
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks 
> get scheduled on _only one_ executor.  This means you're only using a 1% of 
> your cluster, and you get a ~100x slowdown.  You'd actually be better off if 
> tasks took 7 seconds.
> *WORKAROUNDS*: 
> (1) You can change the locality wait times so that it is shorter than the 
> task execution time.  You need to take into account the sum of all wait times 
> to use all the resources on your cluster.  For example, if you have resources 
> on different racks, this will include the sum of 
> "spark.locality.wait.process" + "spark.locality.wait.node" + 
> "spark.locality.wait.rack".  Those each default to "3s".  The simplest way to 
> be to set "spark.locality.wait.process" to your desired wait interval, and 
> set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0".  
> For example, if your tasks take ~3 seconds on average, you might set 
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect --with less delay scheduling, you may 
> not get as good resource locality.  After this issue is fixed, you'd most 
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in 
> their locality preferences.  Users may be able to modify their job to 
> control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially 
> if you do a repartition starting from a small number of partitions.  (Shuffle 
> locality preference is assigned if any node has more than 20% of the shuffle 
> input data -- by chance, you may have one node just above that threshold, and 
> all other nodes just below it.)  In this case, you can turn off locality 
> preference for shuffle data by setting 
> {{spark.shuffle.reduceLocality.enabled=false}}
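
For reference, here is a minimal sketch of workaround (1), plus the shuffle-locality switch 
from (2a), expressed as SparkConf settings.  The "1s" value is only an example and should be 
tuned relative to your actual task runtimes.

{code}
import org.apache.spark.SparkConf

// Workaround (1): make the process-local wait shorter than a typical task, and skip the
// node/rack waits entirely.
val conf = new SparkConf()
  .set("spark.locality.wait.process", "1s")   // example value; tune to your task duration
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
  // Workaround (2a): drop locality preferences for shuffle (reduce) tasks entirely.
  .set("spark.shuffle.reduceLocality.enabled", "false")
{code}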



[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752579#comment-15752579
 ] 

Mridul Muralidharan commented on SPARK-18886:
-


[~imranr] For almost all cases, delay scheduling dramatically increases 
performance.  The difference even between PROCESS and NODE is significant 
(between NODE and 'lower' levels, it can depend on your network config).
For both short-duration tasks and tasks processing large amounts of data, it 
has a non-trivial impact; for long tasks processing small amounts of data it is 
not as useful in comparison, IIRC, and the same goes for degenerate cases where 
the locality preference is suboptimal to begin with.  [As an aside, the ability 
to not specify PROCESS-level locality is actually a drawback in our API.]

The job(s) I mentioned where we set it to 0 were special cases, where we knew 
the costs well enough to make the decision to lower it, but I would not 
recommend that unless users are very sure of what they are doing.  While 
analysing the cost, it should also be kept in mind that transferring data 
across nodes impacts not just the Spark job, but every other job in the 
cluster.

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752462#comment-15752462
 ] 

Imran Rashid commented on SPARK-18886:
--

[~mridulm80] good point, perhaps the right answer here is just to turn off 
delay scheduling completely -- not setting {{"spark.locality.wait.process"}} to 
a small value, as I had suggested in the initial workaround, but disabling it 
entirely, to avoid having to futz with tuning that value relative to task 
runtime.

But let me ask you more or less the same question I just asked Mark, phrased a 
little differently -- given the fragility of this, wouldn't it make more sense 
for us to turn delay scheduling *off* by default?
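
Concretely, "turning it off completely" just means zeroing the top-level wait; a minimal 
sketch (the per-level waits fall back to this value unless they are set explicitly):

{code}
import org.apache.spark.SparkConf

// Disable delay scheduling entirely: with a 0 wait, the scheduler never holds back an
// offer in the hope of a better locality level.
val conf = new SparkConf().set("spark.locality.wait", "0")
{code}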



[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752451#comment-15752451
 ] 

Imran Rashid commented on SPARK-18886:
--

[~markhamstra] I'm not really sure yet; I can't decide exactly what the right 
behavior should be.  My initial thought was that the TSM should track the last 
time that anything has been scheduled at *each* locality level, rather than 
just one overall {{lastLaunchTime}}, so that it realizes that some resources 
have been waiting a long time at a higher locality level.  But I wasn't exactly 
sure what should happen after you schedule one task at a higher locality level. 
Do you reset the timer?  Or do you just keep scheduling at that locality level 
for the rest of the task set?  If you reset the timer, then you will only 
schedule one task on the other resources before adding another locality delay, 
so that doesn't work.  But if you keep scheduling at that new locality level, 
then you've thrown away delay scheduling for the rest of the task set.  Is that 
OK?
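
A rough sketch of that per-level bookkeeping idea -- this is not the actual TaskSetManager 
code, and the names here are made up:

{code}
// Hypothetical sketch: track the last launch time *per locality level* instead of a
// single lastLaunchTime.
class LocalityWaitTracker[Level](waitMs: Level => Long) {
  private val lastLaunch =
    scala.collection.mutable.Map.empty[Level, Long].withDefaultValue(0L)

  def recordLaunch(level: Level, nowMs: Long): Unit = lastLaunch(level) = nowMs

  // Has this level been waiting longer than its configured delay?  The open question from
  // the comment above is whether a single launch at a worse level should reset this clock
  // (as recordLaunch does here) or whether we should stop delaying for the rest of the
  // taskset.
  def delayExpired(level: Level, nowMs: Long): Boolean =
    nowMs - lastLaunch(level) >= waitMs(level)
}
{code}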

To put that last question a different way -- what is the point of delay 
scheduling anyway?  What are we hoping will happen in that delay window?
1) the tasks run so fast that the preferred localities make it through all of 
the tasks in the entire task set before the delay is up
2) other tasksets are concurrently submitted with different locality 
preferences, so we can submit those tasks to the remaining executors (rather 
than sitting idle)
3) new resources will be spun up with the desired locality preferences.  (E.g., 
we've requested a bunch of resources from dynamic allocation, and the resources 
which have become available so far don't have the preferred localities, but 
more are still getting spun up.)

Under all the scenarios I can think of, you might as well turn off delay 
scheduling for the rest of the taskset.  But to be honest I don't feel like 
I've got a good justification for delay scheduling in the first place, so I feel 
like I may be missing something.


Also I did a bit more digging into the case I had, and it's happening because of 
a repartition from a small number (~10) of partitions to a much larger one.  
The shuffle map stage ends up running two tasks on one host, and with a very 
small amount of skew, it turns out that one host has > 20% of the shuffle 
output, while none of the other hosts do.   I feel like there is also something 
else we can do here to improve the shuffle locality preferences, but I don't 
have any concrete ideas on what that improvement should be.
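
A toy illustration of that 20% threshold with roughly the shape of this job (~10 map 
outputs, two of them on one host with a sliver of skew); the helper below is hypothetical, 
not Spark's actual MapOutputTracker logic:

{code}
// Hypothetical check mirroring the ">20% of the shuffle input" rule described above.
def preferredHosts(bytesByHost: Map[String, Long], fraction: Double = 0.2): Seq[String] = {
  val total = bytesByHost.values.sum.toDouble
  bytesByHost.collect { case (host, bytes) if bytes / total > fraction => host }.toSeq
}

// Two map outputs on host0 (slightly skewed), one each on eight other hosts:
val sizes = Map(
  "host0" -> 210L, "host1" -> 99L, "host2" -> 98L, "host3" -> 97L, "host4" -> 99L,
  "host5" -> 98L, "host6" -> 99L, "host7" -> 100L, "host8" -> 100L)
preferredHosts(sizes)  // Seq("host0"): only host0 crosses the threshold and gets a preference
{code}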

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752426#comment-15752426
 ] 

Mridul Muralidharan commented on SPARK-18886:
-


The Spark scheduler can be suboptimal for a lot of degenerate cases, not just 
this one.  But for the most part it does quite well, and for the other cases 
you can simply change the config to better suit the workload -- for example, I 
have actually used 0 for the delay timeout in some production jobs.  You get 
prioritized scheduling, with a quick fallback to ANY once all tasks with a 
preference have been scheduled, which removes any waiting when the benefits of 
colocation are trumped by the need for quicker scheduling.

That is not to say we can't improve scheduling in Spark -- for example, by 
treating it as a bin-packing problem within a single locality preference (or 
across them) to 'pack' more tasks.  Doing that without incurring a high 
scheduling cost is why a lot of these ideas did not make it in, unfortunately.


[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Mark Hamstra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752098#comment-15752098
 ] 

Mark Hamstra commented on SPARK-18886:
--

That's a great explanation of the issue, and nice example code, [~imranr].  I'm 
sure that I have seen this kind of excessive Task stickiness with many 
quick-to-execute Tasks, but I never got to the level of diagnosing the problem 
that you have.

Your listed workarounds, while interesting, aren't a complete long-term 
solution, of course.  Have you thought at all yet about possible paths to a 
solution?  One idea that comes to my mind is that speculative execution has at 
least the potential to get the delayed Tasks executed more quickly elsewhere -- 
but our prior concerns or lack of confidence with speculative execution remain. 
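
For completeness, enabling speculation would look roughly like the sketch below; the knobs 
are the standard speculation settings, and the values shown are illustrative rather than a 
recommendation:

{code}
import org.apache.spark.SparkConf

// Speculative execution re-launches slow-looking tasks elsewhere, which *might* paper over
// the locality-delayed stragglers discussed here (with the usual caveats about wasted work).
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as "slow"
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating
{code}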
 

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15751956#comment-15751956
 ] 

Imran Rashid commented on SPARK-18886:
--

Here's a failing test case (you can check it out directly here: 
https://github.com/squito/spark/tree/delay_sched-SPARK-18886):

{code}
  test("Delay scheduling checks utilization at each locality level") {
// Create a cluster with 100 executors, and submit 100 tasks, but each task 
would prefer to
// be on the same node in the cluster.  We should not wait to schedule each 
task on the one
// executor.
val conf = new SparkConf().set("spark.locality.wait", "1s")
sc = new SparkContext("local", "test", conf)
val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", 
s"host$x") }
val sched = new FakeTaskScheduler(sc, execs: _*)
val tasks = FakeTask.createTaskSet(500, (1 to 500).map { _ =>
  Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0"))}: _*)
val clock = new ManualClock
val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
logInfo("initial locality levels = " + 
manager.myLocalityLevels.mkString(","))
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, 
NODE_LOCAL, ANY)))
// initially, the locality preferences should lead us to only schedule 
tasks on one executor
logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
val firstScheduledTask = execs.flatMap { case (exec, host) =>
  val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
  assert(schedTaskOpt.isDefined === (exec == "exec0"))
  schedTaskOpt
}.head

// without advancing the clock, no matter how many times we make offers on 
the *other*
// executors, nothing should get scheduled
(0 until 50).foreach { _ =>
  execs.foreach { case (exec, host) =>
if (exec != "exec0") {
  assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
}
  }
}

// now we advance the clock till just *before* the locality delay is up, 
and we finish the first
// task
val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", 
"3s")
val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
clock.advance(processWait + nodeWait - 1)
logInfo(s"finishing first task at ${clock.getTimeMillis()}")
manager.handleSuccessfulTask(firstScheduledTask.taskId,
  createTaskResult(firstScheduledTask.index))
// if we offer all the resources again, still we should only schedule on 
one executor
logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
val secondScheduledTask = execs.flatMap { case (exec, host) =>
  val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
  assert(schedTaskOpt.isDefined === (exec == "exec0"))
  schedTaskOpt
}.head

// Now lets advance the clock further, so that all of our other executors 
have been sitting
// idle for longer than the locality wait time.  We have managed to 
schedule *something* at a
// lower locality level within the time, but regardless, we *should* still 
schedule on the all
// the other resources by this point
clock.advance(2000)
// this would pass if we advanced the clock by this much instead
//clock.advance(processWait + nodeWait + 10)
logInfo(s"trying to schedule everyting at ${clock.getTimeMillis()}")
execs.foreach { case (exec, host) =>
  if (exec != "exec0") {
withClue(s"trying to schedule on $exec:$host at time 
${clock.getTimeMillis()}") {
  assert(manager.resourceOffer(execId = exec, host = host, 
ANY).isDefined)
}
  }
}
  }
{code}

[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout

2016-12-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15751949#comment-15751949
 ] 

Imran Rashid commented on SPARK-18886:
--

cc [~kayousterhout] [~markhamstra] [~zsxwing]
