Re: data localisation in spark

2015-06-03 Thread Sandy Ryza
Tasks are scheduled on executors based on data locality.  Things work as
you would expect in the example you brought up.

With dynamic allocation, the number of executors can change throughout
the lifetime of an application.  10 executors (or 5 executors with 2 cores
each) are not needed for a reduceByKey with parallelism = 10.  If there are
fewer slots to run tasks than tasks, the extra tasks simply wait and run as
slots free up.
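
The slot arithmetic above can be sketched in a few lines of plain Java. This
is a toy model, not Spark code: the class and method names are made up for
illustration, and it assumes each task occupies exactly one core (Spark's
default) and that all tasks take about the same time.

```java
// Toy model (not Spark code): how many "waves" a stage needs when
// there are more tasks than task slots.
final class TaskWaves {
    /** Task slots = executors * cores per executor (assumes one core per task). */
    static int slots(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    /** Waves needed to run all tasks = ceil(tasks / slots). */
    static int waves(int tasks, int slots) {
        if (tasks < 0 || slots <= 0) {
            throw new IllegalArgumentException("tasks must be >= 0 and slots > 0");
        }
        return (tasks + slots - 1) / slots;
    }

    public static void main(String[] args) {
        int tasks = 10; // e.g. reduceByKey(func, 10) produces 10 reduce tasks
        System.out.println(waves(tasks, slots(5, 2))); // 10 slots: prints 1
        System.out.println(waves(tasks, slots(2, 2))); // 4 slots: prints 3
        System.out.println(waves(tasks, slots(1, 1))); // 1 slot: prints 10
    }
}
```

So parallelism = 10 only fixes the number of tasks; with fewer slots the stage
still completes, it just takes more waves.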

-Sandy


Re: data localisation in spark

2015-06-02 Thread Shushant Arora
So in Spark, after acquiring executors from the cluster manager, are tasks
scheduled on executors based on data locality? I mean, suppose an
application has 2 jobs and the output of job 1 is used as the input of
job 2.
If I persisted some RDD in job 1, then while running job 2 will Spark use
the same executors where job 1's output was persisted, or will it acquire
executors again and move the data?

And is it true that the number of executors in an application is fixed,
acquired at the start of the application, and stays the same throughout the
application? If yes, how does Spark handle an explicit number of reducers in
some APIs, say
rdd.reduceByKey(func, 10);

When converting the DAG to stages, does it calculate the executors required
and then acquire executors/worker nodes?


Re: data localisation in spark

2015-06-02 Thread Sandy Ryza
It is not possible with JavaSparkContext either.  The API mentioned below
currently does not have any effect (we should document this).

The primary difference between MR and Spark here is that MR runs each task
in its own YARN container, while Spark runs multiple tasks within an
executor, which needs to be requested before Spark knows what tasks it will
run, although dynamic allocation improves that last part.

-Sandy


Re: data localisation in spark

2015-06-02 Thread Shushant Arora
Is it possible in JavaSparkContext?

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);

If yes, is it the programmer's responsibility to first calculate the split
locations and then instantiate the Spark context with preferred locations?

How is this achieved in MR2 with YARN? Does the ApplicationMaster
specify split locations to the ResourceManager before acquiring the node
managers?



On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com  wrote:

> Take a look at the following SparkContext constructor variant that tries
> to honor data locality in YARN mode.
>
>   /**
>    * :: DeveloperApi ::
>    * Alternative constructor for setting preferred locations where Spark will
>    * create executors.
>    *
>    * @param preferredNodeLocationData used in YARN mode to select nodes to
>    * launch containers on. Can be generated using
>    * [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
>    * from a list of input files or InputFormats for the application.
>    */
>   @DeveloperApi
>   def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
>     this(config)
>     this.preferredNodeLocationData = preferredNodeLocationData
>   }
>
> --
> bit1...@163.com


Re: data localisation in spark

2015-05-31 Thread Sandy Ryza
Hi Shushant,

Spark currently makes no effort to request executors based on data locality
(although it does try to schedule tasks within executors based on data
locality).  We're working on adding this capability at SPARK-4352.
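
To illustrate the distinction, here is a minimal sketch in plain Java. It is
a toy model and not Spark's scheduler: the class, method, and host names are
hypothetical, and it ignores locality wait times and rack awareness. The set
of executor hosts is fixed up front; each task then goes to its preferred
host if a slot is free there, and otherwise to any host with a free slot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Spark's scheduler): executors already exist on fixed hosts;
// tasks are placed on their preferred host only when a slot is free there.
final class LocalityToy {
    /**
     * @param freeSlots     free task slots per executor host (mutated as slots are used)
     * @param preferredHost preferred host for each task, in task order
     * @return the host chosen for each task, or null if no slot was left
     */
    static List<String> assign(Map<String, Integer> freeSlots, List<String> preferredHost) {
        List<String> placement = new ArrayList<>();
        for (String preferred : preferredHost) {
            String chosen = null;
            if (freeSlots.getOrDefault(preferred, 0) > 0) {
                chosen = preferred; // data-local placement
            } else {
                // Fall back to the first host that still has a free slot.
                for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
                    if (e.getValue() > 0) {
                        chosen = e.getKey();
                        break;
                    }
                }
            }
            if (chosen != null) {
                freeSlots.merge(chosen, -1, Integer::sum); // consume one slot
            }
            placement.add(chosen);
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Integer> freeSlots = new LinkedHashMap<>();
        freeSlots.put("hostA", 1);
        freeSlots.put("hostB", 2);
        // The third task's data lives on hostC, where no executor was requested.
        List<String> preferred = Arrays.asList("hostA", "hostA", "hostC");
        System.out.println(assign(freeSlots, preferred)); // prints [hostA, hostB, hostB]
    }
}
```

Only the first task runs data-local here. The third can never be local,
because no executor was requested on hostC in the first place.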

-Sandy


data localisation in spark

2015-05-31 Thread Shushant Arora
I want to understand how Spark takes care of data localisation in cluster
mode when run on YARN.

1. The driver program asks the ResourceManager for executors. Does it tell
YARN's RM to check the HDFS blocks of the input data and then allocate
executors accordingly?
And do the executors remain fixed throughout the application, or does the
driver program ask for new executors when it submits another job in the same
application, since in Spark a new job is created for each action? If the
executors are fixed, is achieving data localisation impossible for the
second job?

2. When executors are done with their processing, are they marked as
free in the ResourceManager's resource queue, and do the executors tell
the RM this directly instead of via the driver?

Thanks
Shushant