[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987633#comment-16987633 ]
Nicholas Brett Marcott commented on SPARK-18886:
------------------------------------------------

Thanks for mentioning the PRs here. My proposed solution in the second [PR mentioned above|https://github.com/apache/spark/pull/26696] is what I believe Kay said was ideal in comments of this [PR|https://github.com/apache/spark/pull/9433], but seemed to think was impractical.

*The proposed solution:*

Currently, the time window that the locality wait times measure is the time since the last task launched for a TSM. The proposed change is to instead measure the time since this TSM's available slots were last fully utilized. The number of available slots for a TSM can be determined by dividing all slots among the TSMs according to the scheduling policy (FIFO vs FAIR). See the sketch after the list below.

*Other possible solutions and their issues:*
# Never reset the timer: delay scheduling would likely only work on the first wave*
# Per-slot timer: delay scheduling should apply per task/taskset; otherwise, timers started by one taskset could cause delay scheduling to be ignored for the next taskset, which might lead you to try approach #3
# Per-slot, per-stage timer: tasks can be starved by being offered unique slots over a period of time, and a taskset or other job that doesn't care about locality might use those resources in the meantime. It also requires too many timers and too much bookkeeping.
# Per-task timer: you still need a way to distinguish between a task that is waiting for a slot to become available and a task that has slots available but is not utilizing them (which is what this PR does). To do this right seems to require this PR plus more timers.

*wave = one round of running as many tasks as there are available slots for a taskset. Imagine you have 2 slots and 10 tasks: it would take 10 / 2 = 5 waves to complete the taskset.
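To make the proposed timer concrete, here is a minimal, hypothetical sketch of the idea (this is not Spark's actual TaskSetManager code; names like DelayTimer, allottedSlots, and runningTasks are illustrative only). The timer only starts counting toward the locality wait once the taskset is using fewer tasks than the slots the scheduling policy would currently grant it:

{code:scala}
// Illustrative sketch only: relax the locality level based on how long this
// taskset's allotted slots have gone unfilled, rather than how long it has
// been since the last task launch.
class DelayTimer(localityWaitMs: Long) {

  // Timestamp of the last moment the taskset was using all slots it is entitled to.
  private var lastFullyUtilizedMs: Long = System.currentTimeMillis()

  /** Call on each resource offer / task launch / task finish. */
  def update(runningTasks: Int, allottedSlots: Int): Unit = {
    // "Fully utilized" = running at least as many tasks as the slots the
    // scheduling policy (FIFO/FAIR) currently assigns to this taskset.
    if (runningTasks >= allottedSlots) {
      lastFullyUtilizedMs = System.currentTimeMillis()
    }
  }

  /** True once allotted slots have sat idle for longer than the locality wait. */
  def shouldRelaxLocality(): Boolean =
    System.currentTimeMillis() - lastFullyUtilizedMs > localityWaitMs
}

object DelayTimerExample extends App {
  // e.g. 100 total slots split between 2 equally weighted task sets under FAIR
  // scheduling: each taskset's allotted share is 100 / 2 = 50 slots.
  val timer = new DelayTimer(localityWaitMs = 3000)
  timer.update(runningTasks = 50, allottedSlots = 50) // fully utilized: timer resets
  timer.update(runningTasks = 10, allottedSlots = 50) // under-utilized: timer keeps counting
  Thread.sleep(3100)
  println(timer.shouldRelaxLocality())                // true: fall back to the next locality level
}
{code}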
> Delay scheduling should not delay some executors indefinitely if one task is
> scheduled before delay timeout
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18886
>                 URL: https://issues.apache.org/jira/browse/SPARK-18886
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>            Priority: Major
>
> Delay scheduling can introduce an unbounded delay and underutilization of cluster resources under the following circumstances:
> 1. Tasks have locality preferences for a subset of the available resources.
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality, Spark waits indefinitely.
> As an example, consider a cluster with 100 executors and a taskset with 500 tasks. Say all tasks have a preference for one executor, which is by itself on one host. Given the default locality wait of 3s per level, we end up with a 6s delay before we schedule on other hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks get scheduled on _only one_ executor. This means you're using only 1% of your cluster, and you get a ~100x slowdown. You'd actually be better off if tasks took 7 seconds.
> *WORKAROUNDS*:
> (1) You can change the locality wait times so that they are shorter than the task execution time. You need to take into account the sum of all wait times to use all the resources on your cluster. For example, if you have resources on different racks, this will include the sum of "spark.locality.wait.process" + "spark.locality.wait.node" + "spark.locality.wait.rack". Those each default to "3s".
> The simplest way would be to set "spark.locality.wait.process" to your desired wait interval, and set both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0". For example, if your tasks take ~3 seconds on average, you might set "spark.locality.wait.process" to "1s". *NOTE*: due to SPARK-18967, avoid setting {{spark.locality.wait=0}} -- instead, use {{spark.locality.wait=1ms}}.
> Note that this workaround isn't perfect -- with less delay scheduling, you may not get as good resource locality. After this issue is fixed, you'd most likely want to undo these configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in their locality preferences. Users may be able to modify their job to control the distribution of the original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially if you do a repartition starting from a small number of partitions. (Shuffle locality preference is assigned if any node has more than 20% of the shuffle input data -- by chance, you may have one node just above that threshold and all other nodes just below it.) In this case, you can turn off locality preference for shuffle data by setting {{spark.shuffle.reduceLocality.enabled=false}}.
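For reference, a minimal sketch of the workaround configuration quoted above, using SparkSession.builder (spark-defaults.conf or --conf flags would work the same way). The "1s" value is only the example from the description; choose a process-local wait that is shorter than your typical task duration:

{code:scala}
import org.apache.spark.sql.SparkSession

object LocalityWaitWorkaround extends App {
  val spark = SparkSession.builder()
    .appName("delay-scheduling-workaround")
    // Workaround (1): one short process-local wait, no additional node/rack waits.
    .config("spark.locality.wait.process", "1s")
    .config("spark.locality.wait.node", "0")
    .config("spark.locality.wait.rack", "0")
    // Per the note above (SPARK-18967): if you also lower the global wait,
    // prefer "1ms" over "0", e.g. .config("spark.locality.wait", "1ms")
    // Workaround (2a): ignore skewed shuffle locality preferences entirely.
    .config("spark.shuffle.reduceLocality.enabled", "false")
    .getOrCreate()
}
{code}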