Yong, for the 200 tasks in stages 2 and 3 -- this actually comes from the shuffle setting: spark.sql.shuffle.partitions
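If you want those stages to use a different number of tasks, you can change that setting. A sketch (the value 400 is just an example; the exact way you set it depends on how you launch your queries):

```
# in spark-defaults.conf, or via --conf on spark-submit:
spark.sql.shuffle.partitions  400
```

You can also change it at runtime from a SQL session with `SET spark.sql.shuffle.partitions=400;`.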
On Thu, Feb 26, 2015 at 5:51 PM, java8964 <java8...@hotmail.com> wrote:

> Imran, thanks for your explanation of the parallelism. That is very
> helpful.
>
> In my test case, I am only using a one-box cluster, with one executor. So
> if I give it 10 cores, then 10 concurrent tasks will run within this one
> executor, handling more data than in the 4-core case, which led to the
> OOM.
>
> I haven't set up Spark on our production cluster yet, but assume we have
> a 100-node cluster. If I understand correctly, setting it up with 1000
> cores means that on average each box's executor will run 10 threads to
> process data. So lowering the core count will reduce Spark's speed, but
> can help avoid the OOM, as less data is processed in memory at once.
>
> My other guess is that each partition will eventually be processed by one
> core, so a bigger partition count means a smaller partition size, which
> should help the memory footprint. In my case, I guess that Spark SQL in
> fact doesn't use the "spark.default.parallelism" setting, or at least my
> query doesn't, so no matter what I changed it to, it didn't matter. The
> reason I say that is that there are always 200 tasks in stages 2 and 3 of
> my query job, no matter what I set "spark.default.parallelism" to.
>
> I think lowering the core count trades speed for lower memory usage. Hope
> my understanding is correct.
>
> Thanks
>
> Yong
>
> ------------------------------
> Date: Thu, 26 Feb 2015 17:03:20 -0500
> Subject: Re: Help me understand the partition, parallelism in Spark
> From: yana.kadiy...@gmail.com
> To: iras...@cloudera.com
> CC: java8...@hotmail.com; user@spark.apache.org
>
> Imran, I have also observed the phenomenon of reducing the cores helping
> with OOM. I wanted to ask this (hopefully without straying off topic): we
> can specify the number of cores and the executor memory, but we don't get
> to specify _how_ the cores are spread among executors.
> Is it possible that with 24G memory and 4 cores we get a spread of 1 core
> per executor, thus ending up with 24G for the task, but with 24G memory
> and 10 cores some executor ends up with 3 cores on the same machine, and
> thus we have only 8G per task?
>
> On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid <iras...@cloudera.com>
> wrote:
>
> Hi Yong,
>
> mostly correct except for:
>
> > - Since we are doing reduceByKey, shuffling will happen. Data will be
> > shuffled into 1000 partitions, as we have 1000 unique keys.
>
> No, you will not get 1000 partitions. Spark has to decide how many
> partitions to use before it even knows how many unique keys there are. If
> you have 200 as the default parallelism (or you just explicitly make it
> the second parameter to reduceByKey()), then you will get 200 partitions.
> The 1000 unique keys will be distributed across the 200 partitions.
> Ideally they will be distributed pretty equally, but how they get
> distributed depends on the partitioner (by default you will have a
> HashPartitioner, so it depends on the hash of your keys).
>
> Note that this is more or less the same as in Hadoop MapReduce.
>
> The amount of parallelism matters because there are various places in
> Spark where there is some overhead proportional to the size of a
> partition. So in your example, if you have 1000 unique keys in 200
> partitions, you expect about 5 unique keys per partition -- if instead
> you had 10 partitions, you'd expect 100 unique keys per partition, and
> thus more data, and you'd be more likely to hit an OOM. But there are
> many other possible sources of OOM, so this is definitely not the *only*
> solution.
>
> Sorry I can't comment in particular about Spark SQL -- hopefully somebody
> more knowledgeable can comment on that.
>
> On Wed, Feb 25, 2015 at 8:58 PM, java8964 <java8...@hotmail.com> wrote:
>
> Hi, Sparkers:
>
> I come from the Hadoop MapReduce world, and am trying to understand some
> internal information of Spark.
> From the web and this list, I keep seeing people talk about increasing
> the parallelism when you get an OOM error. I tried to read as much
> documentation as possible to understand RDD partitions and parallelism
> usage in Spark.
>
> I understand that for an RDD from HDFS, by default one partition will be
> one HDFS block, which is pretty straightforward. I saw that lots of RDD
> operations support a 2nd parameter for parallelism. This is the part that
> confuses me. From my understanding, the parallelism is totally controlled
> by how many cores you give to your job; adjusting that parameter, or
> "spark.default.parallelism", shouldn't have any impact.
>
> For example, if I have 12.5G of data in HDFS, and assume the block size
> is 128M, we get 100 blocks/partitions in the RDD. Now if I transform that
> RDD into a pair RDD with 1000 unique keys, and do a reduceByKey using 200
> as the default parallelism, here is what I assume:
>
> - We have 100 partitions, as the data comes from 100 blocks. Most likely
> Spark will generate 100 tasks to read and shuffle them?
> - The 1000 unique keys mean 1000 reducer groups, like in MR.
> - If I set the max cores to 50, then up to 50 tasks can run concurrently;
> the rest of the tasks just have to wait for a core while those 50 are
> running.
> - Since we are doing reduceByKey, shuffling will happen. Data will be
> shuffled into 1000 partitions, as we have 1000 unique keys.
> - I don't know how many tasks will process these 1000 partitions; maybe
> this is where the parallelism parameter comes in?
> - No matter what the parallelism is, ONLY 50 tasks can run concurrently.
> So if we set more cores, more partitions' data will be processed in the
> executor (which runs more threads in this case), so more memory is
> needed. I don't see how increasing parallelism could help the OOM in this
> case.
> - In my test case with Spark SQL, I gave 24G as the executor heap, and my
> join between 2 big datasets kept getting OOM. I kept increasing
> "spark.default.parallelism", from 200 to 400, to 2000, even to 4000, with
> no help. What finally made the query finish without OOM was changing
> "--total-executor-cores" from 10 to 4.
>
> So my questions are:
>
> 1) What does parallelism really mean in Spark? In the simple example
> above, for reduceByKey, what difference does it make if the parallelism
> changes from 10 to 20?
> 2) When we talk about partitions in Spark, for data coming from HDFS I
> can understand the partition clearly. For the intermediate data, the
> partitioning will be by key, right? For group, reduce, and join
> operations, the uniqueness of the keys will determine the partitions. Is
> that correct?
> 3) Why would increasing the parallelism help with OOM? I don't get this
> part. From my limited experience, adjusting the core count really matters
> for memory.
>
> Thanks
>
> Yong
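Imran's point above -- that the partition count is fixed before any keys are seen, and the keys are merely spread across those partitions -- can be sketched in plain Python. This is only a toy model, not Spark's actual Scala HashPartitioner (which mods the key's hashCode); integer keys are used here because Python hashes a small int to itself, which makes the spread deterministic:

```python
from collections import defaultdict

NUM_PARTITIONS = 200   # plays the role of the default parallelism (200)
keys = range(1000)     # the 1000 unique keys from the example above

def partition_for(key, n=NUM_PARTITIONS):
    # Toy model of a hash partitioner: hash the key, mod by the partition
    # count. The partition count is chosen up front, before any keys exist.
    return hash(key) % n

partitions = defaultdict(list)
for k in keys:
    partitions[partition_for(k)].append(k)

print(len(partitions))                           # 200 partitions, not 1000
print(max(len(v) for v in partitions.values()))  # 5 keys per partition
```

With 10 partitions instead of 200, each would hold about 100 keys -- the larger-partition, higher-OOM-risk case described above.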