Actually, I realized that the correct way is:

    sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.
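For reference, a minimal sketch of the two equivalent ways this setting is applied in the thread below (Spark 1.3-era API; sc is assumed to be an existing SparkContext, and the value 1000 is the one under discussion, not a verified fix):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Either form works. Note that this setting only controls the number of
    // reduce-side (shuffle) tasks in the second stage; the task count of the
    // first (scan) stage follows the number of input files, which is why the
    // first stage looks unchanged.
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
    sqlContext.sql("set spark.sql.shuffle.partitions=1000")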
On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:

Hi Yin,

the way I set the configuration is:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

It is the correct way, right? In the mapPartitions task (the first task which is launched), I get the same number of tasks again, and again the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:

Hi Yin,

thanks a lot for that! Will give it a shot and let you know.

On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:

Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears?

This setting controls the number of reducers Spark SQL will use, and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help.

Thanks,

Yin

On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com> wrote:

Hi Yin,

Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation?

Thanks a lot

On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:

It seems there are too many distinct groups processed in a task, which triggers the problem.

How many files does your dataset have, and how large is each file? It seems your query will be executed in two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages?

Thanks,

Yin

On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote:

Hi there, I set the executor memory to 8g but it didn't help.

On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:

You should probably increase executor memory by setting "spark.executor.memory".

The full list of available configurations can be found here:
http://spark.apache.org/docs/latest/configuration.html

Cheng
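A minimal sketch of Cheng's suggestion, assuming the job builds its own SparkContext (the 8g value matches what Yiannis reports trying below; the app name is hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.executor.memory must be set before the SparkContext is created;
    // it cannot be changed on a running context.
    val conf = new SparkConf()
      .setAppName("parquet-aggregation")   // hypothetical name
      .set("spark.executor.memory", "8g")  // value Yiannis reports trying
    val sc = new SparkContext(conf)

Equivalently, spark-submit accepts --executor-memory 8g, or the property can go in conf/spark-defaults.conf.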
On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

Hi there,

I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes, with 12 cores and 8GB of RAM allocated to each worker, in standalone cluster mode. The code is the following:

    val people = sqlContext.parquetFile("/data.parquet")
    val res = people.groupBy("name", "date")
      .agg(sum("power"), sum("supply"))
      .take(10)
    println(res)

The dataset consists of 16 billion entries.

The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded.

My configuration is:

    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.driver.memory 6g
    spark.executor.extraJavaOptions -XX:+UseCompressedOops
    spark.shuffle.manager sort

Any idea how I can work around this?

Thanks a lot
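Putting the thread's suggestions together, a self-contained sketch of the job with the shuffle-partition knob applied (Spark 1.3-era DataFrame API; the path and column names are from the original post, and the partition count is the value tried above, not a verified fix):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.sum

    val sqlContext = new SQLContext(sc)

    // More reduce-side partitions lowers the per-task memory pressure of the
    // second (reduce-side aggregation) stage, per Yin's suggestion; the
    // default is 200.
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

    val people = sqlContext.parquetFile("/data.parquet")
    val res = people.groupBy("name", "date")
      .agg(sum("power"), sum("supply"))
      .take(10)
    res.foreach(println)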