Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

2016-12-05 Thread Michal Šenkýř

Hello Travis,


I am only a recent member of this list, but I can definitely see the
benefit of using built-in OS resource management facilities to
dynamically manage cluster resources at the node level in this manner.
At our company we often fight for resources on our development cluster,
and sometimes have to cancel running jobs in production to free up
immediately needed resources. If I understand it correctly, this would
solve a lot of our problems.



The only downside I see with this is that it is Linux-specific.


Michal


On 5.12.2016 16:36, Hegner, Travis wrote:


My apologies, in my excitement of finding a rather simple way to 
accomplish the scheduling goal I have in mind, I hastily jumped 
straight into a technical solution, without explaining that goal, or 
the problem it's attempting to solve.



You are correct that I'm looking for an additional running mode for
the standalone scheduler. Perhaps you could/should classify it as a
different scheduler, but I don't want to give the impression that this
will be as difficult to implement as most schedulers are. Initially,
from a memory perspective, we would still allocate in a FIFO
manner. This new scheduling mode (or new scheduler, if you'd
rather) would mostly benefit users with small-ish clusters, both
on-premises and cloud-based. Essentially, my end goal is to be able to
run multiple *applications* simultaneously, with each application
having *access* to the entire core count of the cluster.



I have a very CPU-intensive application that I'd like to run weekly. I
have a second application that I'd like to run hourly. The hourly
application is more time-critical (higher priority), so I'd like it to
finish quickly. If I allow the first app to run with all cores (this
takes several days on my 64-core cluster), then nothing else can be
executed under the default FIFO scheduler. All of the cores have been
allocated to the first application, and it will not release them until
it is finished. Dynamic allocation does not help in this case, as there
is always a backlog of tasks to run until the first application is
nearing the end anyway. Naturally, I could just limit the number of
cores that the first application has access to, but then I have idle
CPU time when the second app is not running, and that is not optimal.
Secondly, in that case, the second application only has access to the
*leftover* cores that the first app has not allocated, and will take
considerably longer to run.



You could also imagine a scenario where a developer has a spark-shell
running without specifying the number of cores they want to utilize
(whether intentionally or not). As I'm sure you know, the default is
to allocate the entire cluster to this application. The cores
allocated to this shell are unavailable to other applications, even if
they are just sitting idle while the developer gets their environment
set up to run a very big job interactively. Other developers who would
like to launch interactive shells are stuck waiting for the first one
to exit theirs.



My proposal would eliminate this static nature of core counts and
allow as many simultaneous applications to run as the cluster
memory (still statically partitioned, at least initially) will allow.
Applications could be configured with a "cpu shares" parameter (just
an arbitrary integer, relative only to other applications) which is
essentially passed straight through to the Linux cgroup cpu.shares
setting. Since each executor of an application on a given worker runs
in its own process/JVM, that process could easily be placed into a
cgroup created and dedicated to that application.
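As a rough sketch of how an application might request a weight (the
property name spark.executor.cgroup.cpuShares is purely illustrative,
not an existing Spark setting):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration: the "cpu shares" weight is an arbitrary
// integer that only has meaning relative to other applications.
val conf = new SparkConf()
  .setAppName("hourly-report")
  .set("spark.executor.memory", "4g")             // memory is still statically partitioned
  .set("spark.executor.cgroup.cpuShares", "2048") // illustrative name, not an existing setting

val sc = new SparkContext(conf)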



Linux cgroups cpu.shares are pretty well documented, but the gist is
that processes competing for CPU time are allocated a percentage of
time equal to their share count as a percentage of all shares at that
level of the cgroup hierarchy. If two applications are both scheduled
on the same core with the same weight, each will get to utilize 50% of
the time on that core. This is all built into the kernel; the only
thing the Spark worker has to do is create a cgroup for each
application, set the cpu.shares parameter, and assign the executors
for that application to the new cgroup. If multiple executors are
running on a single worker for a single application, the CPU time
available to that application is divided equally among those
executors. By default, cpu.shares is not limiting in any way: a
process can consume all available CPU time if that time would
otherwise be idle.
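For illustration only, here is a minimal sketch of the bookkeeping the
worker would have to do against the cgroup v1 cpu controller; the
mount point, hierarchy layout, and method name are assumptions, and a
real implementation would need privilege handling and cleanup:

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths, StandardOpenOption}

object CgroupSketch {
  // Sketch: place an executor process into a per-application cgroup (cgroup v1).
  def assignExecutorToCgroup(appId: String, executorPid: Long, cpuShares: Int): Unit = {
    // Assumed mount point for the cpu controller; this varies by distribution.
    val appCgroup = Paths.get("/sys/fs/cgroup/cpu/spark", appId)
    Files.createDirectories(appCgroup)

    // Relative weight: the kernel compares it against the cpu.shares of sibling
    // cgroups, so 2048 vs. 1024 is a 2:1 split only while both are contending.
    Files.write(appCgroup.resolve("cpu.shares"), cpuShares.toString.getBytes(UTF_8))

    // Adding the executor's PID to cgroup.procs moves the whole process (and its
    // threads), so every task run by that executor inherits the weight.
    Files.write(appCgroup.resolve("cgroup.procs"),
      executorPid.toString.getBytes(UTF_8), StandardOpenOption.APPEND)
  }
}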



Another benefit of passing cpu.shares directly to the kernel (as
opposed to some abstraction) is that CPU share allocations are
relative to all processes running on a machine. An admin could
have very fine-grained control over which processes get priority
access to CPU time, depending on their

Re: Can I add a new method to RDD class?

2016-12-05 Thread Michal Šenkýř

A simple Scala example of implicit classes:

implicit class EnhancedString(str: String) {
  def prefix(prefix: String) = prefix + str
}

println("World".prefix("Hello "))

As Tarun said, you have to import it if it's not in the same class where 
you use it.
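
For an RDD specifically, a minimal sketch of the same pattern (the
object and method names below are just examples, not anything in Spark):

import org.apache.spark.rdd.RDD

object RDDExtensions {
  // Enriches any RDD[T] with a foo method without touching Spark's source.
  implicit class RichRDD[T](val rdd: RDD[T]) extends AnyVal {
    def foo(): Long = rdd.count() // stand-in implementation
  }
}

// In client code:
//   import RDDExtensions._
//   sc.parallelize(1 to 10).foo()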


Hope this makes it clearer,

Michal Senkyr


On 5.12.2016 07:43, Tarun Kumar wrote:
Not sure if that's documented in terms of Spark, but this is a fairly
common pattern in Scala known as the "pimp my library" pattern; you can
easily find many generic examples of using it. If you want, I can
quickly cook up a short complete example with an RDD (although there
is nothing really more to it than my example in the earlier mail)?
Thanks, Tarun Kumar


On Mon, 5 Dec 2016 at 7:15 AM, long wrote:


So is there documentation of this I can refer to?


On Dec 5, 2016, at 1:07 AM, Tarun Kumar [via Apache Spark
Developers List] <[hidden email]> wrote:

Hi Tenglong, In addition to trsell's reply, you can add any
method to an RDD without making changes to Spark code. This can
be achieved by using an implicit class in your own client code:
implicit class extendRDD[T](rdd: RDD[T]) { def foo() } Then you
basically need to import this implicit class into the scope where
you want to use the new foo method. Thanks, Tarun Kumar

On Mon, 5 Dec 2016 at 6:59 AM, <[hidden email]> wrote:

How does your application fetch the Spark dependency? Perhaps
list your project dependencies and check that it's using your dev
build.
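
For example, if the dev build was published locally (with something
like sbt publishLocal or build/mvn -DskipTests install), the
application's build.sbt would need to point at that snapshot rather
than a released artifact; the version string below is just a placeholder:

// build.sbt -- make sure the version matches your local dev build,
// not a released Spark artifact.
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0-SNAPSHOT"

// Resolve from the local repository where the dev build was installed.
resolvers += Resolver.mavenLocal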


On Mon, 5 Dec 2016, 08:47 tenglong, <[hidden email]> wrote:

Hi,

Apparently, I've already tried adding a new method to RDD,

for example,

class RDD {
  def foo() // this is the one I added

  def map()

  def collect()
}

I can build Spark successfully, but I can't compile my
application code
which calls rdd.foo(), and the error message says

value foo is not a member of org.apache.spark.rdd.RDD[String]

So I am wondering whether there is some mechanism preventing me
from doing this, or whether there is something I'm doing wrong?



