I would imagine this would be an extension of SchemaRDD (for Spark SQL) or
a new RDD altogether.
An RDD's location is determined by where the task that generates it is
scheduled, and the scheduler places tasks based on the location of the input
RDD or source data. So ideally the RDD codebase would need to check how the
input partitions are distributed across nodes and shift the scheduling
preference of tasks for unbalanced partitions to different nodes. I am not
sure whether an RDD can influence task or partition location.
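
To make the idea concrete, here is a rough sketch of the "check partition
locations and pick new preferences" step in plain Scala, with no Spark
dependency. RebalancePlan, Placement and plan are hypothetical names made up
for illustration; nothing like this exists in Spark as far as I know. It only
computes a target placement. As I understand it, an RDD subclass can state a
locality preference by overriding getPreferredLocations, but that is a hint
to the scheduler rather than a guarantee, and the wiring is not shown here.

```scala
// Hypothetical helper, not part of Spark: plans which cached partitions
// should move so that every live host holds at most ceil(n / hosts) of them.
object RebalancePlan {
  /** Partition id -> host currently holding that partition. */
  type Placement = Map[Int, String]

  def plan(current: Placement, liveHosts: Seq[String]): Placement = {
    require(liveHosts.nonEmpty, "need at least one live host")
    // Maximum number of partitions any single host should hold.
    val cap = math.ceil(current.size.toDouble / liveHosts.size).toInt

    // Group partitions by host, in a deterministic order.
    val byHost = current.toSeq.sortBy(_._1).groupBy(_._2).toSeq.sortBy(_._1)

    // Keep up to `cap` partitions on each live host; everything on a dead
    // host, and any overflow on a live host, is displaced.
    val start = (Map.empty[Int, String], Vector.empty[Int])
    val (kept, displaced) = byHost.foldLeft(start) {
      case ((k, d), (host, parts)) =>
        if (!liveHosts.contains(host)) (k, d ++ parts.map(_._1))
        else {
          val (stay, move) = parts.splitAt(cap)
          (k ++ stay, d ++ move.map(_._1))
        }
    }

    // Give each displaced partition to the currently least-loaded live host.
    displaced.sorted.foldLeft(kept) { (acc, pid) =>
      val target = liveHosts.minBy(h => acc.count(_._2 == h))
      acc + (pid -> target)
    }
  }
}

// Sean's first case: worker "b" died, its partition landed on "a",
// and "b" has now rejoined the cluster.
val rebalanced = RebalancePlan.plan(
  Map(0 -> "a", 1 -> "a", 2 -> "c"),
  Seq("a", "b", "c"))
assert(rebalanced(1) == "b") // partition 1 is planned back onto "b"
```

The plan moves as few partitions as possible, since moving a cached partition
means recomputing or re-fetching it.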


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Wed, Jun 25, 2014 at 10:18 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:

> Yep, exactly!  I’m not sure how complicated it would be to pull off.  If
> someone wouldn’t mind pointing me in the right direction, I would be happy
> to look into it and contribute this functionality.  I imagine this would be
> implemented in the scheduler codebase, possibly with some sort of rebalance
> configuration property to enable it?
>
> Does anyone else have any thoughts on this?
>
> Cheers,
>
> Sean
>
>
> On Jun 24, 2014, at 4:41 PM, Mayur Rustagi <mayur.rust...@gmail.com>
> wrote:
>
> > This would be really useful, especially for Shark, where a shift in
> > partitioning affects all subsequent queries unless task scheduling time
> > beats spark.locality.wait. It can cause low overall performance for all
> > subsequent tasks.
> >
> >
> > On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara <sean.mcnam...@webtrends.com>
> > wrote:
> >
> >> We have a use case where we’d like something to execute once on each
> >> node, and I thought it would be good to ask here.
> >>
> >> Currently we achieve this by setting the parallelism to the number of
> >> nodes and using a mod partitioner:
> >>
> >> val balancedRdd = sc.parallelize(
> >>        (0 until Settings.parallelism)
> >>        .map(id => id -> Settings.settings)
> >>      ).partitionBy(new ModPartitioner(Settings.parallelism))
> >>      .cache()
> >>
> >>
> >> This works great except in two instances where it can become unbalanced:
> >>
> >> 1. if a worker is restarted or dies, the partition will move to a
> >> different node (one of the nodes will run two tasks).  When the worker
> >> rejoins, is there a way to have a partition move back over to the newly
> >> restarted worker so that it’s balanced again?
> >>
> >> 2. drivers need to be started in a staggered fashion, otherwise one
> >> driver can launch two tasks on one set of workers, and the other driver
> >> will do the same with the other set.  Are there any scheduler/config
> >> semantics so that each driver will take one (and only one) core from
> >> *each* node?
> >>
> >>
> >> Thanks
> >>
> >> Sean
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
