[ https://issues.apache.org/jira/browse/SPARK-18689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Travis Hegner updated SPARK-18689:
----------------------------------

Description:

I'd like to propose a new mode of operation for the app scheduling system in a standalone cluster. This idea would eliminate static allocation of CPU cores between apps, and would instead divvy up CPU resources based on the Linux cgroups {{cpu.shares}} attribute. This could later be expanded to expose more cgroup options (memory, cpuset, etc...) to each application running on a standalone cluster.

Preliminary examination shows that this could be relatively simple to implement, and would open up a world of flexibility for how CPU time is allocated and utilized throughout a cluster.

What I'm thinking is that when a cluster is operating in "priority" mode, a newly submitted application will be given a unique executor on each worker, optionally limited by {{spark.executor.instances}}. The executor would *not* be *statically* allocated any number of cores, but would be able to run a number of tasks equal to the number of cores on that host (obeying {{spark.task.cpus}}, of course). We could also use {{spark.executor.cores}} to limit the number of tasks that the executors can run simultaneously, if desired.

When each worker gets a request to create its executor, it first creates a cgroup named after the unique app id (shell examples to show how simple utilizing cgroups can be):

{code}
mkdir /sys/fs/cgroup/cpu/spark-worker/$appId
{code}

Then it moves the executor into that cgroup:

{code}
echo $pid > /sys/fs/cgroup/cpu/spark-worker/$appId/tasks
{code}

And, finally, it sets {{cpu.shares}}:

{code}
echo $cpushares > /sys/fs/cgroup/cpu/spark-worker/$appId/cpu.shares
{code}

to the amount specified by an application config ({{spark.priority.cpushares}}, or something similar). This app would consume 100% of the CPU time allocated to the spark-worker.

A second app could come along, and also be allocated an executor on every worker (assuming enough memory), but with double the {{cpu.shares}} priority. Both apps would then run side by side, with the higher priority app receiving roughly two thirds (about 66%) of the total CPU time that has been allocated to the spark-worker, and the first app getting the rest.

This approach would allow long-running, CPU-intensive tasks to consume all available CPU until a higher priority app comes along, and it will gracefully allow that app to consume whatever it needs, without any type of scheduling or preemption logic needing to be written.

Naturally, there would have to be some configuration options to set sane defaults and limits for the number of shares that users can request for an app.

One minor downside is that the {{java.lang.Process}} object used to launch the executor currently has no public method to access its {{pid}}, which is required to put that executor into a cgroup. Finding the pid of the executor currently requires using the reflection API, but it is only a handful of lines, from what I've found (see the sketch below). (Java 9 has a new {{java.lang.ProcessHandle}} interface, which does provide a way to get the {{pid}} of the process it represents, but I realize that is a long way out; a one-line sketch follows the description.)

The other approaches for utilizing cgroups that I've studied, for Mesos and YARN, abstract {{cpu.shares}} away into some multiple of the number of requested "cores". I understand this approach for convenience, but it still does static allocation of cores, and removes much of the flexibility that cgroups provide.
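To make the worker-side steps above concrete, here is a minimal Scala sketch of what the executor launch path could do, assuming a Java 8 JVM on Linux. The object and method names ({{CgroupSketch}}, {{assignToCgroup}}, {{pidOf}}, {{cpuShares}}) are hypothetical, not existing Spark code; the sketch just mirrors the three shell commands and the reflection workaround for the pid:

{code}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical helper, not existing Spark code: put an already-launched executor
// process into a per-application cgroup and assign its cpu.shares.
object CgroupSketch {

  // Pre-Java-9 workaround: read the private "pid" field of the UNIX Process
  // implementation via reflection (works on Linux/OpenJDK 8, not on Windows).
  def pidOf(process: Process): Long = {
    val field = process.getClass.getDeclaredField("pid")
    field.setAccessible(true)
    field.getInt(process).toLong
  }

  def assignToCgroup(process: Process, appId: String, cpuShares: Int): Unit = {
    // mkdir /sys/fs/cgroup/cpu/spark-worker/$appId
    val cgroupDir = Paths.get(s"/sys/fs/cgroup/cpu/spark-worker/$appId")
    Files.createDirectories(cgroupDir)

    // echo $pid > /sys/fs/cgroup/cpu/spark-worker/$appId/tasks
    Files.write(cgroupDir.resolve("tasks"),
      s"${pidOf(process)}\n".getBytes(StandardCharsets.UTF_8))

    // echo $cpushares > /sys/fs/cgroup/cpu/spark-worker/$appId/cpu.shares
    Files.write(cgroupDir.resolve("cpu.shares"),
      s"$cpuShares\n".getBytes(StandardCharsets.UTF_8))
  }
}
{code}

Something like this would run in the worker right after it launches the executor process; whether the parent cgroup ({{/sys/fs/cgroup/cpu/spark-worker}}) is created by the worker at startup or provisioned by the admin is an open design question.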
In my opinion, running in this mode, as opposed to FIFO, gives users a more intuitive experience out of the box when attempting to run multiple simultaneous apps, without even having to think about any resource allocation.
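For completeness, once Java 9 is an option the reflection workaround above goes away entirely. A minimal sketch, assuming the executor is still represented by a {{java.lang.Process}}:

{code}
// Java 9+ only: Process exposes the pid directly (and toHandle() returns a
// ProcessHandle), so no reflection is needed.
def pidOf(process: Process): Long = process.pid()
{code}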
> Support prioritized apps utilizing linux cgroups
> ------------------------------------------------
>
>                 Key: SPARK-18689
>                 URL: https://issues.apache.org/jira/browse/SPARK-18689
>             Project: Spark
>          Issue Type: New Feature
>            Reporter: Travis Hegner