Have you tried to repartition() your original data to make more partitions before you aggregate?
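Something along these lines, i.e. an untested sketch against the Spark 1.3
DataFrame API in the spark-shell. "people", the path, and the column names are
the ones from your original mail, and 5000 partitions is only a starting
guess, so tune it from there:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

val sqlContext = new SQLContext(sc)  // sc: the shell's SparkContext
val people = sqlContext.parquetFile("/data.parquet")

// Explicitly shuffle the input into more, smaller partitions, so that each
// map-side aggregation task holds fewer distinct (name, date) groups at once.
val res = people.repartition(5000)
  .groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .take(10)

The extra shuffle is not free, but it decouples the number of map-side
aggregation tasks from the number of input files.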
--
Martin Goodson | VP Data Science
(0)20 3397 1240

On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas <johngou...@gmail.com> wrote:

> Hi Yin,
>
> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g,
> without any success.
> I cannot figure out how to increase the number of mapPartitions tasks.
>
> Thanks a lot
>
> On 20 March 2015 at 18:44, Yin Huai <yh...@databricks.com> wrote:
>
>> spark.sql.shuffle.partitions only controls the number of tasks in the
>> second stage (the number of reducers). For your case, I'd say that the
>> number of tasks in the first stage (the number of mappers) will be the
>> number of files you have.
>>
>> Actually, have you changed "spark.executor.memory" (it controls the
>> memory for an executor of your application)? I did not see it in your
>> original email. The difference between worker memory and executor memory
>> is explained at http://spark.apache.org/docs/1.3.0/spark-standalone.html:
>>
>> SPARK_WORKER_MEMORY
>> Total amount of memory to allow Spark applications to use on the
>> machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that
>> each application's individual memory is configured using its
>> spark.executor.memory property.
>>
>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>
>>> Actually I realized that the correct way is:
>>>
>>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>>
>>> but I am still experiencing the same behavior/error.
>>>
>>> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>
>>>> Hi Yin,
>>>>
>>>> The way I set the configuration is:
>>>>
>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>> sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
>>>>
>>>> That is the correct way, right?
>>>> In the mapPartitions task (the first task which is launched), I again
>>>> get the same number of tasks and the same error. :(
>>>>
>>>> Thanks a lot!
>>>>
>>>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>
>>>>> Hi Yin,
>>>>>
>>>>> thanks a lot for that! Will give it a shot and let you know.
>>>>>
>>>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>>>
>>>>>> Was the OOM thrown during the execution of the first stage (map) or
>>>>>> the second stage (reduce)? If it was the second stage, can you
>>>>>> increase the value of spark.sql.shuffle.partitions and see if the
>>>>>> OOM disappears?
>>>>>>
>>>>>> This setting controls the number of reducers Spark SQL will use, and
>>>>>> the default is 200. Maybe there are too many distinct values and the
>>>>>> memory pressure on every task (of those 200 reducers) is pretty
>>>>>> high. You can start with 400 and increase it until the OOM
>>>>>> disappears. Hopefully this will help.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yin
>>>>>>
>>>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Yin,
>>>>>>>
>>>>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>>>>>>> each. The number of tasks launched is equal to the number of
>>>>>>> parquet files. Do you have any idea on how to deal with this
>>>>>>> situation?
>>>>>>>
>>>>>>> Thanks a lot
>>>>>>>
>>>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>>>
>>>>>>>> It seems there are too many distinct groups processed in a task,
>>>>>>>> which triggers the problem.
>>>>>>>>
>>>>>>>> How many files does your dataset have, and how large is each file?
>>>>>>>> It seems your query will be executed in two stages: the table scan
>>>>>>>> and map-side aggregation in the first stage, and the final round
>>>>>>>> of reduce-side aggregation in the second stage. Can you take a
>>>>>>>> look at the numbers of tasks launched in these two stages?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Yin
>>>>>>>>
>>>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi there, I set the executor memory to 8g but it didn't help.
>>>>>>>>>
>>>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You should probably increase executor memory by setting
>>>>>>>>>> "spark.executor.memory".
>>>>>>>>>>
>>>>>>>>>> The full list of available configurations can be found at
>>>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>>>
>>>>>>>>>> Cheng
>>>>>>>>>>
>>>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi there,
>>>>>>>>>>>
>>>>>>>>>>> I was trying the new DataFrame API with some basic operations
>>>>>>>>>>> on a parquet dataset.
>>>>>>>>>>> I have 7 nodes, each with 12 cores and 8GB RAM allocated to the
>>>>>>>>>>> worker, in standalone cluster mode.
>>>>>>>>>>> The code is the following:
>>>>>>>>>>>
>>>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet")
>>>>>>>>>>> val res = people.groupBy("name", "date")
>>>>>>>>>>>   .agg(sum("power"), sum("supply"))
>>>>>>>>>>>   .take(10)
>>>>>>>>>>> println(res)
>>>>>>>>>>>
>>>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead
>>>>>>>>>>> limit exceeded.
>>>>>>>>>>>
>>>>>>>>>>> My configuration is:
>>>>>>>>>>>
>>>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>>>> spark.driver.memory 6g
>>>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>>>> spark.shuffle.manager sort
>>>>>>>>>>>
>>>>>>>>>>> Any idea how I can work around this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot
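For the archives: the knobs discussed in this thread live in three different
places on a standalone cluster. A minimal sketch, assuming Spark 1.3 and
using the values quoted above (the thread's values, not general
recommendations):

# conf/spark-env.sh on each worker machine
SPARK_WORKER_MEMORY=16g

# conf/spark-defaults.conf for the application
spark.driver.memory 6g
spark.executor.memory 8g

// per SQLContext, e.g. in the spark-shell (defaults to 200 reducers)
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")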