Hi Yin, the way I set the configuration is:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "1000");

Is that the correct way? In the mapPartitions task (the first task that is
launched), I again get the same number of tasks and the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:

> Hi Yin,
>
> thanks a lot for that! Will give it a shot and let you know.
>
> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>
>> Was the OOM thrown during the execution of the first stage (map) or the
>> second stage (reduce)? If it was the second stage, can you increase the
>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>
>> This setting controls the number of reducers Spark SQL will use, and the
>> default is 200. Maybe there are too many distinct values and the memory
>> pressure on every task (of those 200 reducers) is pretty high. You can
>> start with 400 and increase it until the OOM disappears. Hopefully this
>> will help.
>>
>> Thanks,
>>
>> Yin
>>
>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com>
>> wrote:
>>
>>> Hi Yin,
>>>
>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
>>> The number of tasks launched is equal to the number of parquet files. Do
>>> you have any idea on how to deal with this situation?
>>>
>>> Thanks a lot
>>>
>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>
>>>> It seems there are too many distinct groups processed in a task, which
>>>> triggers the problem.
>>>>
>>>> How many files does your dataset have, and how large is each file? It
>>>> seems your query will be executed in two stages: table scan and map-side
>>>> aggregation in the first stage, and the final round of reduce-side
>>>> aggregation in the second stage. Can you take a look at the numbers of
>>>> tasks launched in these two stages?
>>>>
>>>> Thanks,
>>>>
>>>> Yin
>>>>
>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <johngou...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there, I set the executor memory to 8g but it didn't help.
>>>>>
>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>>
>>>>>> You should probably increase executor memory by setting
>>>>>> "spark.executor.memory".
>>>>>>
>>>>>> The full list of available configurations can be found here:
>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>
>>>>>> Cheng
>>>>>>
>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I was trying the new DataFrame API with some basic operations on a
>>>>>>> parquet dataset.
>>>>>>> I have 7 nodes of 12 cores each, with 8GB RAM allocated to each
>>>>>>> worker, in standalone cluster mode.
>>>>>>> The code is the following:
>>>>>>>
>>>>>>> val people = sqlContext.parquetFile("/data.parquet");
>>>>>>> val res = people.groupBy("name","date").
>>>>>>>   agg(sum("power"),sum("supply")).take(10);
>>>>>>> System.out.println(res);
>>>>>>>
>>>>>>> The dataset consists of 16 billion entries.
>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>>>> exceeded.
>>>>>>>
>>>>>>> My configuration is:
>>>>>>>
>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>> spark.driver.memory 6g
>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>> spark.shuffle.manager sort
>>>>>>>
>>>>>>> Any idea how I can work around this?
>>>>>>>
>>>>>>> Thanks a lot
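---

[Editor's note] Pulling the thread's advice together, here is a minimal,
self-contained sketch of the approach being discussed, written against the
Spark 1.3-era API used in the thread. The path, app name, and the specific
values for partitions and memory are illustrative assumptions, not values
confirmed to resolve this particular OOM:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

object GroupByAggExample {
  def main(args: Array[String]): Unit = {
    // Executor memory must be set before the executors start (Cheng's
    // advice); 8g mirrors the 8GB-per-worker setup described in the thread.
    val conf = new SparkConf()
      .setAppName("groupBy-agg-oom-workaround") // assumed name
      .set("spark.executor.memory", "8g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // spark.sql.shuffle.partitions only affects the reduce stage, so it can
    // be set on the SQLContext at runtime (Yin's advice). The default is
    // 200; raising it means fewer distinct groups per reducer task.
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

    // Same shape of query as in the original post.
    val people = sqlContext.parquetFile("/data.parquet") // assumed path
    val res = people.groupBy("name", "date")
      .agg(sum("power"), sum("supply"))
      .take(10)
    res.foreach(println)
  }
}

Note that the setConf call cannot change the number of tasks in the first
(map-side) stage: that is driven by the input splits, i.e. the 1700 parquet
files mentioned above, which is consistent with the unchanged task count
reported at the top of the thread.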