>The standalone thing was meant as a simple way to deploy Spark, and we gotta 
>be careful with introducing a lot more features to it because then it becomes 
>just a full fledged cluster manager and is duplicating the work of the other 
>more mature ones.

Understood. There seems to be a very fine line between making "standalone" 
better, or more intuitive, versus just not using it. The only thing I'm after 
personally is the ability to run more than one application at once, on the same 
spark standalone cluster, sharing all available cores with a predetermined 
level of priority. To me, that is a more intuitive way for it to run. This 
patch accomplishes that in a little over 100 lines added to the code base. 
Wanting to leave it at that point without any further expansion to avoid 
duplicate work is understandable.

>Have you thought about contributing specific changes to these cluster managers 
>to address the gaps you have seen?

Somewhat. I've tested each of those cluster managers and even contributed to 
Mesos for some docker functionality. The basic foundation in each of the 
designs is through static core allocation, which doesn't work well when you 
want to share workloads on a small cluster, while still giving those workloads 
full access to all resources when they are available. I haven't studied the 
YARN code base closely, so I can't comment on how difficult it would be to 
contribute similar changes there. I don't currently run a YARN cluster; I only 
intend to run spark jobs, and the standalone mode was better for those in my 
tests.

Studying the way spark standalone launches its executors led me to realize that 
there was a shortcut, and a very small amount of code, to accomplish my goal. 
Granted, my goal seems to be a completely different concept in the way cpu time 
is allocated, compared to how any of the cluster managers are doing it. But it 
is the way the kernel manages cpu time within a host. Everything that runs in 
linux is already running in a cgroup, and is allocated cpu time based on its 
shares. Imagine if your workstation could only run one process at a time, with 
all resources reserved for just that process whether it needed them or not.

I'm not trying to turn spark standalone into a cluster manager. I know I keep 
mentioning ways to expand cgroup functionality, but that is because I see a lot 
of value in allowing the kernel to manage the resources there. That is not my 
short term goal, however. My short term goal is allowing multiple simultaneous 
spark applications to run on the same cluster. Does anyone see value in just 
that short term goal?

Thanks,

Travis

________________________________
From: Reynold Xin <r...@databricks.com>
Sent: Thursday, December 15, 2016 14:07
To: Hegner, Travis
Cc: Jörn Franke; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling 
utilizing linux cgroups.

In general this falls directly into the domain of external cluster managers 
(YARN, Mesos, Kub). The standalone thing was meant as a simple way to deploy 
Spark, and we gotta be careful with introducing a lot more features to it 
because then it becomes just a full fledged cluster manager and is duplicating 
the work of the other more mature ones.

Have you thought about contributing specific changes to these cluster managers 
to address the gaps you have seen?



On Thu, Dec 15, 2016 at 10:38 AM, Hegner, Travis 
<theg...@trilliumit.com> wrote:

Thanks for the response Jörn,

This patch is intended only for spark standalone.

My understanding of the YARN cgroup support is that it only limits cpu, rather 
than allocates it based on the priority or shares system. This could be old 
documentation that I'm remembering, however. Another issue with YARN is that it 
has a lot more overhead than standalone mode, and always seemed a bit less 
responsive in general. Lastly, I remember struggling greatly with yet another 
resource abstraction layer (as if spark doesn't have enough already), it still 
statically allocated cores (albeit virtual ones), and it was much more 
cumbersome to find a proper balance of resources to request for an app.

My experience in trying to accomplish something like this in Mesos was always 
met with frustration because the system still statically allocated cores away 
to be reserved by individual apps. Trying to adjust the priority of individual 
applications was only possible by increasing the core count, further starving 
other apps of available cores. It was impossible to give a priority lower than 
the default to an app. The cpu.shares parameter was abstracted away as a 
multiple of the number of requested cores, which had a double-down effect on 
the app: not only was it given more cores, it was also given a higher priority 
to run on them. Perhaps this has changed in more recent versions, but this was 
my experience when testing it.

I'm not familiar with a spark scheduler for kubernetes, unless you mean to 
launch a standalone cluster in containers with kubernetes? In that case, this 
patch would simply divvy up the resources allocated to the spark-worker 
container among each of its executors, based on the shares that each executor 
is given. This is similar to how my current environment works, I'm just not 
using kubernetes as a container launcher. I found kubernetes was quite limiting 
in the way we wanted our network to be structured, and it also seemed quite 
difficult to get new functionality exposed in the form of their yaml API system.

My goal with this patch is to essentially eliminate the static allocation of 
cpu cores at all. Give each app time on the cpu equal to the number of shares 
it has as a percentage of the total pool.

Thanks,

Travis

________________________________
From: Jörn Franke <jornfra...@gmail.com>
Sent: Thursday, December 15, 2016 12:48
To: Hegner, Travis
Cc: Apache Spark Dev

Subject: Re: SPARK-18689: A proposal for priority based app scheduling 
utilizing linux cgroups.

Hi,

What about yarn or mesos used in combination with Spark? They also have cgroups. 
Or a kubernetes etc. deployment.

On 15 Dec 2016, at 17:37, Hegner, Travis 
<theg...@trilliumit.com> wrote:


Hello Spark Devs,


I have finally completed a mostly working proof of concept. I do not want to 
create a pull request for this code, as I don't believe it's production worthy 
at the moment. My intent is to better communicate what I'd like to accomplish. 
Please review the following patch: 
https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.


What the code does:


Currently, it exposes two options: "spark.cgroups.enabled", which defaults to 
false, and "spark.executor.shares", which defaults to None. When cgroups mode is 
enabled, a single executor is created on each worker, with access to all cores. 
The worker will create a parent cpu cgroup (on first executor launch) called 
"spark-worker" to house any executors that it launches. Each executor is put 
into its own cgroup named with the app id, under the parent cgroup. The 
cpu.shares parameter is set to the value in "spark.executor.shares"; if this is 
"None", it inherits the value from the parent cgroup.
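
As a rough illustration of the layout described above (not code from the 
patch), the steps can be sketched in Python. The function name 
`place_executor` and the example app id are hypothetical. On a real host the 
root would be the cpu cgroup mount point (commonly /sys/fs/cgroup/cpu on 
cgroups v1, which is an assumption here); the demo uses a temporary directory 
as a stand-in so that it runs without root privileges.

```python
import os
import tempfile


def place_executor(cgroup_root, app_id, pid, shares=None, parent="spark-worker"):
    """Create <root>/<parent>/<app_id> and attach one executor process to it.

    Hypothetical helper for illustration only; it mirrors the description in
    the email, not the actual patch.
    """
    app_dir = os.path.join(cgroup_root, parent, app_id)
    # Creates the parent cgroup on first use, then the per-application child.
    os.makedirs(app_dir, exist_ok=True)
    if shares is not None:
        # Only written when a value was configured; otherwise left untouched.
        with open(os.path.join(app_dir, "cpu.shares"), "w") as f:
            f.write(str(shares))
    # Writing a pid to cgroup.procs is how a process is moved into a cgroup.
    with open(os.path.join(app_dir, "cgroup.procs"), "w") as f:
        f.write(str(pid))
    return app_dir


# Stand-in root directory so the sketch can be run anywhere.
demo_root = tempfile.mkdtemp()
print(place_executor(demo_root, "app-0001", os.getpid(), shares=2048))
```
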


Tested on Ubuntu 16.04 (docker containers), kernel 4.4.0-53-generic. I have not 
run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change 
this code base, but it looks like the kernel interface is the same for the most 
part.


I was able to launch a spark shell which consumed all cores in the cluster, but 
sat idle. I was then able to launch an application (client deploy-mode) which 
was also allocated all cores in the cluster, and ran to completion unhindered. 
Each of the executors on each worker was properly placed into its respective 
cgroup, which in turn had the correct cpu.shares value allocated.


What the code still needs:


* Documentation (assuming the community moves forward with some kind of 
implementation)

* Sometimes the cgroups get destroyed after app completion, sometimes they 
don't. (need to put `.destroy()` call in a `finally` block, or in the 
`maybeCleanupApplication()` method; what do you think?)

* Proper handling of the driver's resources when running `--deploy-mode cluster`

* Better web UI indication of cgroup mode or core sharing (currently just shows 
up as an over allocation of cores per worker)

* Better environment/os/platform detection and testing (I won't be surprised if 
there is something broken if trying to run this on a different OS)

* Security/permissions for cgroups if running worker as non-root (perhaps 
creating the parent cgroup with correct permissions before launching the worker 
is all that is necessary)

  - running the worker in a container currently requires --privileged mode (I 
haven't figured out if/what capability makes cgroups writable, or if it's 
possible to use a new cgroup mount point)

* More user defined options

  - cgroup root path (currently hard coded)

  - driver cpu.shares (for cluster deploy-mode: would require a specially named 
cgroup... s"$appId-driver" ? default same #shares as executor? default double 
shares?)

  - parent cpu.shares (currently os default)

  - parent cgroup name (currently hard coded)
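
For the cleanup item in the list above, the `finally` idea could look roughly 
like the following. This is a Python sketch under stated assumptions, not the 
patch's code: `app_cgroup` is a hypothetical name, and a temporary directory 
stands in for the real cgroup parent.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def app_cgroup(parent_dir, app_id):
    """Create a per-application cgroup directory and always remove it on exit.

    Hypothetical helper for illustration only.
    """
    path = os.path.join(parent_dir, app_id)
    os.makedirs(path, exist_ok=True)
    try:
        yield path
    finally:
        # Runs whether the body finished normally or raised. On a real cgroup
        # filesystem, rmdir only succeeds once no tasks remain in the group.
        os.rmdir(path)


# Stand-in parent directory so the sketch can be run without privileges.
demo_parent = tempfile.mkdtemp()
try:
    with app_cgroup(demo_parent, "app-0001") as cg:
        raise RuntimeError("simulated executor failure")
except RuntimeError:
    pass
print(os.path.exists(os.path.join(demo_parent, "app-0001")))
```

The same effect could be had by calling the cleanup from one well-defined 
application teardown path; the point of the sketch is only that removal no 
longer depends on the executor exiting cleanly.
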


I tried to structure the initial concept to make it easy to add support for 
more cgroup features (cpuset, mem, etc...), should the community feel there is 
value in adding them. Linux cgroups are an extremely powerful resource 
allocation and isolation tool, and this patch is only scratching the surface of 
their general capabilities. Of course, as Mr. Loughran points out, expanding 
into these features will require more code maintenance, but not enough that we 
should shy away from it.


<opinion>

I personally believe that any multi-node resource allocation system should 
offload as much of the scheduling and resource allocation as possible to the 
underlying kernel within the node level. Each node's own kernel is the best 
equipped place to manage those resources. Only the node's kernel can allocate a 
few seconds worth of cpu to the low priority app, while the high priority app 
is waiting on disk I/O, and instantly give it back to the high priority app 
when it needs it, with (near) real-time granularity.


The multi-node system should set up a proper framework to give each node's 
kernel the information it needs to allocate the resources correctly. Naturally, 
the system should allow resource reservations, and even limits, for the 
purposes of meeting and testing for SLAs and worst case scenarios as well. 
Linux cgroups are capable of doing those things in a near real-time fashion.


With a proper convention of priorities/shares for applications within an 
organization, I believe that everyone can get better throughput out of their 
hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to 
solve currently.

</opinion>

Sorry that the patch is pretty rough still, as I'm still getting my head 
wrapped around spark's code base structure. Looking forward to any feedback.

Thanks,

Travis

________________________________
From: Hegner, Travis <theg...@trilliumit.com>
Sent: Tuesday, December 6, 2016 10:49
To: Steve Loughran
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling 
utilizing linux cgroups.



Steve,

I appreciate your experience and insight when dealing with large clusters at 
the data-center scale. I'm also well aware of the complex nature of schedulers, 
and that it is an area of ongoing research being done by people/companies with 
many more resources than I have. This might explain my apprehension in even 
calling this idea a *scheduler*: I wanted to avoid this exact kind of debate 
over what I want to accomplish. This is also why I mentioned that this idea 
will mostly benefit users with small clusters.

I've used many of the big-name "cluster schedulers" (YARN, Mesos, and 
Kubernetes) and the main thing that they have in common is that they don't work 
well for my use case. Those systems are designed for large scale 1000+ node 
clusters, and become painful to manage in the small cluster range. Most of the 
tools that we've attempted to use don't work well for us, so we've written 
several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or 
Amazon: we don't have a *data-center* of servers to manage, we barely have half 
of a rack. *We are not trying to solve the problem that you are referring to*. 
We are operating at a level that if we aren't meeting SLAs, then we could just 
buy another server to add to the cluster. I imagine that we are not alone in 
that fact either; I've seen that many of the questions on SO and on the user 
list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these 
multi-node systems typically handle it gracefully. However, if idle CPU is 
expensive, then how much more does wasted CPU cost when a nearly complete task 
is pre-empted and has to be started over? Fortunately for me, that isn't a 
problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark 
>instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have 
considered the avenue of multiple spark clusters, and in reality the 
infrastructure we have set up would allow me to do this relatively easily. In 
fact, in my environment, this is a similar solution to what I'm proposing, just 
managed one layer up the stack and with less flexibility. I am trying to avoid 
this solution however because it does require more overhead and maintenance. 
What if I want two spark apps to run on the same cluster at the same time, 
sharing the available CPU capacity equally? I can't accomplish that easily with 
multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I 
don't have teams of ops people to task with managing as many spark clusters as 
I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. 
In fact, my initial proposal is not yet changing the allocation of memory as a 
resource. This would still be statically allocated in a FIFO manner as long as 
memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find 
a simple solution to a problem that I face, and I'm willing to share that 
solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I 
am not trying to write a scheduler*: I am trying to slightly (and optionally) 
tweak the way executors are allocated and launched, so that I can more 
intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis

________________________________
From: Steve Loughran <ste...@hortonworks.com>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling 
utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to 
submit work with different credentials and priority; cgroups & equivalent to 
limit granted resources to requested ones. If you have pre-emption enabled, you 
can even have one job kill work off the others. Spark does recognise 
pre-emption failures and doesn't treat it as a sign of problems in the 
executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale 
computing —if you are curious about what is state of the art, look at the 
Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google 
work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just 
meeting SLAs, it's about how much idle CPU costs, and how expensive even a 1-2% 
drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want to dive deep 
into the whole world of cluster scheduling, the debate over centralized vs 
decentralized, the ideological one of "should services ever get allocated 
RAM/CPU in times of low overall load?", the challenge of swap, or more 
specifically, "how do you throttle memory consumption", as well as what to do 
when the IO load of a service is actually incurred on a completely different 
host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the 
Hadoop tree and look at 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux
 for the cgroup support, and then 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
 for the native code needed alongside this. Then consider that it's not just a 
matter of writing something similar, it's getting an OSS project to actually 
commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark 
instances for the different workloads, with different CPU & memory limits, 
queue priorities, etc. Other people have done the work, written the tests, 
deployed it in production, met their own SLAs *and are therefore committed to 
maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis 
<theg...@trilliumit.com> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the 
scheduling goal I have in mind, I hastily jumped straight into a technical 
solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the 
standalone scheduler. Perhaps you could/should classify it as a different 
scheduler, but I don't want to give the impression that this will be as 
difficult to implement as most schedulers are. Initially, from a memory 
perspective, we would still allocate in a FIFO manner. This new scheduling mode 
(or new scheduler, if you'd rather) would mostly benefit any users with 
small-ish clusters, both on-premise and cloud based. Essentially, my end goal 
is to be able to run multiple *applications* simultaneously with each 
application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a 
second application that I'd like to run hourly. The hourly application is more 
time critical (higher priority), so I'd like it to finish in a small amount of 
time. If I allow the first app to run with all cores (this takes several days 
on my 64 core cluster), then nothing else can be executed when running with the 
default FIFO scheduler. All of the cores have been allocated to the first 
application, and it will not release them until it is finished. Dynamic 
allocation does not help in this case, as there is always a backlog of tasks to 
run until the first application is nearing the end anyway. Naturally, I could 
just limit the number of cores that the first application has access to, but 
then I have idle cpu time when the second app is not running, and that is not 
optimal. Secondly in that case, the second application only has access to the 
*leftover* cores that the first app has not allocated, and will take a 
considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running 
without specifying the number of cores they want to utilize (whether 
intentionally or not). As I'm sure you know, the default is to allocate the 
entire cluster to this application. The cores allocated to this shell are 
unavailable to other applications, even if they are just sitting idle while a 
developer is getting their environment set up to run a very big job 
interactively. Other developers that would like to launch interactive shells 
are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many 
simultaneous applications to be running as the cluster memory (still statically 
partitioned, at least initially) will allow. Applications could be configured 
with a "cpu shares" parameter (just an arbitrary integer relative only to other 
applications) which is essentially just passed through to the linux cgroup 
cpu.shares setting. Since each executor of an application on a given worker 
runs in its own process/jvm, that process could easily be placed into 
a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that 
processes competing for cpu time are allocated a percentage of time equal to 
their share count as a percentage of all shares in that level of the cgroup 
hierarchy. If two applications are both scheduled on the same core with the 
same weight, each will get to utilize 50% of the time on that core. This is all 
built into the kernel, and the only thing the spark worker has to do is create 
a cgroup for each application, set the cpu.shares parameter, and assign the 
executors for that application to the new cgroup. If multiple executors are 
running on a single worker, for a single application, the cpu time available to 
that application is divided among each of those executors equally. The default 
for cpu.shares is that they are not limiting in any way. A process can consume 
all available cpu time if it would otherwise be idle anyway.
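
The share arithmetic described above can be sketched as follows. This is a 
simplified model for illustration, not kernel code: `cpu_fractions` is a 
hypothetical name, and it considers only sibling cgroups at one level of the 
hierarchy that are all competing for the same cpu.

```python
def cpu_fractions(shares, busy=None):
    """Fraction of cpu time each competing sibling cgroup receives.

    shares: dict of cgroup name -> cpu.shares value.
    busy:   names that currently have runnable work; idle groups are left
            out, so their time flows to the groups that can use it.
    """
    if busy is None:
        busy = set(shares)
    competing = {name: s for name, s in shares.items() if name in busy}
    total = sum(competing.values())
    return {name: s / total for name, s in competing.items()}


equal = {"app-a": 1024, "app-b": 1024}
# Both applications busy with the same weight: an even split.
print(cpu_fractions(equal))
# Only app-a has work: shares do not cap it, so it can use all of the time.
print(cpu_fractions(equal, busy={"app-a"}))
```
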


That's the issue that surfaces in google papers: should jobs get idle capacity. 
Current consensus is "no". Why not? Because you may end up writing an 
SLA-sensitive app which just happens to meet its SLAs in times of light 
cluster load, but precisely when the cluster is busy, it suddenly slows down, 
leading to stress all round, in the "why is this service suddenly unusable" 
kind of stress. Instead you keep the cluster busy with low priority preemptible 
work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to 
some abstraction) is that cpu share allocations are heterogeneous to all 
processes running on a machine. An admin could have very fine grained control 
over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive 
application could utilize 100% of all cluster cores if they are idle. Then my 
time sensitive app could be launched with nine times the priority and the linux 
kernel would scale back the first application to 10% of all cores (completely 
seamlessly and automatically: no pre-emption, just fewer time slices of cpu 
allocated by the kernel to the first application), while the second application 
gets 90% of all the cores until it completes.
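
To put numbers on that example, here is a small Python sketch for the 64 core 
cluster mentioned earlier in the thread. The share values 1024 and 9216 are 
assumptions chosen to give a 1:9 ratio, and `core_equivalents` and the app 
names are hypothetical. It expresses each app's portion of cpu time as a 
number of core-equivalents while all listed apps are busy.

```python
def core_equivalents(total_cores, shares):
    """Split total_cores among busy apps in proportion to their share counts."""
    total = sum(shares.values())
    return {app: total_cores * s / total for app, s in shares.items()}


# Weekly job running alone: it can use the whole cluster.
print(core_equivalents(64, {"weekly": 1024}))
# Hourly job arrives with nine times the shares: a 10% / 90% split.
print(core_equivalents(64, {"weekly": 1024, "hourly": 9216}))
```

With these assumed values, the weekly job drops from 64 core-equivalents to 
about 6.4 while the hourly job runs with about 57.6, and it returns to the 
full 64 once the hourly job completes.
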


FWIW, it's often memory consumption that's most problematic here. If one 
process starts to swap, it hurts everything else. But Java isn't that good at 
handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode 
would create an increase in context switching on each host. This issue is 
somewhat mitigated by still statically allocating memory however, since there 
wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. 
Linux cgroups allow you to control access to more than just cpu shares. You can 
apply the same concept to other resources (memory, disk io). You can also set 
up hard limits so that an application will never get more than is allocated to 
it. I know that those limitations are important for some use cases involving 
predictability of application execution times. Eventually, this idea could be 
expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a 
bit better now. Can anyone else see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to 
show just how complex it gets in a large system. Before you begin to write a 
line of code, I'd recommend

-you read as much of the published work as you can, including the google and 
microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, 
that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre is a complex problem that is still 
considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve


