RE: Help me understand the partition, parallelism in Spark

2015-02-26 Thread java8964
Can anyone share any thoughts on my questions below?
Thanks

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Help me understand the partition, parallelism in Spark
Date: Wed, 25 Feb 2015 21:58:55 -0500




Hi, Sparkers:
I come from the Hadoop MapReduce world and am trying to understand some of Spark's 
internals. From the web and this list, I keep seeing people suggest increasing the 
parallelism when you get an OOM error. I have read as much of the documentation as 
possible to understand RDD partitions and how parallelism is used in Spark.
I understand that for an RDD read from HDFS, by default one partition corresponds to 
one HDFS block, which is pretty straightforward. I also see that lots of RDD operations 
accept a second parameter for parallelism. This is the part that confuses me. My 
understanding is that parallelism is controlled entirely by how many cores you give to 
your job, so adjusting that parameter, or spark.default.parallelism, shouldn't have any 
impact.
For example, suppose I have 10G of data in HDFS and the block size is 128M, so we get 
about 100 blocks/partitions in the RDD. Now I transform that RDD into a pair RDD with 
1000 unique keys and run a reduceByKey, using 200 as the default parallelism. Here is 
what I assume (a code sketch of this pipeline follows the list):

- We have 100 partitions, as the data comes from 100 blocks. Most likely Spark will 
generate 100 tasks to read and shuffle them?
- The 1000 unique keys mean 1000 reducer groups, like in MR.
- If I set the max cores to 50, then up to 50 tasks can run concurrently; the remaining 
tasks just have to wait for a core while 50 tasks are running.
- Since we are doing reduceByKey, a shuffle will happen. Data will be shuffled into 
1000 partitions, as we have 1000 unique keys.
- I don't know how many tasks will process these 1000 partitions; maybe this is where 
the parallelism parameter comes in?
- Whatever the parallelism is, ONLY 50 tasks can run concurrently. So if we set more 
cores, more partitions' data will be processed in the executor (which runs more threads 
in this case), so more memory is needed. I don't see how increasing the parallelism 
could help with the OOM in this case.
- In my Spark SQL test case, I gave the executor a 24G heap, and my join between 2 big 
datasets keeps getting OOM. I kept increasing spark.default.parallelism, from 200 to 
400, to 2000, even to 4000, with no help. What finally made the query finish without 
OOM was changing --total-executor-cores from 10 to 4.
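
For reference, here is a minimal sketch of the pipeline I have in mind, in the Scala 
API (the HDFS path and the key extraction are made-up placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("parallelism-example")
      .set("spark.default.parallelism", "200")  // fallback number of shuffle partitions
    val sc = new SparkContext(conf)

    // ~10G of text in HDFS at a 128M block size -> about 100 input partitions
    val lines = sc.textFile("hdfs:///data/input/10g-dataset")

    // Build the pair RDD; assume the data has ~1000 distinct keys overall
    val pairs = lines.map { line =>
      val key = line.split(",")(0)  // made-up: first field is the key
      (key, 1L)
    }

    // The second argument pins the number of reduce-side partitions to 200;
    // without it, spark.default.parallelism would be used instead
    val counts = pairs.reduceByKey(_ + _, 200)

    println("reduce-side partitions: " + counts.partitions.length)
    sc.stop()
  }
}
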
So my questions are:
1) What does parallelism really mean in Spark? In the simple example above, for 
reduceByKey, what difference does changing the parallelism from 10 to 20 make?
2) When we talk about partitions in Spark, I understand the partitioning clearly for 
data coming from HDFS. But for the intermediate data, is the partition the same as the 
key? For group, reduce, and join operations, does each unique key become a partition? 
Is that correct?
3) Why does increasing the parallelism help with OOM? I don't get this part. From my 
limited experience, it is the core count that really matters for memory.
Thanks
Yong
  

Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Imran Rashid
Hi Yong,

Mostly correct, except for:


- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.

No, you will not get 1000 partitions. Spark has to decide how many partitions to use 
before it even knows how many unique keys there are. If you have 200 as the default 
parallelism (or you explicitly pass it as the second parameter to reduceByKey()), then 
you will get 200 partitions. The 1000 unique keys will be distributed across the 200 
partitions. Ideally they will be distributed fairly evenly, but how they are 
distributed depends on the partitioner (by default you will have a HashPartitioner, so 
it depends on the hash of your keys).

Note that this is more or less the same as in Hadoop MapReduce.

The amount of parallelism matters because there are various places in Spark where 
there is some overhead proportional to the size of a partition. So in your example, 
with 1000 unique keys in 200 partitions, you expect about 5 unique keys per partition 
-- if instead you had 10 partitions, you'd expect 100 unique keys per partition, and 
thus more data per task, making an OOM more likely. But there are many other possible 
sources of OOM, so this is definitely not the *only* solution.
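
A quick way to see this is from the spark-shell (a sketch with made-up keys, assuming 
an existing SparkContext sc):

import org.apache.spark.HashPartitioner

// 100,000 records spread over 1000 distinct keys
val pairs = sc.parallelize(1 to 100000).map(i => ("key-" + (i % 1000), 1L))

// The partition count comes from the numPartitions argument, not the key count
val reduced = pairs.reduceByKey(_ + _, 200)
println(reduced.partitions.length)                 // 200, even though there are 1000 keys
println(reduced.partitioner.map(_.numPartitions))  // Some(200), from the HashPartitioner

// Which of the 200 partitions a given key lands in
val part = new HashPartitioner(200)
println(part.getPartition("key-42"))               // non-negative (key.hashCode mod 200)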

Sorry I can't comment in particular about Spark SQL -- hopefully somebody
more knowledgeable can comment on that.



RE: Help me understand the partition, parallelism in Spark

2015-02-26 Thread java8964
Imran, thanks for explaining the parallelism. That is very helpful.
In my test case I am only using a one-box cluster with a single executor. So if I give 
it 10 cores, 10 concurrent tasks will run within this one executor, which handles more 
data than in the 4-core case, and that led to the OOM.
I haven't set up Spark on our production cluster yet, but assuming we have a 100-node 
cluster, if I understand correctly, asking for 1000 cores means that on average each 
box's executor will run 10 threads to process data. So lowering the core count will 
reduce Spark's speed, but it can help avoid the OOM, since less data is processed in 
memory at a time.
My other guess is that each partition will eventually be processed by one core, so a 
bigger partition count means smaller partitions, which should help the memory 
footprint. In my case, I suspect that Spark SQL in fact doesn't use the 
spark.default.parallelism setting, or at least my query doesn't use it, so no matter 
what I changed, it made no difference. The reason I say that is that there are always 
200 tasks in stages 2 and 3 of my query job, no matter what I set 
spark.default.parallelism to.
I think lowering the cores trades speed for lower memory usage. Hope my understanding 
is correct.
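
To make my guess concrete, here is a small sketch of what I understand the fallback 
behaviour to be for plain RDDs (made-up path; an explicit partition count should win 
over the config):

// spark.default.parallelism is only the fallback when no partition count
// (and no parent partitioner) is supplied to the shuffle operation
val pairs = sc.textFile("hdfs:///data/input/10g-dataset")   // made-up path
  .map(line => (line.split(",")(0), 1L))

val explicitCount = pairs.reduceByKey(_ + _, 50)   // 50 partitions, config ignored
val byDefault     = pairs.reduceByKey(_ + _)       // falls back to spark.default.parallelism

println(explicitCount.partitions.length)
println(byDefault.partitions.length)
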
Thanks
Yong

Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Yana Kadiyska
Imran, I have also observed the phenomenon of reducing the cores helping
with OOM. I wanted to ask this (hopefully without straying off topic): we
can specify the number of cores and the executor memory. But we don't get
to specify _how_ the cores are spread among executors.

Is it possible that with 24G memory and 4 cores we get a spread of 1 core
per executor thus ending up with 24G for the task, but with 24G memory and
10 cores some executor ends up with 3 cores on the same machine and thus we
have only 8G per task?


Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Zhan Zhang
Here is my understanding.

When running on top of YARN, the number of cores determines how many tasks can run 
concurrently in one executor, and all of those task slots live in the same JVM.

Parallelism mainly controls the balance of tasks. For example, if you have 200 cores 
but only 50 partitions, 150 cores will sit idle.

For the OOM: increasing the executor memory size, and the JVM memory overhead setting, 
may help here.
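
For example, on YARN the spread of cores and memory per executor can be pinned down 
explicitly. A rough sketch of the relevant settings (the values are only illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("join-job")
  .set("spark.executor.memory", "24g")      // heap per executor JVM (--executor-memory)
  .set("spark.executor.cores", "4")         // concurrent tasks per executor JVM (--executor-cores)
  .set("spark.executor.instances", "10")    // number of executors on YARN (--num-executors)
// On standalone/Mesos, spark.cores.max (--total-executor-cores) caps the total cores
// across all executors rather than fixing the per-executor count.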

Thanks.

Zhan Zhang


Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Yana Kadiyska
Yong, regarding the 200 tasks in stages 2 and 3 -- that number actually comes from the 
shuffle setting spark.sql.shuffle.partitions.
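
For example (a sketch against the 1.x SQLContext API; the table names are made up), 
the setting can be changed on the context before running the join:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Spark SQL shuffles (joins, aggregations) use this setting, not
// spark.default.parallelism; it defaults to 200, which matches the 200 tasks
// you see in stages 2 and 3
sqlContext.setConf("spark.sql.shuffle.partitions", "800")

val joined = sqlContext.sql(
  "SELECT a.key, a.value, b.value FROM tableA a JOIN tableB b ON a.key = b.key")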

