Hi, Sparkers:
I come from the Hadoop MapReduce world and am trying to understand some of
Spark's internals. On the web and on this list, I keep seeing people suggest
increasing the parallelism when you hit an OOM error. I have read as much of
the documentation as I could to understand RDD partitions and how parallelism
is used in Spark.
I understand that for an RDD read from HDFS, by default, one partition
corresponds to one HDFS block, which is pretty straightforward. I also see that
many RDD operations accept a second parameter for parallelism. This is the part
that confuses me. As I understand it, the parallelism is controlled entirely by
how many cores you give to your job, so adjusting that parameter, or
"spark.default.parallelism", shouldn't have any impact.
For example, say I have 10G of data in HDFS and the block size is 128M, so we
get roughly 100 blocks/partitions in the RDD. Now I transform that RDD into a
pair RDD with 1000 unique keys and do a reduceByKey, using 200 as the default
parallelism. Here is what I assume:
- We have 100 partitions, as the data comes from 100 blocks. Most likely Spark
  will generate 100 tasks to read and shuffle them?
- The 1000 unique keys mean 1000 reducer groups, like in MR.
- If I set the max cores to 50, up to 50 tasks can run concurrently. The
  remaining tasks just have to wait for a core while 50 tasks are running.
- Since we are doing reduceByKey, shuffling will happen. Data will be shuffled
  into 1000 partitions, as we have 1000 unique keys.
- I don't know how many tasks will process these 1000 partitions; maybe this is
  where the parallelism parameter comes in?
- No matter what the parallelism is, ONLY 50 tasks can run concurrently. So if
  we give the job more cores, more partitions' data will be processed in the
  executor at the same time (which runs more threads in this case), so more
  memory is needed. I don't see how increasing parallelism could help with the
  OOM in this case.

In my test
case of Spark SQL, I gave the executor a 24G heap, and my join between 2 big
datasets kept getting OOM. I kept increasing "spark.default.parallelism",
from 200 to 400, to 2000, even to 4000, with no help. What finally made the
query finish without OOM was changing "--total-executor-cores" from 10 to 4.
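
A rough back-of-the-envelope sketch of the numbers above, in plain Python (no
Spark involved). The function names are made up for illustration. The per-task
heap figure assumes that all the cores sit in a single executor with the 24G
heap, which may not match the actual cluster layout, and it ignores whatever
part of the heap Spark keeps for itself, so treat it as an upper bound:

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def hdfs_blocks(data_bytes, block_bytes):
    """Number of HDFS blocks (and so default input partitions) for a file."""
    # Integer ceiling division: a partial last block still counts as a block.
    return (data_bytes + block_bytes - 1) // block_bytes


def task_waves(num_tasks, cores):
    """Rounds needed to run num_tasks when only `cores` tasks run at once."""
    return (num_tasks + cores - 1) // cores


def heap_per_slot_gb(executor_heap_gb, concurrent_tasks):
    """Naive even split of one executor's heap across its concurrent tasks.

    Assumption (not from the post): every concurrent task runs in the
    same executor JVM.
    """
    return executor_heap_gb / concurrent_tasks


# 10G at 128M per block is strictly 80 blocks; the example rounds this to 100.
print(hdfs_blocks(10 * GIB, 128 * MIB))   # 80

# 100 map tasks on 50 cores: two waves of 50.
print(task_waves(100, 50))                # 2

# A 24G heap shared by 10 vs. 4 concurrent tasks.
print(heap_per_slot_gb(24, 10))           # 2.4
print(heap_per_slot_gb(24, 4))            # 6.0
```

Under that single-executor assumption, going from 10 to 4 cores raises each
running task's share of the heap from about 2.4G to 6G, which would be
consistent with the query finishing after the core count was reduced.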
So my questions are:
1) What does parallelism really mean in Spark? In the simple example above, for
   reduceByKey, what difference does it make if the parallelism changes from 10
   to 20?
2) When we talk about partitions in Spark, I can understand them clearly for
   data coming from HDFS. For intermediate data, the partitioning will follow
   the keys, right? For group, reduce, and join operations, each unique key
   will be its own partition. Is that correct?
3) Why would increasing parallelism help with OOM? I don't get this part. From
   my limited experience, it is adjusting the core count that really matters
   for memory.
Thanks
Yong