Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g,
without any success. I cannot figure out how to increase the number of
mapPartitions tasks.
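In case it clarifies what I am after, this is the kind of workaround I have
been attempting (just a sketch, untested at scale; the partition count of 2000
is a guess that would need tuning):

import org.apache.spark.sql.functions._

// repartition() forces a shuffle right after the scan, so the partial
// (map-side) aggregation runs over 2000 smaller partitions instead of
// one partition per file; each task then keeps fewer distinct
// (name, date) groups in memory at once.
val people = sqlContext.parquetFile("/data.parquet").repartition(2000)
val res = people.groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .take(10)
res.foreach(println)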
Thanks a lot

On 20 March 2015 at 18:44, Yin Huai <yh...@databricks.com> wrote:

> spark.sql.shuffle.partitions only controls the number of tasks in the
> second stage (the number of reducers). For your case, I'd say that the
> number of tasks in the first stage (the number of mappers) will be the
> number of files you have.
>
> Actually, have you changed "spark.executor.memory" (it controls the
> memory for an executor of your application)? I did not see it in your
> original email. The difference between worker memory and executor memory
> is explained at http://spark.apache.org/docs/1.3.0/spark-standalone.html:
>
> SPARK_WORKER_MEMORY
> Total amount of memory to allow Spark applications to use on the machine,
> e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
> application's individual memory is configured using its
> spark.executor.memory property.
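These are exactly the settings I have. To be concrete, here is a sketch of
the relevant lines from my setup (the file locations are the standard ones
under conf/ in a standalone deployment):

# conf/spark-env.sh on each worker machine:
# upper bound on the memory the worker can hand out to executors on that node
SPARK_WORKER_MEMORY=16g

# conf/spark-defaults.conf for the application:
# heap size of each executor JVM; it must fit within the worker's budget above
spark.executor.memory 8g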
>
> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Actually I realized that the correct way is:
>>
>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>
>> but I am still experiencing the same behavior/error.
>>
>> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>
>>> Hi Yin,
>>>
>>> the way I set the configuration is:
>>>
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>>
>>> it is the correct way, right?
>>> In the mapPartitions task (the first task which is launched), I get
>>> again the same number of tasks and again the same error. :(
>>>
>>> Thanks a lot!
>>>
>>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>
>>>> Hi Yin,
>>>>
>>>> thanks a lot for that! Will give it a shot and let you know.
>>>>
>>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>>
>>>>> Was the OOM thrown during the execution of the first stage (map) or
>>>>> the second stage (reduce)? If it was the second stage, can you increase
>>>>> the value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>>>
>>>>> This setting controls the number of reducers Spark SQL will use, and
>>>>> the default is 200. Maybe there are too many distinct values and the
>>>>> memory pressure on every task (of those 200 reducers) is pretty high.
>>>>> You can start with 400 and increase it until the OOM disappears.
>>>>> Hopefully this will help.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yin
>>>>>
>>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas
>>>>> <johngou...@gmail.com> wrote:
>>>>>
>>>>>> Hi Yin,
>>>>>>
>>>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>>>>>> each. The number of tasks launched is equal to the number of parquet
>>>>>> files. Do you have any idea on how to deal with this situation?
>>>>>>
>>>>>> Thanks a lot
>>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>>
>>>>>>> Seems there are too many distinct groups processed in a task, which
>>>>>>> triggers the problem.
>>>>>>>
>>>>>>> How many files does your dataset have and how large is each file?
>>>>>>> It seems your query will be executed in two stages: table scan and
>>>>>>> map-side aggregation in the first stage, and the final round of
>>>>>>> reduce-side aggregation in the second stage. Can you take a look at
>>>>>>> the numbers of tasks launched in these two stages?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yin
>>>>>>>
>>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas
>>>>>>> <johngou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi there, I set the executor memory to 8g but it didn't help.
>>>>>>>>
>>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You should probably increase executor memory by setting
>>>>>>>>> "spark.executor.memory".
>>>>>>>>>
>>>>>>>>> The full list of available configurations can be found here:
>>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>>
>>>>>>>>> Cheng
>>>>>>>>>
>>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>>
>>>>>>>>>> Hi there,
>>>>>>>>>>
>>>>>>>>>> I was trying the new DataFrame API with some basic operations on
>>>>>>>>>> a parquet dataset.
>>>>>>>>>> I have 7 nodes with 12 cores and 8GB RAM allocated to each
>>>>>>>>>> worker in standalone cluster mode.
>>>>>>>>>> The code is the following:
>>>>>>>>>>
>>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet");
>>>>>>>>>> val res = people.groupBy("name","date").
>>>>>>>>>> agg(sum("power"),sum("supply")).take(10);
>>>>>>>>>> System.out.println(res);
>>>>>>>>>>
>>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>>>>>>> exceeded
>>>>>>>>>>
>>>>>>>>>> My configuration is:
>>>>>>>>>>
>>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>>> spark.driver.memory 6g
>>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>>> spark.shuffle.manager sort
>>>>>>>>>>
>>>>>>>>>> Any idea how I can work around this?
>>>>>>>>>>
>>>>>>>>>> Thanks a lot
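PS: In case it is useful, this is how I have been checking the task counts
Yin asked about (a sketch, run in the spark-shell; it assumes the people
DataFrame and the org.apache.spark.sql.functions._ import from earlier in
the thread):

// The number of partitions of the scan equals the number of stage-1 (map)
// tasks; in my case this should print 1700, one per parquet file.
println(people.rdd.partitions.size)

// Prints the physical plan: an Exchange separates the map-side (partial)
// aggregation from the reduce-side (final) aggregation that Yin describes.
people.groupBy("name", "date").agg(sum("power"), sum("supply")).explain()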