[ https://issues.apache.org/jira/browse/SPARK-18689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Travis Hegner updated SPARK-18689:
----------------------------------
    Description: 
I'd like to propose a new mode of operation for the app scheduling system in a 
standalone cluster. This idea would eliminate static allocation of CPU cores 
between apps, and would instead divvy up CPU resources based on the Linux 
cgroups {{cpu.shares}} attribute. This could later be expanded to expose more 
cgroup options (memory, cpuset, etc.) to each application running on a 
standalone cluster.

Preliminary examination shows that this could be relatively simple to 
implement, and would open up a world of flexibility for how CPU time is 
allocated and utilized throughout a cluster.

What I'm thinking is that when a cluster is operating in "priority" mode, a 
newly submitted application will be given a unique executor on each worker, 
optionally limited by {{spark.executor.instances}}. The executor would *not* be 
*statically* allocated any number of cores, but would be able to run as many 
concurrent tasks as there are cores on that host (obeying {{spark.task.cpus}}, 
of course). We could also use {{spark.executor.cores}} to limit the number of 
tasks an executor can run simultaneously, if desired.
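
For illustration, here is a minimal, hypothetical sketch of how an application 
might be configured under such a mode. {{spark.priority.cpushares}} is the 
proposed setting described further down and does not exist today; the other 
keys are existing Spark configs:

{code}
import org.apache.spark.SparkConf

// Hypothetical app-side configuration under the proposed "priority" mode.
val conf = new SparkConf()
  .setAppName("priority-mode-example")
  .set("spark.executor.instances", "4")     // optional cap on executors for this app
  .set("spark.task.cpus", "1")              // still honored per task
  .set("spark.executor.cores", "8")         // optional cap on concurrent tasks per executor
  .set("spark.priority.cpushares", "1024")  // proposed: relative CPU weight for this app
{code}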

When each worker gets a request to create its executor, it first creates a 
cgroup named after the unique app ID (the shell examples below show how simple 
working with cgroups can be):

{code}
mkdir /sys/fs/cgroup/cpu/spark-worker/$appId
{code}

Then moves the executor into that cgroup:

{code}
echo $pid > /sys/fs/cgroup/cpu/spark-worker/$appId/tasks
{code}

And, finally, sets the {{cpu.shares}} value

{code}
echo $cpushares > /sys/fs/cgroup/cpu/spark-worker/$appId/cpu.shares
{code}

to the amount specified by an application config such as 
{{spark.priority.cpushares}}. With no other apps running, this app could 
consume up to 100% of the CPU time allocated to the spark-worker.
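
To make the worker-side flow concrete, here is a minimal sketch of what the 
worker could do when launching an executor. This is illustrative only; the 
object name, mount point, error handling, and permissions are all assumptions:

{code}
import java.nio.file.{Files, Paths, StandardOpenOption}

// Sketch of the proposed worker-side steps (cgroups v1, cpu controller mounted
// at /sys/fs/cgroup/cpu). Error handling and permission checks are omitted.
object CgroupSketch {
  private val root = Paths.get("/sys/fs/cgroup/cpu/spark-worker")

  def setupCgroup(appId: String, executorPid: Long, cpuShares: Int): Unit = {
    val dir = root.resolve(appId)
    Files.createDirectories(dir)                          // mkdir .../spark-worker/$appId

    Files.write(dir.resolve("tasks"),                     // echo $pid > .../tasks
      s"$executorPid\n".getBytes("UTF-8"), StandardOpenOption.WRITE)

    Files.write(dir.resolve("cpu.shares"),                // echo $cpushares > .../cpu.shares
      s"$cpuShares\n".getBytes("UTF-8"), StandardOpenOption.WRITE)
  }
}
{code}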

A second app could come along and also be allocated an executor on every 
worker (assuming enough memory), but with double the {{cpu.shares}} priority. 
Both apps would then run side by side, with the higher priority app receiving 
roughly 66% of the total CPU time allocated to the spark-worker (a 2:1 share 
ratio yields a 2/3 vs. 1/3 split under contention), and the first app getting 
the rest.

This approach would allow long-running, CPU-intensive tasks to consume all 
available CPU until a higher priority app comes along, at which point the 
kernel scheduler gracefully gives that app whatever it needs, without any 
scheduling or preemption logic having to be written.

Naturally, there would have to be some configuration options to set sane 
defaults and limits for the number of shares that users can request for an app.
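
As a sketch only (the names below are hypothetical, not existing Spark 
settings), the worker could clamp whatever an application requests against a 
cluster-wide default and maximum:

{code}
// Hypothetical clamping of requested shares on the worker side; all names are illustrative.
def resolveShares(requested: Option[Int],
                  defaultShares: Int = 1024,
                  maxShares: Int = 8192): Int =
  math.min(requested.getOrElse(defaultShares), maxShares)

// resolveShares(Some(100000)) == 8192; resolveShares(None) == 1024
{code}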

One minor drawback is that the {{java.lang.Process}} object currently used to 
launch the executor does not have a public method to access its {{pid}}, which 
is required to put that executor into a cgroup. Finding the pid of the executor 
currently requires the reflection API, but only takes a handful of lines, from 
what I've found. (Java 9 adds a {{java.lang.ProcessHandle}} interface, which 
does provide a way to get the {{pid}} of the process it represents, but I 
realize that is a long way out.)
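
For reference, a minimal sketch of that reflection approach, assuming a 
Linux/OpenJDK runtime where the concrete class behind {{java.lang.Process}} is 
{{java.lang.UNIXProcess}} with a private {{pid}} field:

{code}
// Sketch only: relies on pre-Java-9 OpenJDK internals (java.lang.UNIXProcess).
def pidOf(process: Process): Int = {
  val pidField = process.getClass.getDeclaredField("pid")
  pidField.setAccessible(true)
  pidField.getInt(process)
}
{code}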

The other approaches for utilizing cgroups that I've studied, for Mesos and 
YARN, abstract {{cpu.shares}} away into some multiple of the number of 
requested "cores". I understand the convenience of that approach, but it still 
does static allocation of cores, and removes much of the flexibility that 
cgroups provide.

In my opinion, running in this mode, as opposed to FIFO, gives users a more 
intuitive experience out of the box when attempting to run multiple 
simultaneous apps, without even having to think about any resource allocation.

> Support prioritized apps utilizing linux cgroups
> ------------------------------------------------
>
>                 Key: SPARK-18689
>                 URL: https://issues.apache.org/jira/browse/SPARK-18689
>             Project: Spark
>          Issue Type: New Feature
>            Reporter: Travis Hegner
>