Yong, for the 200 tasks in stages 2 and 3 -- this actually comes from the
Spark SQL shuffle setting spark.sql.shuffle.partitions (which defaults to 200),
not from spark.default.parallelism.
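
For reference, a minimal sketch of how that setting can be changed (Scala, in
the spark-shell where sc is predefined; the value 400 is only an illustration,
so please check the docs for your Spark version):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // spark.sql.shuffle.partitions (default 200) controls how many reduce-side
    // tasks Spark SQL uses after a shuffle, e.g. for joins and aggregations.
    sqlContext.setConf("spark.sql.shuffle.partitions", "400")
    // Equivalent SQL form:
    // sqlContext.sql("SET spark.sql.shuffle.partitions=400")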

On Thu, Feb 26, 2015 at 5:51 PM, java8964 <java8...@hotmail.com> wrote:

> Imran, thanks for your explanation of the parallelism. That is very
> helpful.
>
> In my test case, I am only using a one-box cluster with one executor. So if I
> give it 10 cores, then 10 concurrent tasks will run within this one executor,
> which handles more data than in the 4-core case, and that led to the OOM.
>
> I haven't set up Spark on our production cluster yet, but assuming we have a
> 100-node cluster, if I guess right, setting up to 1000 cores means that on
> average each box's executor will run 10 threads to process data. So lowering
> the core count will reduce Spark's speed, but can help avoid the OOM, as less
> data is processed in memory at any one time.
>
> My other guess is that each partition will eventually be processed by one
> core. So a bigger partition count means a smaller partition size, which should
> help the memory footprint. In my case, I guess that Spark SQL in fact doesn't
> use the "spark.default.parallelism" setting, or at least it is not used in my
> query, so no matter what I changed it to, it made no difference. The reason I
> say that is that there are always 200 tasks in stages 2 and 3 of my query job,
> no matter what I set "spark.default.parallelism" to.
>
> I think lowering the core count trades speed for lower memory usage. Hope my
> understanding is correct.
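>
> As a very rough back-of-the-envelope (my numbers, using the 24G heap and the
> 10-vs-4 core counts from this thread, and assuming concurrent tasks share the
> executor heap roughly evenly):
>
>     val executorHeapGB  = 24.0
>     val sharePer10Tasks = executorHeapGB / 10   // ~2.4 GB per concurrent task
>     val sharePer4Tasks  = executorHeapGB / 4    //  6.0 GB per concurrent task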
>
> Thanks
>
> Yong
>
> ------------------------------
> Date: Thu, 26 Feb 2015 17:03:20 -0500
> Subject: Re: Help me understand the partition, parallelism in Spark
> From: yana.kadiy...@gmail.com
> To: iras...@cloudera.com
> CC: java8...@hotmail.com; user@spark.apache.org
>
>
> Imran, I have also observed the phenomenon of reducing the cores helping
> with OOM. I wanted to ask this (hopefully without straying off topic): we
> can specify the number of cores and the executor memory. But we don't get
> to specify _how_ the cores are spread among executors.
>
> Is it possible that with 24G memory and 4 cores we get a spread of 1 core
> per executor, thus ending up with 24G per task, but with 24G memory and
> 10 cores some executor ends up with 3 cores on the same machine, and thus we
> have only 8G per task?
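>
> For concreteness, a minimal sketch of the knobs involved (Scala, standalone
> mode; the values mirror the ones above, and --total-executor-cores maps to
> spark.cores.max as far as I understand):
>
>     import org.apache.spark.SparkConf
>
>     val conf = new SparkConf()
>       .setAppName("cores-vs-memory")
>       .set("spark.cores.max", "10")        // what --total-executor-cores sets
>       .set("spark.executor.memory", "24g") // heap per executor, shared by its tasks
>     // If one executor ends up with 3 of those 10 cores, its 3 concurrent
>     // tasks share the same 24g heap -- roughly 8g each, as guessed above.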
>
> On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid <iras...@cloudera.com>
> wrote:
>
> Hi Yong,
>
> mostly correct except for:
>
>
>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>    shuffled into 1000 partitions, as we have 1000 unique keys.
>
> no, you will not get 1000 partitions.  Spark has to decide how many
> partitions to use before it even knows how many unique keys there are.  If
> you have 200 as the default parallelism (or you just explicitly make it the
> second parameter to reduceByKey()), then you will get 200 partitions.  The
> 1000 unique keys will be distributed across the 200 partitions. Ideally
> they will be distributed pretty equally, but how they get distributed
> depends on the partitioner (by default you will have a HashPartitioner, so
> it depends on the hash of your keys).
>
> Note that this is more or less the same as in Hadoop MapReduce.
>
> The amount of parallelism matters because there are various places in Spark
> where there is some overhead proportional to the size of a partition. So
> in your example, if you have 1000 unique keys in 200 partitions, you expect
> about 5 unique keys per partition -- if instead you had 10 partitions,
> you'd expect 100 unique keys per partition, and thus more data, and you'd
> be more likely to hit an OOM. But there are many other possible sources of
> OOM, so this is definitely not the *only* solution.
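>
> A small sketch of that arithmetic in the spark-shell (illustrative data; sc is
> the shell's SparkContext):
>
>     val pairs  = sc.parallelize(1 to 1000).map(i => ("key-" + i, 1))  // 1000 distinct keys
>     val wide   = pairs.reduceByKey(_ + _, 200)  // 2nd arg: 200 partitions, ~5 keys each
>     val narrow = pairs.reduceByKey(_ + _, 10)   // 10 partitions, ~100 keys each
>     println(wide.partitions.length)    // 200
>     println(narrow.partitions.length)  // 10
>     // With the default HashPartitioner a key goes to partition
>     // key.hashCode modulo numPartitions (made non-negative), so the actual
>     // spread depends on your key hashes.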
>
> Sorry I can't comment in particular about Spark SQL -- hopefully somebody
> more knowledgeable can comment on that.
>
>
>
> On Wed, Feb 25, 2015 at 8:58 PM, java8964 <java8...@hotmail.com> wrote:
>
> Hi, Sparkers:
>
> I come from the Hadoop MapReduce world and am trying to understand some of
> Spark's internals. From the web and this list, I keep seeing people talk about
> increasing the parallelism if you get an OOM error. I have tried to read as
> much documentation as possible to understand RDD partitions and how
> parallelism is used in Spark.
>
> I understand that for an RDD from HDFS, by default one partition will be one
> HDFS block, which is pretty straightforward. I saw that lots of RDD operations
> support a 2nd parameter for parallelism. This is the part that confuses me.
> From my understanding, the parallelism is totally controlled by how many cores
> you give to your job, so adjusting that parameter, or
> "spark.default.parallelism", shouldn't have any impact.
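>
> A quick sketch of that 2nd parameter for an HDFS read (spark-shell; the path
> and numbers are made up):
>
>     val byBlocks = sc.textFile("hdfs:///data/10g-input")       // ~1 partition per HDFS block
>     val finer    = sc.textFile("hdfs:///data/10g-input", 400)  // 2nd arg is a *minimum*
>     println(byBlocks.partitions.length)  // roughly the number of blocks
>     println(finer.partitions.length)     // typically >= 400 for splittable text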
>
> For example, if I have 10G of data in HDFS, and assume the block size is 128M,
> we get 100 blocks/partitions in the RDD. Now if I transform that RDD into a
> pair RDD with 1000 unique keys and do a reduceByKey action, using 200 as the
> default parallelism, here is what I assume (see the sketch after this list):
>
>
>    - We have 100 partitions, as the data comes from 100 blocks. Most
>    likely Spark will generate 100 tasks to read and shuffle them?
>    - The 1000 unique keys mean 1000 reducer groups, like in MR.
>    - If I set the max cores to 50, then up to 50 tasks can run
>    concurrently. The remaining tasks just have to wait for a core while 50
>    tasks are running.
>    - Since we are doing reduceByKey, shuffling will happen. Data will be
>    shuffled into 1000 partitions, as we have 1000 unique keys.
>    - I don't know how many tasks will process these 1000 partitions;
>    maybe this is where the parallelism parameter comes in?
>    - No matter what the parallelism is, ONLY 50 tasks can run
>    concurrently. So if we set more cores, more partitions' data will be
>    processed in the executor (which runs more threads in this case), so more
>    memory is needed. I don't see how increasing parallelism could help with
>    the OOM in this case.
>    - In my Spark SQL test case, I gave 24G as the executor heap, and my
>    join between 2 big datasets kept getting OOM. I kept increasing
>    "spark.default.parallelism", from 200 to 400, to 2000, even to 4000, with
>    no help. What really made the query finish without OOM was changing
>    "--total-executor-cores" from 10 to 4.
>
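> To make the example concrete, here is a minimal sketch of the job described
> above (spark-shell; the path and key extraction are made up):
>
>     val lines  = sc.textFile("hdfs:///data/10g-input")         // ~1 partition per block
>     val pairs  = lines.map(line => (line.split("\t")(0), 1L))  // hypothetical keys
>     val counts = pairs.reduceByKey(_ + _, 200)                 // explicit 2nd arg
>     println(pairs.partitions.length)   // driven by the input splits (the "100 blocks")
>     println(counts.partitions.length)  // 200, regardless of how many distinct keys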
>
> So my questions are:
> 1) What does parallelism really mean in Spark? In the simple example above,
> for reduceByKey, what difference does changing the parallelism from 10 to 20
> make?
> 2) When we talk about partitions in Spark, I understand the partitions clearly
> for data coming from HDFS. For the intermediate data, will the partitions
> correspond to the keys? For group, reduce, and join actions, will the unique
> keys define the partitions? Is that correct?
> 3) Why could increasing parallelism help with an OOM? I don't get this part.
> From my limited experience, adjusting the core count is what really matters
> for memory.
>
> Thanks
>
> Yong
>
>
>
>
