Hi Yong,

Mostly correct, except for:

>
>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>    shuffled into 1000 partitions, as we have 1000 unique keys.
>
No, you will not get 1000 partitions.  Spark has to decide how many
partitions to use before it even knows how many unique keys there are.  If
you have 200 as the default parallelism (or you explicitly pass it as the
second parameter to reduceByKey()), then you will get 200 partitions.  The
1000 unique keys will be distributed across the 200 partitions.  Ideally
they will be distributed pretty equally, but how they get distributed
depends on the partitioner (by default you will have a HashPartitioner, so
it depends on the hash of your keys).
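
To make the point above concrete, here is a minimal sketch in plain Python (no
Spark involved). It assumes a CRC32 hash taken modulo the partition count as a
stand-in for a hash partitioner; Spark's HashPartitioner uses the key's own
hash code instead, so the exact placement will differ. The key names and the
partition_for helper are made up for illustration.

```python
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    # Hypothetical stand-in for a hash partitioner: hash the key, then
    # take a non-negative modulus over the number of partitions.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# 1000 unique keys, as in the example from the thread (names are invented).
keys = [f"key-{i}" for i in range(1000)]

# The partition count is fixed up front; it does not depend on the keys.
num_partitions = 200

# Count how many unique keys land in each partition.
keys_per_partition = Counter(partition_for(k, num_partitions) for k in keys)

print("partitions holding at least one key:", len(keys_per_partition))
print("total keys placed:", sum(keys_per_partition.values()))
print("largest partition holds", max(keys_per_partition.values()), "keys")
```

However the keys hash, the number of partitions never exceeds 200; only the
evenness of the spread depends on the hash function.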

Note that this is more or less the same as in Hadoop MapReduce.

The amount of parallelism matters because there are various places in Spark
where there is some overhead proportional to the size of a partition.  So
in your example, if you have 1000 unique keys in 200 partitions, you expect
about 5 unique keys per partition; if instead you had 10 partitions,
you'd expect 100 unique keys per partition, and thus more data per
partition, so you'd be more likely to hit an OOM.  But there are many other
possible sources of OOM, so this is definitely not the *only* solution.
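
The arithmetic above, written out as a tiny Python sketch. It assumes the keys
are spread perfectly evenly, which a real partitioner only approximates, and
the function name is made up for illustration.

```python
def expected_keys_per_partition(unique_keys: int, num_partitions: int) -> float:
    # Average number of unique keys per partition, assuming an even spread.
    return unique_keys / num_partitions


for num_partitions in (200, 10):
    avg = expected_keys_per_partition(1000, num_partitions)
    print(num_partitions, "partitions ->", avg, "unique keys per partition")
```

Key count is only a rough proxy for memory use; what matters is how much data
each partition ends up holding.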

Sorry I can't comment in particular about Spark SQL -- hopefully somebody
more knowledgeable can comment on that.



On Wed, Feb 25, 2015 at 8:58 PM, java8964 <java8...@hotmail.com> wrote:

> Hi, Sparkers:
>
> I come from the Hadoop MapReduce world and am trying to understand some
> of Spark's internals. On the web and on this list, I keep seeing people
> recommend increasing the parallelism if you get an OOM error. I have read
> as much documentation as I could to understand RDD partitions and how
> parallelism is used in Spark.
>
> I understand that for an RDD read from HDFS, by default, one partition
> will be one HDFS block, which is pretty straightforward. I saw that lots
> of RDD operations support a second parameter for parallelism. This is the
> part that confuses me. From my understanding, the parallelism is totally
> controlled by how many cores you give to your job. Adjusting that
> parameter, or "spark.default.parallelism", shouldn't have any impact.
>
> For example, say I have 10G of data in HDFS and assume the block size is
> 128M, so we get 100 blocks/partitions in the RDD. Now suppose I transform
> that RDD into a pair RDD with 1000 unique keys and run reduceByKey on it,
> using 200 as the default parallelism. Here is what I assume:
>
>
>    - We have 100 partitions, as the data comes from 100 blocks. Most
>    likely Spark will generate 100 tasks to read and shuffle them?
>    - The 1000 unique keys mean 1000 reducer groups, like in MR.
>    - If I set the max cores to 50, up to 50 tasks can run concurrently.
>    The remaining tasks just have to wait for a core while 50 tasks are
>    running.
>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>    shuffled into 1000 partitions, as we have 1000 unique keys.
>    - I don't know how many tasks will process these 1000 partitions;
>    maybe this is where the parallelism parameter comes in?
>    - No matter what the parallelism is, ONLY 50 tasks can run
>    concurrently. So if we set more cores, more partitions' data will be
>    processed in the executor (which runs more threads in this case), so
>    more memory is needed. I don't see how increasing parallelism could
>    help with the OOM in this case.
>    - In my Spark SQL test case, I gave the executor a 24G heap, and my
>    join between 2 big datasets kept getting OOM. I kept increasing
>    "spark.default.parallelism", from 200 to 400, to 2000, even to 4000,
>    with no help. What finally made the query finish without OOM was
>    changing "--total-executor-cores" from 10 to 4.
>
>
> So my questions are:
> 1) What does parallelism really mean in Spark? In the simple example
> above, for reduceByKey, what difference does it make if the parallelism
> changes from 10 to 20?
> 2) When we talk about partitions in Spark, for data coming from HDFS, I
> can understand the partitions clearly. For the intermediate data, the
> partition will be the same as the key, right? For group, reduce, and join
> operations, each unique key will be a partition. Is that correct?
> 3) Why would increasing parallelism help with OOM? I don't get this part.
> From my limited experience, adjusting the core count really matters for
> memory.
>
> Thanks
>
> Yong
>
