I would imagine this would be an extension of SchemaRDD (for Spark SQL) or a new RDD altogether. An RDD's partition locations are determined by where the tasks that generate it are scheduled, and the scheduler places those tasks based on the location of the input RDD / source data. So ideally the RDD code would need to check where the input partitions live across nodes and set the scheduling preference of tasks belonging to unbalanced partitions toward different nodes. I am not sure whether an RDD can influence task placement / partition location, though. A rough sketch of the relevant hook is below.
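For what it's worth, a custom RDD can at least *hint* placement to the scheduler through getPreferredLocations, which the DAGScheduler consults when assigning tasks. A minimal, untested sketch (the hostnames and class names here are made up for illustration; the preference is not a guarantee, since tasks can still run elsewhere once spark.locality.wait expires):

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Illustrative only: one partition pinned to each given host.
    class PinnedPartition(val index: Int, val host: String) extends Partition

    class PinnedRDD(sc: SparkContext, hosts: Seq[String])
      extends RDD[Int](sc, Nil) {

      // One partition per host we want to target.
      override def getPartitions: Array[Partition] =
        hosts.zipWithIndex.map { case (h, i) => new PinnedPartition(i, h) }.toArray

      // Placement hint consulted by the scheduler; a preference, not a guarantee.
      override def getPreferredLocations(split: Partition): Seq[String] =
        Seq(split.asInstanceOf[PinnedPartition].host)

      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator(split.index)
    }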
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Wed, Jun 25, 2014 at 10:18 PM, Sean McNamara <sean.mcnam...@webtrends.com>
wrote:

> Yep exactly! I’m not sure how complicated it would be to pull off. If
> someone wouldn’t mind helping to get me pointed in the right direction, I
> would be happy to look into and contribute this functionality. I imagine
> this would be implemented in the scheduler codebase, and possibly there
> would be some sort of rebalance configuration property to enable it?
>
> Does anyone else have any thoughts on this?
>
> Cheers,
>
> Sean
>
>
> On Jun 24, 2014, at 4:41 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
> > This would be really useful, especially for Shark, where a shift of
> > partitioning affects all subsequent queries unless the task scheduling
> > time beats spark.locality.wait. It can cause overall low performance for
> > all subsequent tasks.
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi <https://twitter.com/mayur_rustagi>
> >
> >
> > On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara <sean.mcnam...@webtrends.com>
> > wrote:
> >
> >> We have a use case where we’d like something to execute once on each
> >> node, and I thought it would be good to ask here.
> >>
> >> Currently we achieve this by setting the parallelism to the number of
> >> nodes and using a mod partitioner:
> >>
> >> val balancedRdd = sc.parallelize(
> >>   (0 until Settings.parallelism)
> >>     .map(id => id -> Settings.settings)
> >> ).partitionBy(new ModPartitioner(Settings.parallelism))
> >>  .cache()
> >>
> >> This works great except in two instances where it can become unbalanced:
> >>
> >> 1. If a worker is restarted or dies, the partition will move to a
> >> different node (one of the nodes will run two tasks). When the worker
> >> rejoins, is there a way to have a partition move back over to the newly
> >> restarted worker so that it’s balanced again?
> >>
> >> 2. Drivers need to be started in a staggered fashion, otherwise one
> >> driver can launch two tasks on one set of workers, and the other driver
> >> will do the same with the other set. Are there any scheduler/config
> >> semantics so that each driver will take one (and only one) core from
> >> *each* node?
> >>
> >> Thanks
> >>
> >> Sean
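For readers following the quoted snippet: the ModPartitioner class is not included anywhere in the thread, so the following is only a hypothetical sketch of what such a partitioner might look like, mapping each integer key to key mod numPartitions so that ids 0..n-1 land in distinct partitions:

    import org.apache.spark.Partitioner

    // Hypothetical reconstruction of the ModPartitioner referenced above;
    // the actual class used by the original poster may differ.
    class ModPartitioner(partitions: Int) extends Partitioner {
      require(partitions > 0, "Number of partitions must be positive")

      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = key match {
        // Normalize to a non-negative partition index even for negative keys.
        case id: Int => ((id % partitions) + partitions) % partitions
        case other   => throw new IllegalArgumentException(s"Unexpected key: $other")
      }

      // Equality matters so Spark can avoid unnecessary reshuffles when two
      // RDDs already share the same partitioning.
      override def equals(other: Any): Boolean = other match {
        case p: ModPartitioner => p.numPartitions == numPartitions
        case _                 => false
      }

      override def hashCode: Int = numPartitions
    }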