[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356887#comment-16356887 ] Julien Cuquemelle edited comment on SPARK-22683 at 2/8/18 12:38 PM:

Thanks a lot for your feedback; some clarifications:
- the default remains 1: it means resources are not a concern and you want to minimize latency.
- if you want to be resource-aware, use 2, which is the easiest resource-saving setting (and already seems very interesting).
- if you want to maximize resource savings, you need to measure resource consumption with higher values of the parameter.

Regarding the multi-job use case, I do agree that this will not be optimal (but it will be no less optimal than today's state), and that we need a per-job configuration. We have a use case where we had to split an application with 2 jobs into 2 applications, because the optimal tuning of the executors was very different for each. A per-job config actually seems interesting, but it should not be limited to this new parameter; it should allow tuning the whole config. If that is ever implemented, the question of saving resources with dynamic allocation still remains for each job, so this new parameter will still be useful.

Regarding the possibility of setting the parameter programmatically: right now the number of tasks per executor is computed when the ExecutorAllocationManager starts, which happens during SparkContext initialization, so it is not possible to update it afterwards. But it does not seem difficult to make it mutable so that the computation of the number of needed executors takes it into account:
{code:java}
/**
 * The maximum number of executors we would need under the current load to satisfy all running
 * and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}

> DynamicAllocation wastes resources by allocating containers that will barely
> be used
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Priority: Major
> Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation,
> I've noticed almost a doubling (+114% exactly) of resource consumption of
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context:
> - resource usage stands for vcore-hours allocation for the whole job, as seen
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way
> to define experiments (via UI / DSL) that automatically get translated to
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 of such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a
> lot between jobs, and as such finding an efficient number of
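The maxNumExecutorsNeeded computation quoted above is where a tasks-per-slot factor would plug in. The sketch below is illustrative only (the object name, the standalone parameters, and the tasksPerSlot name are mine, not the patch's actual code): dividing the task backlog by tasksPerExecutor * tasksPerSlot caps the target executor count.

```scala
// Sketch (not the actual patch): how a tasksPerSlot divisor would cap the
// target executor count. `tasksPerExecutor` follows the snippet above;
// `tasksPerSlot` is the proposed knob, defaulting to today's behaviour (1).
object ExecutorTarget {
  def maxNumExecutorsNeeded(pendingTasks: Int,
                            runningTasks: Int,
                            tasksPerExecutor: Int,
                            tasksPerSlot: Int = 1): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    val tasksPerExecutorTarget = tasksPerExecutor * tasksPerSlot
    // Ceiling division; tasksPerSlot > 1 trades latency for fewer containers.
    (numRunningOrPendingTasks + tasksPerExecutorTarget - 1) / tasksPerExecutorTarget
  }

  def main(args: Array[String]): Unit = {
    // 1000 tasks, 5 cores per executor (5 slots with spark.task.cpus = 1):
    println(maxNumExecutorsNeeded(1000, 0, 5))    // 200 executors: one slot per task
    println(maxNumExecutorsNeeded(1000, 0, 5, 2)) // 100: two tasks queued per slot
    println(maxNumExecutorsNeeded(1000, 0, 5, 6)) // 34: six tasks queued per slot
  }
}
```

With tasksPerSlot = 1 the formula is exactly the quoted one, so the default behaviour is unchanged.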
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356168#comment-16356168 ] Thomas Graves edited comment on SPARK-22683 at 2/7/18 10:24 PM:

I agree, I think the default behavior stays 1. I ran a few tests with this patch, and I definitely see an improvement in resource usage across all the jobs I ran. The jobs had similar run times, or were actually faster on a few. I used the default 60-second timeout. Note that none of those jobs were really long-running; they had small to medium-size tasks.
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16301709#comment-16301709 ] Felix Cheung edited comment on SPARK-22683 at 12/22/17 5:35 PM:

I couldn't find the exact source line, but from running Flink previously I'm reasonably sure the number of task slots == the number of cores. Therefore I don't think it's meant to increase utilization by over-committing tasks, concurrently running multiple tasks, and so on.

> DynamicAllocation wastes resources by allocating containers that will barely
> be used
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation,
> I've noticed almost a doubling (+114% exactly) of resource consumption of
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context:
> - resource usage stands for vcore-hours allocation for the whole job, as seen
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way
> to define experiments (via UI / DSL) that automatically get translated to
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 of such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a
> lot between jobs, and as such finding an efficient number of executors for
> each job is difficult to get right, which is the reason I took the path of
> dynamic allocation.
> - Some of the tests have been scheduled on an idle queue, some on a full
> queue.
> - experiments have been conducted with spark.executor-cores = 5 and 10; only
> results for 5 cores have been reported because efficiency was overall better
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs
> (about 600 jobs) ranging from tens to thousands of splits in the data
> partitioning and between 400 to 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s
>
> Definition:
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots,
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough
> containers to have one taskSlot per task, which minimizes latency, but wastes
> resources when tasks are small regarding executor allocation and idling
> overhead.
> The results using the proposal (described below) over the job sample (600
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in
> resource usage, for a 37% (against 43%) reduction in wall clock time for
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6
> tasks per core, with a 30% resource usage reduction, for a similar wall clock
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few
> points mentioned in the comments)?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for
> each job (tens to thousands of splits can occur), and essentially remove the
> interest of using the dynamic allocation.
> - use dynamicAllocation.backlogTimeout:
> - setting this parameter right to avoid creating unused executors is very
> dependent on wall clock time. One basically needs to solve the exponential
> ramp up for the target time. So this is not an option for my use case, where I
> don't want a per-job tuning.
> - I've still done a series of experiments, details in the comments.
> Result is that after manual tuning, the best I could get was a similar
> resource consumption at the expense of 20% more wall clock time, or a similar
> wall clock time at the expense of 60% more resource consumption than what I
> got using my proposal @ 6 tasks per slot (this value being optimized over a
> much larger range of jobs as already stated)
> - as mentioned in another comment, tampering with the exponential ramp up
> might yield task imbalance, and such old executors could become contention
> points for other executors trying to remotely access blocks in the old
> executors (not witnessed in the jobs I'm talking about, but we did see this
> behavior in other jobs)
> Proposal:
> Simply add a
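The Definition section of the issue description can be written out as plain arithmetic. A minimal sketch (the object and method names are mine, not Spark's API): an executor has executorCores / taskCpus task slots, and the default dynamic allocation targets one slot per task.

```scala
// Sketch of the arithmetic in the issue's "Definition" section (helper names
// are mine, not Spark's): taskSlots per executor, and the default target of
// one slot per pending or running task.
object SlotMath {
  def taskSlots(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  // Executors needed so that every task gets its own slot (ceiling division).
  def defaultExecutorTarget(numTasks: Int, executorCores: Int, taskCpus: Int): Int = {
    val slots = taskSlots(executorCores, taskCpus)
    (numTasks + slots - 1) / slots
  }

  def main(args: Array[String]): Unit = {
    // 600 tasks with spark.executor.cores = 5 and spark.task.cpus = 1 gives
    // 5 slots per executor, so the default behaviour asks for 120 executors,
    // even when each task only runs for a few seconds.
    println(defaultExecutorTarget(600, 5, 1)) // 120
  }
}
```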
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291141#comment-16291141 ] Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 5:09 PM:

[~tgraves], thanks a lot for your remarks; I've updated the description and also included a summary of the various results and comments I got. Answers to your other questions:

"The fact you are asking for 5+ cores per executor will naturally waste more resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 core per executor, the dynamic allocation will ask for 5 times more executors.

"But if we can find something that by defaults works better for the majority of workloads that it makes sense to improve"
I'm pretty sure 2 tasks per slot would work for a very large set of workloads, especially short jobs.

"As with any config though, how do I know what to set the tasksPerSlot as? it requires configuration and it could affect performance."
I agree; what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each job, which is not feasible in my use case
- the granularity of the efficiency of this parameter seems coarser than that of other parameters (sweet-spot values are valid on a broader range of jobs than maxExecutors or backlogTimeout)
- it seems to me some settings are quite simple to understand: if I want to minimize latency, leave the default value; if I want to save some resources, use a value of 2; if I want to really minimize resource consumption, find a higher number by analysis, or aim at meeting a time budget.

About dynamic allocation: with the default setting of 1 s for the backlogTimeout, the exponential ramp-up is in practice very similar to an upfront request, relative to the duration of the jobs. I think upfront allocation could be used instead of exponential, but this wouldn't change the issue, which is related to the target number of executors. I don't think asking upfront vs. exponentially has any effect on how YARN yields containers.

"Above you say 'When running with 6 tasks per executor slot, our Spark jobs consume in average 30% less vcorehours than the MR jobs, this setting being valid for different workload sizes.' Was this with this patch applied or without?"
The patch was applied; without it you cannot set the number of tasks per taskSlot. (I mentioned "executor slot", which is incorrect; I was referring to taskSlot.)

"the WallTimeGain wrt MR (%), does this mean positive numbers ran faster then MR?"
Positive numbers mean faster in Spark.

"why is running with 6 or 8 slower? is it shuffle issues or mistuning with gc, or just unknown overhead?"
Running with 6 tasks per taskSlot means that 6 tasks will be processed sequentially by 6 times fewer task slots.
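The claim that the default 1 s backlog timeout makes the exponential ramp-up behave like an upfront request can be checked with a toy model. This is an assumption-laden sketch (requests start at 1 executor and double every schedulerBacklogTimeout interval until the target is reached), which approximates but is not exactly the ExecutorAllocationManager's event-driven logic:

```scala
// Toy model of dynamic allocation's exponential ramp-up (an approximation,
// not the real ExecutorAllocationManager): the requested executor count
// starts at 1 and doubles every schedulerBacklogTimeout seconds.
object RampUp {
  /** Seconds until the requested executor count first reaches `target`. */
  def secondsToReach(target: Int, backlogTimeoutSec: Double): Double = {
    var requested = 1
    var elapsed = 0.0
    while (requested < target) {
      requested *= 2
      elapsed += backlogTimeoutSec
    }
    elapsed
  }

  def main(args: Array[String]): Unit = {
    // With the default 1 s timeout, 500 executors are requested within ~9 s,
    // effectively an upfront request for jobs lasting hundreds of seconds.
    println(secondsToReach(500, 1.0))  // 9.0
    // With a 90 s timeout, the ramp alone takes ~810 s, so the timeout has to
    // be solved against each individual job's duration.
    println(secondsToReach(500, 90.0)) // 810.0
  }
}
```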
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548 ] Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 3:25 PM:

I don't understand your statement about delaying executor addition. I want to cap the number of executors adaptively with respect to the current number of tasks, not delay their creation. Doing this with dynamicAllocation.maxExecutors requires each job to be tuned for efficiency; when we're doing experiments, a lot of jobs are one shot, so they can't be fine-tuned. The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp-up of executors with schedulerBacklogTimeout, I've made experiments to play with this parameter. I ran 2 series of experiments (7 similar jobs in each test case, average figures reported in the following table), one on a busy queue and the other on an idle queue. I'll report only the idle queue, as the figures on the busy queue are even worse for the schedulerBacklogTimeout approach. The first row uses the default 1 s schedulerBacklogTimeout and the 6 tasks per taskSlot I've mentioned above; the other rows use the default dynamicAllocation behaviour and only change schedulerBacklogTimeout:

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H consumption at the expense of almost 20% more wall clock time, or I can tune the parameter to get about the same wall clock time at the expense of about 60% more vCores-H consumption (very roughly extrapolated between 30 and 60 s for schedulerBacklogTimeout). It does not seem to solve the issue I'm trying to address; moreover, this would again need to be tuned for each specific job's duration (to find the 90 s timeout that gives similar resource consumption, I had to solve the exponential ramp-up against the duration of an already-run job, which is not feasible in experimental use cases). The previous experiments that allowed me to find the sweet spot at 6 tasks per slot involved job wall clock times between 400 and 9000 seconds.

Another way to look at this new parameter I'm proposing is as a simple way to tune the latency / resource consumption tradeoff.
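The percentages quoted in that comment follow directly from the table. A quick arithmetic check (the linear interpolation between the 30 s and 60 s rows is my own approximation, matching the "very roughly extrapolated" caveat above):

```scala
// Re-deriving the quoted tradeoff figures from the table rows:
// (wall clock seconds, vCores-H) per configuration.
object Tradeoff {
  val base = (693.571429, 37.142857) // tasksPerSlot = 6, backlogTimeout = 1 s
  val t30  = (584.857143, 69.571429) // backlogTimeout = 30 s
  val t60  = (763.428571, 54.285714) // backlogTimeout = 60 s
  val t90  = (826.714286, 39.571429) // backlogTimeout = 90 s

  /** How much larger `a` is than `b`, in percent. */
  def pctMore(a: Double, b: Double): Double = (a / b - 1) * 100

  def main(args: Array[String]): Unit = {
    // Similar vCores-H (the 90 s row) costs almost 20% more wall clock time:
    println(pctMore(t90._1, base._1)) // ~19.2
    // Interpolate linearly between the 30 s and 60 s rows to the base wall
    // clock time, then compare vCores-H: about 60% more resource consumption.
    val f   = (base._1 - t30._1) / (t60._1 - t30._1)
    val vch = t30._2 + f * (t60._2 - t30._2)
    println(pctMore(vch, base._2))    // ~62
  }
}
```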