[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987633#comment-16987633 ] Nicholas Brett Marcott commented on SPARK-18886:
------------------------------------------------

Thanks for mentioning the PRs here. My proposed solution in the second [PR mentioned above|https://github.com/apache/spark/pull/26696] is what I believe Kay said was ideal in the comments of this [PR|https://github.com/apache/spark/pull/9433], but seemed to think was impractical.

*The proposed solution:*

Currently, the time window that locality wait times measure is the time since the last task launched for a TSM. The proposed change is to instead measure the time since this TSM's available slots were fully utilized. The number of available slots for a TSM can be determined by dividing all slots among the TSMs according to the scheduling policy (FIFO vs. FAIR).

*Other possible solutions and their issues:*
# Never reset timer: delay scheduling would likely only work on the first wave*
# Per-slot timer: delay scheduling should apply per task/taskset; otherwise, timers started by one taskset could cause delay scheduling to be ignored for the next taskset, which might lead you to try approach #3
# Per-slot, per-stage timer: tasks can be starved by being offered unique slots over a period of time, and a taskset or other job that doesn't care about locality could use those resources. Also too many timers and too much bookkeeping
# Per-task timer: you still need a way to distinguish between a task waiting for a slot to become available vs. having slots available but not utilizing them (which is what this PR does). To do this right seems to require this PR plus more timers.

*wave = one round of running as many tasks as there are available slots for a taskset. Imagine you have 2 slots and 10 tasks: it would take 10 / 2 = 5 waves to complete the taskset.

> Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
> ---
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
> Priority: Major
>
> Delay scheduling can introduce an unbounded delay and underutilization of cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling timeout.
> Instead of having *one* delay to wait for resources with better locality, Spark waits indefinitely.
> As an example, consider a cluster with 100 executors and a taskset with 500 tasks. Say all tasks have a preference for one executor, which is by itself on one host. Given the default locality wait of 3s per level, we end up with a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6-second delay), then _all 500_ tasks get scheduled on _only one_ executor. This means you're only using 1% of your cluster, and you get a ~100x slowdown. You'd actually be better off if tasks took 7 seconds.
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that they are shorter than the task execution time. You need to take into account the sum of all wait times to use all the resources on your cluster. For example, if you have resources on different racks, this will include the sum of "spark.locality.wait.process" + "spark.locality.wait.node" + "spark.locality.wait.rack". Those each default to "3s". The simplest way would be to set "spark.locality.wait.process" to your desired wait interval, and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0". For example, if your tasks take ~3 seconds on average, you might set "spark.locality.wait.process" to "1s". *NOTE*: due to SPARK-18967, avoid setting {{spark.locality.wait=0}} -- instead, use {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you may not get as good resource locality. After this issue is fixed, you'd most likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in their locality preferences. Users may be able to modify their job to control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially if you do a repartition starting from a small number of partitions. (Shuffle locality preference is assigned if any node has more than 20% of the shuffle input data -- by chance, you may have one node just above that threshold, and all other nodes just below it.) In this case, you can turn off locality preference for shuffle data by setting {{spark.shuffle.reduceLocality.enabled=false}}
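The wave arithmetic and slot-division idea from the comment above can be sketched as a toy model. All names here are assumptions for illustration, not Spark's API, and the even FAIR split ignores pool weights and minShare:

```python
import math

def waves(num_tasks, slots):
    """Number of scheduling waves: one wave runs as many tasks as there are slots."""
    return math.ceil(num_tasks / slots)

def slots_per_taskset(total_slots, num_tasksets, policy="FAIR"):
    """Hypothetical split of cluster slots among tasksets.

    FIFO gives everything to the head-of-queue taskset; FAIR splits evenly.
    (Real Spark fair-scheduler pools also support weights and minShare.)
    """
    if policy == "FIFO":
        return [total_slots] + [0] * (num_tasksets - 1)
    return [total_slots // num_tasksets] * num_tasksets

def delay_window_elapsed(now, last_fully_utilized, wait):
    """The proposal: measure time since the TSM's share of slots was last
    fully utilized, instead of time since the last task launch."""
    return now - last_fully_utilized >= wait

# The footnote's example: 2 slots and 10 tasks take 10 / 2 = 5 waves.
print(waves(10, 2))  # 5
```

Under this rule, a TSM that keeps one sticky executor busy but leaves the rest of its share idle would still see its delay window elapse, which is exactly the fix being proposed.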
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987037#comment-16987037 ] Thomas Graves commented on SPARK-18886:
---

Note there is discussion on this subject in these PRs:

[https://github.com/apache/spark/pull/26633] (a hack to work around it for a particular RDD)

PR with the proposed solution -- though really more a discussion of the solution: [https://github.com/apache/spark/pull/26696]

My proposal, I believe, is similar to Kay's, where we use slots and track the delay per slot. I haven't looked at the code in specific detail, especially in the FairScheduler, where most of the issues in the conversations above were mentioned. One way around this is to have different policies and allow users to configure them, or to have one for the FairScheduler and one for FIFO.

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225412#comment-16225412 ] Imran Rashid commented on SPARK-18886:
--

Hi [~willshen], I haven't looked closely, but I would expect this to also exist in 1.6.0. Btw, due to SPARK-18967 (which I assume is also in 1.6.0, but again I haven't looked closely), I would advise setting the wait to {{1ms}}, not {{0}}.
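As a concrete illustration of the workaround discussed in this thread, the locality waits can be passed as {{spark-submit}} flags. The values below are examples only, not recommendations; tune them to your average task duration. Note this uses {{1ms}} rather than the description's {{0}} for the node and rack levels, per the SPARK-18967 caveat and the {{1ms}} advice above:

```shell
# Illustrative only: shorten locality waits below the average task duration.
# Per SPARK-18967, prefer 1ms over 0 for locality waits.
spark-submit \
  --conf spark.locality.wait.process=1s \
  --conf spark.locality.wait.node=1ms \
  --conf spark.locality.wait.rack=1ms \
  ...
# (followed by your application jar and its arguments)
```

After the underlying issue is fixed, these overrides should most likely be removed so that default delay scheduling can recover locality.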
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225335#comment-16225335 ] William Shen commented on SPARK-18886:
--

[~imranr], I came across this issue because it is marked as duplicated by SPARK-11460. SPARK-11460 has affected versions 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.2.1, 1.2.2, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, and 1.5.1, and this issue has affected version 2.1.0. Do we know if this is also an issue for 1.6.0? I am observing similar behavior for our application on 1.6.0. We are also able to achieve better utilization of the executors and better performance by setting the wait to 0. Thank you
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931893#comment-15931893 ] Imran Rashid commented on SPARK-18886:
--

Thanks Kay for the full description (and for finding the old JIRA; sorry I didn't notice the duplicate). Your explanation and alternative make sense.

One detail from v1:

bq. flag passed by the TSM indicates that there are no other unused slots in the cluster

Neither the TSM nor TaskSchedulerImpl currently tracks this -- they know about executors, but not individual slots. With bulk-scheduling calls to {{resourceOffer()}} that include the entire set of slots, that isn't a problem, but it is for single offers. Anyway, it's still solvable, just more bookkeeping and a more complex change.

bq. But often for Spark, you have one job running alone, in which case delay scheduling should arguably be turned off altogether, as you suggested earlier Imran. But let's separate that discussion from this one, of how to make it work better.

Yeah, you can see that earlier in the thread I was trying to figure out what the purpose of this was anyway ... I am going to be recommending that folks turn it off more often. But even when you have just one job running at a time, this still matters for jobs with parallel stages in the DAG, e.g. a join. Fairness doesn't matter at all between the stages, but overall efficiency does. If you turn delay scheduling off entirely, then whichever taskset comes first will get all the resources, rather than giving both a shot at local resources. So I feel like the right recommendation is {{1ms}}. There is probably something else to fix and another JIRA here, though I don't have a clear idea around it yet.

I will keep thinking about your v2. What you are proposing makes sense, but I worry that we continue to band-aid these situations where things are really bad, while we're still stuck with a system where the delay really has to be closely tuned to the task length; otherwise there is a lot of inefficiency. This wasted time isn't even tracked anywhere (it's not included in "scheduler delay"), so users have no idea they're hitting this.
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931009#comment-15931009 ] Kay Ousterhout commented on SPARK-18886:
---

Sorry for the slow response here! I realized this is the same issue as SPARK-11460 (although that JIRA proposed a slightly different solution), which stalled for reasons that are completely my fault (I neglected it because I couldn't think of a practical way of solving it).

Imran, unfortunately I don't think your latest idea will quite work. Delay scheduling was originally intended for situations where the number of slots that a particular job could use was limited by a fairness policy. In that case, it can be better to wait a bit for a "better" slot (i.e., one that satisfies locality preferences). In particular, if you never wait, you end up with this "sticky slot" issue where tasks for a job keep finishing up in a "bad" slot (one with no locality preferences), and then that slot is re-offered to the same job, which will again accept the bad slot. If the job just waited a bit, it could get a better slot (e.g., as a result of tasks from another job finishing). [1]

This relates to your idea because of the following situation: suppose you have a cluster with 10 machines, the job has locality preferences for 5 of them (with ids 1, 2, 3, 4, 5), and fairness dictates that the job can only use 3 slots at a time (e.g., it's sharing equally with 2 other jobs). Suppose that for a long time, the job has been running tasks on slots 1, 2, and 3 (so, local slots). At this point, the timers for machines 6, 7, 8, 9, and 10 will have expired, because the job has been running for a while. But if the job is now offered a slot on one of those non-local machines (e.g., 6), the job hasn't been waiting long for non-local resources: until this point, it's been running its full share of 3 slots at a time, and it's been doing so on machines that satisfy locality preferences. So, we shouldn't accept that slot on machine 6 -- we should wait a bit to see if we can get a slot on 1, 2, 3, 4, or 5.

The solution I proposed (in a long PR comment) for the other JIRA is: if the task set is using fewer than the number of slots it could be using (where "# slots it could be using" is all of the slots in the cluster if the job is running alone, or the job's fair share, if it's not) for some period of time, increase the locality level. The problem with that solution is that I thought it was completely impractical to determine the number of slots a TSM "should" be allowed to use. However, after thinking about this more today, I think we might be able to do this in a practical way:
- First, I thought that we could use information about when offers are rejected to determine this (e.g., if you've been rejecting offers for a while, then you're not using your fair share). But the problem here is that it's not easy to determine when you *are* using your fair / allowed share: accepting a single offer doesn't necessarily mean that you're now using the allowed share. This is precisely the problem with the current approach, hence this JIRA.
- v1: One possible proxy for this is whether there are slots currently available that haven't been accepted by any job. The TaskSchedulerImpl could feasibly pass this information to each TaskSetManager, and the TSM could use it to update its delay timer: something like, only reset the delay timer to 0 if (a) the TSM accepts an offer and (b) the flag passed by the TSM indicates that there are no other unused slots in the cluster. This fixes the problem described in the JIRA: in that case, the flag would indicate that there *were* other unused slots, so even though a task got successfully scheduled with this offer, the delay timer wouldn't be reset, and would eventually correctly expire.
- v2: The problem with v1 is that it doesn't correctly handle situations where, e.g., you have two jobs A and B with equal shares. B is "greedy" and will accept any slot (e.g., it's a reduce stage), and A is doing delay scheduling. In this case, A might have much less than its share, but the flag from the TaskSchedulerImpl would indicate that there were no other free slots in the cluster, so the delay timer wouldn't ever expire. I suspect we could handle this (e.g., with some logic in the TaskSchedulerImpl to detect when a particular TSM is getting starved: when it keeps rejecting offers that are later accepted by someone else), but before thinking about this further, I wanted to run the general idea by you to see what your thoughts are.

[1] There's a whole side question / discussion of how often this is useful for Spark at all. It can be useful if you're running in a shared cluster where e.g. Yarn might be assigning you more slots over time, and it's also useful when a single Spark context is being shared across many
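The v1 rule above can be sketched as a small helper. The names and the boolean flag are assumptions for illustration, not Spark's actual API:

```python
import time

def updated_timer_start(timer_start, accepted_offer, cluster_has_unused_slots,
                        now=None):
    """Sketch of the v1 rule: only reset the locality delay timer when
    (a) the TSM accepts an offer AND (b) there are no other unused slots
    in the cluster. Returns the (possibly reset) timer start time."""
    now = time.monotonic() if now is None else now
    if accepted_offer and not cluster_has_unused_slots:
        return now  # genuinely at full share: restart the delay window
    return timer_start  # sticky-slot case: keep the old window running

# Scenario from this JIRA: a task launches, but other slots sit unused,
# so the timer is NOT reset and will eventually expire.
assert updated_timer_start(100.0, True, True, now=105.0) == 100.0
# No unused slots anywhere: accepting the offer resets the timer.
assert updated_timer_start(100.0, True, False, now=105.0) == 105.0
```

The v2 starvation case (job B greedily absorbing every free slot) is exactly the input this helper cannot distinguish: `cluster_has_unused_slots` stays false even though job A is below its share.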
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15830530#comment-15830530 ] Imran Rashid commented on SPARK-18886:
--

I had another idea for how to fix this. In addition to tracking the last time any task was launched, the TaskScheduler also tracks the last time it didn't schedule anything due to locality constraints, on *each resource*. Then when a new offer comes in, you are allowed to schedule if either the overall locality timer is up, or the timer is up for that particular resource.

On the plus side -- I think this keeps all the properties we want. You avoid an indefinite delay just because *one* resource is local, but you also keep the delay if those resources get used up by another taskset.

The downside -- significantly more complexity. It adds to the memory usage of the TaskScheduler (though, in the scheme of things, a pretty nominal increase), but it will also make the code significantly more complicated.

Aside: there is also a weird relationship between taskset priority and locality scheduling. Assuming all tasksets have cleared their locality wait timeouts, we favor taskset priority over locality. But if the tasksets haven't cleared those timeouts, things get strange. It really depends on what the current locality levels are in each taskset. In the simple case, you end up favoring locality, by limiting the max locality of each taskset. A very low-priority taskset easily "steals" the resources from a high-priority one if it doesn't have locality preferences. We should probably figure out what the desired behavior is so we can make it a little more consistent (or at least document it).
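The per-resource timer idea above might look roughly like this. Hypothetical names only; a sketch of the decision rule, not Spark's internals:

```python
def can_schedule_non_local(now, overall_timer_start, resource_timer_start,
                           locality_wait):
    """Allow scheduling a non-preferred task on an offered resource if either
    the overall locality timer has expired, or the timer for this particular
    resource has (i.e., this resource has been passed over due to locality
    constraints for longer than the configured wait)."""
    overall_expired = now - overall_timer_start >= locality_wait
    resource_expired = now - resource_timer_start >= locality_wait
    return overall_expired or resource_expired

# Overall timer recently reset by a sticky local task, but this resource has
# sat idle past the wait: scheduling is allowed, avoiding the indefinite delay.
assert can_schedule_non_local(10.0, 9.0, 5.0, 3.0)
# Both timers fresh (e.g., another taskset just used the resource): keep waiting.
assert not can_schedule_non_local(10.0, 9.0, 9.0, 3.0)
```

Note the second case is what preserves the delay when resources are being consumed by another taskset, which is the property the comment argues this scheme keeps.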
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15765322#comment-15765322 ] Imran Rashid commented on SPARK-18886:
--

You understand correctly -- that is precisely what I'm proposing.

The scenario with multiple waves is a good example of why I think this is a *good* change. If only 1% of your cluster can take advantage of locality, then 99% of your cluster goes unused across all those waves. That may be extreme (though a case I have actually seen in practice on large clusters); even if it's 50%, you have 50% of your cluster going unused. Unless local tasks are more than 2x faster, it would make more sense to make the change I'm proposing.

What's the worst case after this change? All but one executor are local -- the result is that you have one task running slower. And the more waves there are, the smaller the downside. E.g., you complete 10 waves on the local executors, and only 8 waves on the non-local one.

The worst case is when there is only one wave, there is a huge gap (multiple Xs) in runtime between local and non-local execution, and moments after you schedule on non-local resources, some local resource would become available. I think this situation is not very common -- in particular, there normally isn't *such* an enormous gap between local and non-local that users would prefer their non-local resources sit idle indefinitely. I'd argue that if such a use case is important, we should add a special conf for that in particular.
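The ~100x figure from the issue description can be checked with a toy model using its numbers (500 tasks, 100 executors, 5-second tasks). Function names are assumptions, and the model ignores that non-local tasks run somewhat slower, which would shrink but not erase the gap:

```python
import math

def sticky_runtime(num_tasks, task_time):
    # Delay-scheduling pathology: every task serialized on the one
    # preferred executor, since each finishes before the wait expires.
    return num_tasks * task_time

def spread_runtime(num_tasks, num_executors, task_time):
    # Tasks spread across the whole cluster in waves.
    return math.ceil(num_tasks / num_executors) * task_time

print(sticky_runtime(500, 5))        # 2500 seconds on one executor
print(spread_runtime(500, 100, 5))   # 25 seconds using the whole cluster
```

2500 s versus 25 s is the ~100x slowdown, and with many waves even a noticeably slower non-local executor (8 waves versus 10 in the example above) barely dents the win from spreading.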
Tasks have locality preferences for a subset of available resources > 2. Tasks finish in less time than the delay scheduling wait. > Instead of having *one* delay to wait for resources with better locality, > Spark waits indefinitely. > As an example, consider a cluster with 100 executors, and a taskset with 500 > tasks. Say all tasks have a preference for one executor, which is by itself > on one host. Given the default locality wait of 3s per level, we end up with > a 6s delay till we schedule on other hosts (process wait + host wait). > If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks > get scheduled on _only one_ executor. This means you're only using 1% of > your cluster, and you get a ~100x slowdown. You'd actually be better off if > tasks took 7 seconds. > *WORKAROUNDS*: > (1) You can change the locality wait times so that they are shorter than the > task execution time. You need to take into account the sum of all wait times > to use all the resources on your cluster. For example, if you have resources > on different racks, this will include the sum of > "spark.locality.wait.process" + "spark.locality.wait.node" + > "spark.locality.wait.rack". Those each default to "3s". The simplest way would > be to set "spark.locality.wait.process" to your desired wait interval, and > set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0". > For example, if your tasks take ~3 seconds on average, you might set > "spark.locality.wait.process" to "1s". > Note that this workaround isn't perfect -- with less delay scheduling, you may > not get as good resource locality. After this issue is fixed, you'd most > likely want to undo these configuration changes. > (2) The worst case here will only happen if your tasks have extreme skew in > their locality preferences. Users may be able to modify their job by > controlling the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially > if you do a repartition starting from a small number of partitions. (Shuffle > locality preference is assigned if any node has more than 20% of the shuffle > input data -- by chance, you may have one node just above that threshold, and > all other nodes just below it.) In this case, you can turn off locality > preference for shuffle data by setting > {{spark.shuffle.reduceLocality.enabled=false}} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
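The utilization argument in the comment above can be made concrete with some back-of-the-envelope arithmetic. This is a rough sketch; the 2x non-local penalty is an illustrative assumption, not a measured figure:

```python
# Back-of-the-envelope model for the waves argument: 100 executors,
# 500 tasks, 5s per task when local. Assume (illustratively) that a
# non-local task runs `penalty` times slower.

def runtime_local_only(tasks, local_slots, task_time):
    # Every task waits for the few local slots; tasks run in waves.
    waves = -(-tasks // local_slots)  # ceiling division
    return waves * task_time

def runtime_spread(tasks, slots, task_time, penalty):
    # Pessimistic model: spread over all slots, and charge *every*
    # task the full non-local penalty.
    waves = -(-tasks // slots)
    return waves * task_time * penalty

print(runtime_local_only(500, 1, 5.0))     # 2500.0 seconds
print(runtime_spread(500, 100, 5.0, 2.0))  # 50.0 seconds
```

Even charging every task the full 2x non-local penalty, spreading the work across the cluster wins by a wide margin in this scenario.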
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764876#comment-15764876 ] Mridul Muralidharan commented on SPARK-18886: - I am not sure that what is described will work as expected [~imranr]. Consider a taskset whose number of tasks is many multiples of the number of executors (a fairly common scenario). In this case, if the timer is never reset, you will effectively reduce the delay to 0 once it expires, across all waves for the taskset (I am assuming I understood the proposal correctly). [~kayousterhout] and [~markhamstra] might have more comments though, in case I am missing something here.
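The objection above can be sketched as follows (illustrative pseudologic, not Spark's actual scheduler code): with a single never-reset timer per taskset, the locality wait only protects the first wave.

```python
# With one never-reset timer per taskset, the locality wait only
# protects the first wave: once it expires, every later wave is
# scheduled at any locality immediately.

def allowed_level(now, timer_start, wait):
    """Return 'ANY' once the wait has expired, else 'LOCAL' only."""
    return "ANY" if now - timer_start >= wait else "LOCAL"

wait = 3.0
timer_start = 0.0                     # started once, never reset
assert allowed_level(1.0, timer_start, wait) == "LOCAL"  # first wave
for now in (4.0, 30.0, 300.0):        # all subsequent waves
    assert allowed_level(now, timer_start, wait) == "ANY"
```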
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764795#comment-15764795 ] Apache Spark commented on SPARK-18886: -- User 'squito' has created a pull request for this issue: https://github.com/apache/spark/pull/16354
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762296#comment-15762296 ] Imran Rashid commented on SPARK-18886: -- Thanks [~mridul], that helps -- in particular, I was only thinking about bulk scheduling and had forgotten to take that into account. After a closer look through the code, I think my earlier proposal makes sense -- rather than resetting the timeout as each task is scheduled, change it to start the timer as soon as there is an offer which goes unused due to the delay. Once that timer is started, it is never reset (for that TSM). I can think of one scenario where this would result in worse scheduling than what we currently have. Suppose that initially, a TSM is offered one resource which only matches on rack_local. But immediately after that, many process_local offers are made, which are all used up. Some time later, more offers that are only rack_local come in. They'll immediately get used, even though there may be plenty more process_local offers just about to come in (perhaps enough for all of the remaining tasks). That wouldn't be great, but it's also not nearly as bad as letting most of your cluster sit idle. Other alternatives I can think of: a) Turn off delay scheduling by default, and change [{{TaskSchedulerImpl.resourceOffer}}|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L357-L360] to go through all task sets, then advance locality levels, rather than the other way around. Perhaps we should invert those loops anyway, just for when users turn off delay scheduling. b) Have the TSM use some knowledge about all available executors to decide whether or not it is even possible for enough resources at the right locality level to appear.
E.g., in the original case, the TSM would realize there is only one executor which is process_local, so it doesn't make sense to wait to schedule all tasks on that executor. However, I'm pretty skeptical about doing anything like this, as it may be a somewhat complicated thing inside the scheduler, and it could just turn into a mess of heuristics with lots of corner cases. I think implementing my proposed solution should be relatively easy, so I'll take a stab at it, but I'd still appreciate more input on the right approach here. Perhaps seeing an implementation will make it easier to discuss.
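The proposal above (start the timer as soon as an offer goes unused due to the delay, and never reset it for that TSM) can be sketched roughly like this; the class and method names are illustrative, not Spark's actual TaskSetManager API:

```python
# Illustrative sketch (not Spark's real TaskSetManager): the locality
# timer starts the first time an offer goes unused because of delay
# scheduling, and is never reset for this taskset.

class DelayTimerSketch:
    def __init__(self, wait):
        self.wait = wait
        self.timer_start = None     # clock not running yet

    def offer(self, now, offer_is_local):
        """Return True if this offer should be accepted."""
        if offer_is_local:
            return True             # matching locality: always accept
        if self.timer_start is None:
            self.timer_start = now  # first unused offer starts the clock
            return False
        return now - self.timer_start >= self.wait

tsm = DelayTimerSketch(wait=3.0)
assert tsm.offer(0.0, offer_is_local=False) is False  # starts the timer
assert tsm.offer(1.0, offer_is_local=False) is False  # still within wait
assert tsm.offer(2.0, offer_is_local=True) is True    # local always taken
assert tsm.offer(3.5, offer_is_local=False) is True   # wait expired
```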
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15756179#comment-15756179 ] Mridul Muralidharan commented on SPARK-18886: -
- Delaying before 'using up' all resources matters because another task/taskset with a better locality preference might become available for that executor (also, see the impact on speculative execution).
- A delay can allow a better locality preference to become available for the task. A suboptimal schedule has a cascading effect on the rest of the executors, the application, and the cluster.
- Note that not all executors are available at the same time in resourceOffer: there are periodic bulk reschedules, sporadic reschedules when tasks finish, and periodic bulk speculative schedule updates.
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752722#comment-15752722 ] Imran Rashid commented on SPARK-18886: -- [~mridul] sorry if I am being slow here, but do you mind spelling it out for me in more detail? I'm *not* asking about the benefits of using locality preferences -- I get that part. I'm asking about why the *delay*. There has to be something happening during the delay that we want to wait for. One possibility is that you've got multiple tasksets running concurrently, with different locality preferences. You wouldn't want the first taskset to use all the resources; you'd rather take both tasksets into account. This is accomplished with delay scheduling, but you don't actually *need* the delay for it. Another possibility is that there is such a huge gap in runtime that you expect your preferred locations will finish *all* tasks in the taskset before that delay is up, by having some executors run multiple tasks. The reason I'm asking is to figure out whether there is a sensible fix here (and what the smallest possible fix would be). If this is it, then the fix I suggested above to Mark should handle this case, while still working as intended in other cases.
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752579#comment-15752579 ] Mridul Muralidharan commented on SPARK-18886: - [~imranr] For almost all cases, delay scheduling dramatically increases performance. The difference even between PROCESS and NODE is significant (between NODE and 'lower' levels, it can depend on your network config). It has a non-trivial impact both for tasks with short duration and for tasks processing large amounts of data; for long tasks processing small data, it is not so useful in comparison, IIRC, and the same goes for degenerate cases where the locality preference is suboptimal to begin with. [As an aside, not being able to specify PROCESS-level locality is actually a drawback in our API.] The job(s) I mentioned where we set it to 0 were special cases, where we knew the costs well enough to make the decision to lower it: but I would not recommend it unless users are very sure of what they are doing. While analysing the cost, it should also be kept in mind that transferring data across nodes impacts not just the Spark job, but every other job in the cluster.
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752462#comment-15752462 ] Imran Rashid commented on SPARK-18886: -- [~mridulm80] good point; perhaps the right answer here is just to turn off delay scheduling completely -- not setting {{"spark.locality.wait.process"}} to a small value, as I had suggested in the initial workaround, but turning it off entirely, to avoid having to futz with tuning that value relative to task runtime. But let me ask you more or less the same question I just asked Mark, phrased a little differently -- given the fragility of this, wouldn't it make more sense for us to turn delay scheduling *off* by default?
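For reference, turning delay scheduling off entirely amounts to zeroing the locality waits. A hedged example (the job file name is a placeholder; {{spark.locality.wait}} is the documented top-level setting that the per-level values default to, so the per-level overrides are shown only for explicitness):

```shell
# Zero all locality waits to disable delay scheduling. The job file
# name below is a placeholder.
spark-submit \
  --conf spark.locality.wait=0 \
  --conf spark.locality.wait.process=0 \
  --conf spark.locality.wait.node=0 \
  --conf spark.locality.wait.rack=0 \
  my_job.py
```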
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752451#comment-15752451 ] Imran Rashid commented on SPARK-18886: -- [~markhamstra] I'm not really sure yet; I can't quite decide exactly what the right behavior should be. My initial thought was that the TSM should track the last time that anything has been scheduled at *each* locality level, rather than just one overall {{lastLaunchTime}}, so that it realizes that some resources have been waiting a long time at a higher locality level. But I wasn't exactly sure what should happen after you schedule one task at a higher locality level. Do you reset the timer? Or do you just keep scheduling at that locality level for the rest of the task set? If you reset the timer, then you will only schedule one task on the other resources before adding another locality delay, so that doesn't work. But if you keep scheduling at that new locality level, then you've thrown away delay scheduling for the rest of the task set. Is that OK? To put that last question a different way -- what is the point of delay scheduling anyway? What are we hoping will happen in that delay window? 1) The tasks run so fast that the preferred localities make it through all of the tasks in the entire task set before the delay is up. 2) Other tasksets are concurrently submitted with different locality preferences, so we can submit those tasks to the remaining executors (rather than letting them sit idle). 3) New resources will be spun up with the desired locality preferences (e.g., we've requested a bunch of resources from dynamic allocation, and the resources which have become available so far don't have the preferred localities, but more are still getting spun up). Under all the scenarios I can think of, you might as well turn off delay scheduling for the rest of the taskset.
But to be honest I don't feel like I've got a good justification for delay scheduling in the first place, so I feel like I may be missing something. Also, I did a bit more digging into the case I had, and it's happening because of a repartition from a small number (~10) of partitions to a much larger one. The shuffle map stage ends up running two tasks on one host, and with a very small amount of skew, it turns out that one host has > 20% of the shuffle output, while none of the other hosts do. I feel like there is also something else we can do here to improve the shuffle locality preferences, but I don't have any concrete ideas on what that improvement should be.
> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> ---
>
> Key: SPARK-18886
> URL: https://issues.apache.org/jira/browse/SPARK-18886
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
>
> Delay scheduling can introduce an unbounded delay and underutilization of
> cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality,
> Spark waits indefinitely.
> As an example, consider a cluster with 100 executors, and a taskset with 500
> tasks. Say all tasks have a preference for one executor, which is by itself
> on one host. Given the default locality wait of 3s per level, we end up with
> a 6s delay till we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks
> get scheduled on _only one_ executor. This means you're only using 1% of
> your cluster, and you get a ~100x slowdown. You'd actually be better off if
> tasks took 7 seconds.
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that they are shorter than the
> task execution time. You need to take into account the sum of all wait times
> to use all the resources on your cluster. For example, if you have resources
> on different racks, this will include the sum of
> "spark.locality.wait.process" + "spark.locality.wait.node" +
> "spark.locality.wait.rack". Those each default to "3s". The simplest way
> would be to set "spark.locality.wait.process" to your desired wait interval,
> and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to
> "0". For example, if your tasks take ~3 seconds on average, you might set
> "spark.locality.wait.process" to "1s".
> Note that this workaround isn't perfect -- with less delay scheduling, you
> may not get as good resource locality. After this issue is fixed, you'd most
> likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in
> their locality preferences. Users may be able to modify their job by
> controlling the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially
> if you do a repartition starting from a small number of partitions. (Shuffle
> locality preference is assigned if any node has more than 20% of the shuffle
> input data -- by chance, you may have one node just above that threshold, and
> all other nodes just below it.) In this case, you can turn off locality
> preference for shuffle data by setting
> {{spark.shuffle.reduceLocality.enabled=false}}
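The arithmetic of the 100-executor example above can be checked back-of-the-envelope (a sanity check of the stated numbers, not a measurement):

```python
# Back-of-the-envelope check of the slowdown in the example:
# 100 executors, 500 tasks of ~5s each, all preferring one executor.
executors = 100
tasks = 500
task_secs = 5

# Delay scheduling keeps resetting, so all tasks run serially on one executor:
sticky_secs = tasks * task_secs            # 500 * 5 = 2500s

# If tasks were spread across the whole cluster instead:
waves = tasks / executors                  # 500 / 100 = 5 waves
spread_secs = waves * task_secs            # 5 * 5 = 25s

slowdown = sticky_secs / spread_secs       # 2500 / 25 = 100x
print(sticky_secs, spread_secs, slowdown)
```

One executor out of 100 is the "1% of your cluster" in the description, and the serial-versus-parallel ratio is the ~100x slowdown.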
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752426#comment-15752426 ] Mridul Muralidharan commented on SPARK-18886: - The Spark scheduler can be suboptimal for a lot of degenerate cases, not just this one. But for the most part it does quite well, and for the others you can simply change the config to better suit the workload. For example, I have actually used 0 for the delay timeout in some production jobs: you get prioritized scheduling, and a quick fallback to ANY once all tasks with a preference have been scheduled, which removes any waiting -- useful when the benefits of colocation are trumped by needing quicker schedules. That is not to say we can't improve scheduling in Spark -- for example, treat it as a bin packing problem over schedules of a single (or across) locality preference to 'pack' more tasks. Doing that without incurring high cost is why a lot of these ideas did not make it in, unfortunately. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
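The configuration workarounds from the description can be combined at submit time. A sketch only: the "1s" wait and the application jar are placeholders to tune per job, and the reduce-locality flag applies only to the shuffle-skew case:

```shell
# Workaround sketch: shorten the process-local wait below the task duration,
# disable the node/rack waits, and (for the shuffle-skew case in (2a))
# turn off reduce-side locality preferences entirely.
# "1s" and your-app.jar are placeholders.
spark-submit \
  --conf spark.locality.wait.process=1s \
  --conf spark.locality.wait.node=0 \
  --conf spark.locality.wait.rack=0 \
  --conf spark.shuffle.reduceLocality.enabled=false \
  your-app.jar
```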
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752098#comment-15752098 ] Mark Hamstra commented on SPARK-18886: -- That's a great explanation of the issue, and nice example code, [~imranr]. I'm sure that I have seen this kind of excessive Task stickiness with many quick-to-execute Tasks, but I never got to the level of diagnosing the problem that you have. Your listed workarounds, while interesting, aren't a complete long-term solution, of course. Have you thought at all yet about possible paths to a solution? One idea that comes to my mind is that speculative execution has at least the potential to get the delayed Tasks executed more quickly elsewhere -- but our prior concerns and lack of confidence with speculative execution remain.
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15751956#comment-15751956 ] Imran Rashid commented on SPARK-18886: -- Here's a failing test case (you can check it out directly here: https://github.com/squito/spark/tree/delay_sched-SPARK-18886):
{code}
test("Delay scheduling checks utilization at each locality level") {
  // Create a cluster with 100 executors, and submit 500 tasks, but each task would prefer to
  // be on the same node in the cluster. We should not wait to schedule each task on the one
  // executor.
  val conf = new SparkConf().set("spark.locality.wait", "1s")
  sc = new SparkContext("local", "test", conf)
  val execs = Seq(("exec0", "host0")) ++ (1 to 100).map { x => (s"exec$x", s"host$x") }
  val sched = new FakeTaskScheduler(sc, execs: _*)
  val tasks = FakeTask.createTaskSet(500,
    (1 to 500).map { _ =>
      Seq(TaskLocation(TaskLocation.executorLocationTag + "host0_exec0"))
    }: _*)
  val clock = new ManualClock
  val manager = new TaskSetManager(sched, tasks, MAX_TASK_FAILURES, clock)
  logInfo("initial locality levels = " + manager.myLocalityLevels.mkString(","))
  assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))

  // initially, the locality preferences should lead us to only schedule tasks on one executor
  logInfo(s"trying to schedule first task at ${clock.getTimeMillis()}")
  val firstScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // without advancing the clock, no matter how many times we make offers on the *other*
  // executors, nothing should get scheduled
  (0 until 50).foreach { _ =>
    execs.foreach { case (exec, host) =>
      if (exec != "exec0") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isEmpty)
      }
    }
  }

  // now we advance the clock till just *before* the locality delay is up, and we finish the
  // first task
  val processWait = sc.getConf.getTimeAsMs("spark.locality.wait.process", "3s")
  val nodeWait = sc.getConf.getTimeAsMs("spark.locality.wait.node", "3s")
  clock.advance(processWait + nodeWait - 1)
  logInfo(s"finishing first task at ${clock.getTimeMillis()}")
  manager.handleSuccessfulTask(firstScheduledTask.taskId,
    createTaskResult(firstScheduledTask.index))

  // if we offer all the resources again, still we should only schedule on one executor
  logInfo(s"trying to schedule second task at ${clock.getTimeMillis()}")
  val secondScheduledTask = execs.flatMap { case (exec, host) =>
    val schedTaskOpt = manager.resourceOffer(execId = exec, host = host, ANY)
    assert(schedTaskOpt.isDefined === (exec == "exec0"))
    schedTaskOpt
  }.head

  // Now let's advance the clock further, so that all of our other executors have been sitting
  // idle for longer than the locality wait time. We have managed to schedule *something* at a
  // lower locality level within the time, but regardless, we *should* still schedule on all
  // the other resources by this point
  clock.advance(2000)
  // this would pass if we advanced the clock by this much instead
  //clock.advance(processWait + nodeWait + 10)
  logInfo(s"trying to schedule everything at ${clock.getTimeMillis()}")
  execs.foreach { case (exec, host) =>
    if (exec != "exec0") {
      withClue(s"trying to schedule on $exec:$host at time ${clock.getTimeMillis()}") {
        assert(manager.resourceOffer(execId = exec, host = host, ANY).isDefined)
      }
    }
  }
}
{code}
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15751949#comment-15751949 ] Imran Rashid commented on SPARK-18886: -- cc [~kayousterhout] [~markhamstra] [~zsxwing]