Hey Yiannis,

If you just perform a count on each "name", "date" pair... can it succeed? If
so, can you do a count and then order by to find the largest one?
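For example, something along these lines (an untested sketch, reusing the
"people" DataFrame from your original snippet; groupBy(...).count() adds a
column named "count" in the 1.3 DataFrame API):

// Size of each ("name", "date") group, largest first.
val groupSizes = people.groupBy("name", "date").count()
groupSizes.orderBy(groupSizes("count").desc).take(10).foreach(println)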
I'm wondering if there is a single pathologically large group here that is
somehow causing the OOM. Also, to be clear, you are getting GC limit warnings
on the executors, not the driver. Correct?

- Patrick

On Mon, Mar 23, 2015 at 10:21 AM, Martin Goodson <mar...@skimlinks.com> wrote:

> Have you tried to repartition() your original data to make more partitions
> before you aggregate?
>
> --
> Martin Goodson | VP Data Science
> (0)20 3397 1240
>
> On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
>> Hi Yin,
>>
>> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
>> without any success.
>> I cannot figure out how to increase the number of mapPartitions tasks.
>>
>> Thanks a lot
>>
>> On 20 March 2015 at 18:44, Yin Huai <yh...@databricks.com> wrote:
>>
>>> spark.sql.shuffle.partitions only controls the number of tasks in the
>>> second stage (the number of reducers). For your case, I'd say that the
>>> number of tasks in the first stage (the number of mappers) will be the
>>> number of files you have.
>>>
>>> Actually, have you changed "spark.executor.memory" (it controls the
>>> memory for an executor of your application)? I did not see it in your
>>> original email. The difference between worker memory and executor memory
>>> is explained at http://spark.apache.org/docs/1.3.0/spark-standalone.html:
>>>
>>> SPARK_WORKER_MEMORY
>>> Total amount of memory to allow Spark applications to use on the
>>> machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that
>>> each application's individual memory is configured using its
>>> spark.executor.memory property.
>>>
>>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>
>>>> Actually I realized that the correct way is:
>>>>
>>>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>>>
>>>> but I am still experiencing the same behavior/error.
>>>>
>>>> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>
>>>>> Hi Yin,
>>>>>
>>>>> the way I set the configuration is:
>>>>>
>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>> sqlContext.setConf("spark.sql.shuffle.partitions", "1000");
>>>>>
>>>>> it is the correct way, right?
>>>>> In the mapPartitions stage (the first one launched), I still get the
>>>>> same number of tasks and the same error. :(
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>>
>>>>>> Hi Yin,
>>>>>>
>>>>>> thanks a lot for that! Will give it a shot and let you know.
>>>>>>
>>>>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>>>>
>>>>>>> Was the OOM thrown during the execution of the first stage (map) or
>>>>>>> the second stage (reduce)? If it was the second stage, can you
>>>>>>> increase the value of spark.sql.shuffle.partitions and see if the
>>>>>>> OOM disappears?
>>>>>>>
>>>>>>> This setting controls the number of reducers Spark SQL will use, and
>>>>>>> the default is 200. Maybe there are too many distinct values and the
>>>>>>> memory pressure on every task (of those 200 reducers) is pretty
>>>>>>> high. You can start with 400 and increase it until the OOM
>>>>>>> disappears. Hopefully this will help.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yin
>>>>>>>
>>>>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Yin,
>>>>>>>>
>>>>>>>> Thanks for your feedback.
>>>>>>>> I have 1700 parquet files, sized 100MB each. The number of tasks
>>>>>>>> launched is equal to the number of parquet files.
>>>>>>>> Do you have any idea on how to deal with this situation?
>>>>>>>>
>>>>>>>> Thanks a lot
>>>>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> It seems there are too many distinct groups processed in a task,
>>>>>>>>> which triggers the problem.
>>>>>>>>>
>>>>>>>>> How many files does your dataset have and how large is each file?
>>>>>>>>> It seems your query will be executed in two stages: table scan and
>>>>>>>>> map-side aggregation in the first stage, and the final round of
>>>>>>>>> reduce-side aggregation in the second stage. Can you take a look
>>>>>>>>> at the numbers of tasks launched in these two stages?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Yin
>>>>>>>>>
>>>>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi there, I set the executor memory to 8g but it didn't help
>>>>>>>>>>
>>>>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should probably increase executor memory by setting
>>>>>>>>>>> "spark.executor.memory".
>>>>>>>>>>>
>>>>>>>>>>> The full list of available configurations can be found here:
>>>>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>>>>
>>>>>>>>>>> Cheng
>>>>>>>>>>>
>>>>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying the new DataFrame API with some basic operations
>>>>>>>>>>>> on a parquet dataset.
>>>>>>>>>>>> I have 7 nodes, each with 12 cores and 8GB RAM allocated to the
>>>>>>>>>>>> worker, in standalone cluster mode.
>>>>>>>>>>>> The code is the following:
>>>>>>>>>>>>
>>>>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet");
>>>>>>>>>>>> val res = people.groupBy("name", "date").
>>>>>>>>>>>>   agg(sum("power"), sum("supply")).take(10);
>>>>>>>>>>>> res.foreach(println);
>>>>>>>>>>>>
>>>>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead
>>>>>>>>>>>> limit exceeded.
>>>>>>>>>>>>
>>>>>>>>>>>> My configuration is:
>>>>>>>>>>>>
>>>>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>>>>> spark.driver.memory 6g
>>>>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>>>>> spark.shuffle.manager sort
>>>>>>>>>>>>
>>>>>>>>>>>> Any idea how I can work around this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot
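Putting the thread's suggestions together (more map-side partitions via
repartition(), more reducers via spark.sql.shuffle.partitions, and a larger
spark.executor.memory), a revised job might look like the sketch below. It is
untested, assumes a spark-shell session where sc is already defined, and the
partition counts are illustrative guesses, not tuned recommendations:

// Untested sketch combining the suggestions from this thread (Spark 1.3 API).
// The partition counts below are illustrative assumptions.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

val sqlContext = new SQLContext(sc)
// More reducers for the second (reduce-side aggregation) stage; the default is 200.
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

// More map-side partitions than the ~1700 input files, per the repartition() suggestion.
val people = sqlContext.parquetFile("/data.parquet").repartition(3000)

val res = people.groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .take(10)
res.foreach(println)

Note that spark.executor.memory cannot be changed from inside a running
application; it has to be set before launch, e.g.
spark-submit --conf spark.executor.memory=8g.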