Hey Yiannis,

If you just perform a count on each ("name", "date") pair, does it succeed?
If so, can you do the count and then an order by to find the largest group?

I'm wondering if there is a single pathologically large group here that is
somehow causing OOM.
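
A minimal sketch of that check, meant to be pasted into spark-shell. It
assumes `people` is the DataFrame loaded from the Parquet data in the
original message; `largestGroups` is a hypothetical helper name, not part of
Spark or of this thread. The count shuffles on the same keys as the original
aggregation, so it may run into the same memory pressure.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.desc

// Hypothetical helper (not from the thread): returns the n largest
// ("name", "date") groups by row count, biggest first.
def largestGroups(df: DataFrame, n: Int): Array[Row] =
  df.groupBy("name", "date").
    count().                 // adds a "count" column: rows per group
    orderBy(desc("count")).  // largest group first
    take(n)

// Usage, assuming `people` is the DataFrame from the original message:
// largestGroups(people, 10).foreach(println)
```

If the top row's count dwarfs the rest, a single skewed group is a likely
suspect for the OOM.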

Also, to be clear, you are getting GC limit warnings on the executors, not
the driver. Correct?

- Patrick

On Mon, Mar 23, 2015 at 10:21 AM, Martin Goodson <mar...@skimlinks.com>
wrote:

> Have you tried to repartition() your original data to make more partitions
> before you aggregate?
>
>
> --
> Martin Goodson  |  VP Data Science
> (0)20 3397 1240
>
> On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi Yin,
>>
>> Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
>> without any success.
>> I cannot figure out how to increase the number of mapPartitions tasks.
>>
>> Thanks a lot
>>
>> On 20 March 2015 at 18:44, Yin Huai <yh...@databricks.com> wrote:
>>
>>> spark.sql.shuffle.partitions only controls the number of tasks in the
>>> second stage (the number of reducers). For your case, I'd say that the
>>> number of tasks in the first stage (number of mappers) will be the number
>>> of files you have.
>>>
>>> Actually, have you changed "spark.executor.memory" (it controls the
>>> memory for an executor of your application)? I did not see it in your
>>> original email. The difference between worker memory and executor memory
>>> can be found at (
>>> http://spark.apache.org/docs/1.3.0/spark-standalone.html),
>>>
>>> SPARK_WORKER_MEMORY
>>> Total amount of memory to allow Spark applications to use on the
>>> machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that
>>> each application's individual memory is configured using its
>>> spark.executor.memory property.
>>>
>>>
>>> On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com>
>>> wrote:
>>>
>>>> Actually I realized that the correct way is:
>>>>
>>>> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>>>>
>>>> but I am still experiencing the same behavior/error.
>>>>
>>>> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Yin,
>>>>>
>>>>> the way I set the configuration is:
>>>>>
>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>>>>
>>>>> Is this the correct way?
>>>>> In the mapPartitions stage (the first one launched), I again get the
>>>>> same number of tasks and the same error. :(
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Yin,
>>>>>>
>>>>>> thanks a lot for that! Will give it a shot and let you know.
>>>>>>
>>>>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>>>>
>>>>>>> Was the OOM thrown during the execution of the first stage (map) or
>>>>>>> the second stage (reduce)? If it was the second stage, can you increase
>>>>>>> the value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>>>>>
>>>>>>> This setting controls the number of reducers Spark SQL will use, and
>>>>>>> the default is 200. Maybe there are too many distinct values and the
>>>>>>> memory pressure on every task (of those 200 reducers) is pretty high.
>>>>>>> You can start with 400 and increase it until the OOM disappears.
>>>>>>> Hopefully this will help.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yin
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <
>>>>>>> johngou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Yin,
>>>>>>>>
>>>>>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB
>>>>>>>> each. The number of tasks launched is equal to the number of parquet
>>>>>>>> files. Do you have any idea how to deal with this situation?
>>>>>>>>
>>>>>>>> Thanks a lot
>>>>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> It seems there are too many distinct groups processed in a task,
>>>>>>>>> which triggers the problem.
>>>>>>>>>
>>>>>>>>> How many files does your dataset have, and how large is each file?
>>>>>>>>> It seems your query will be executed in two stages: table scan and
>>>>>>>>> map-side aggregation in the first stage, and the final round of
>>>>>>>>> reduce-side aggregation in the second stage. Can you take a look at
>>>>>>>>> the number of tasks launched in these two stages?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Yin
>>>>>>>>>
>>>>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>>>>>>>>> johngou...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi there, I set the executor memory to 8g but it didn't help
>>>>>>>>>>
>>>>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should probably increase executor memory by setting
>>>>>>>>>>> "spark.executor.memory".
>>>>>>>>>>>
>>>>>>>>>>> The full list of available configurations can be found here:
>>>>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>>>>
>>>>>>>>>>> Cheng
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying the new DataFrame API with some basic operations
>>>>>>>>>>>> on a parquet dataset.
>>>>>>>>>>>> I have 7 nodes with 12 cores each, and 8GB RAM allocated to each
>>>>>>>>>>>> worker, in standalone cluster mode.
>>>>>>>>>>>> The code is the following:
>>>>>>>>>>>>
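>>>>>>>>>>>> import org.apache.spark.sql.functions.sum
>>>>>>>>>>>>
>>>>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet")
>>>>>>>>>>>> val res = people.groupBy("name", "date").
>>>>>>>>>>>>   agg(sum("power"), sum("supply")).take(10)
>>>>>>>>>>>> // take(10) returns an Array[Row]; print each row, not the array reference
>>>>>>>>>>>> res.foreach(println)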
>>>>>>>>>>>>
>>>>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead
>>>>>>>>>>>> limit exceeded
>>>>>>>>>>>>
>>>>>>>>>>>> My configuration is:
>>>>>>>>>>>>
>>>>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>>>>> spark.driver.memory    6g
>>>>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>>>>> spark.shuffle.manager    sort
>>>>>>>>>>>>
>>>>>>>>>>>> Any idea how I can work around this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
