Was the OOM thrown during the execution of the first stage (map) or the
second stage (reduce)? If it was the second stage, can you increase the value
of spark.sql.shuffle.partitions and see if the OOM disappears?

This setting controls the number of reduce tasks Spark SQL uses after a
shuffle, and the default is 200. Maybe there are too many distinct values,
so the memory pressure on each of those 200 reduce tasks is pretty high. You
can start with 400 and keep increasing it until the OOM disappears.
Hopefully this will help.
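
For example, here is a minimal sketch of how this could look in spark-shell
on Spark 1.3. It assumes the sqlContext that the shell provides and reuses
the path and column names from your query. The value 400 is only a starting
point, not a tuned number.

```scala
// Sketch for spark-shell on Spark 1.3, where `sqlContext` is already defined.
import org.apache.spark.sql.functions.sum

// Raise the number of reduce-side partitions from the default of 200.
// 400 is an assumed starting value; increase it further if the OOM persists.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// Same query as in the original message, now run with more reduce tasks.
val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date")
  .agg(sum("power"), sum("supply"))
  .take(10)

// Print each returned Row.
res.foreach(println)
```

The setting has to be applied before the query runs, because it is read when
the aggregation is planned.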

Thanks,

Yin


On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com>
wrote:

> Hi Yin,
>
> Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The
> number of tasks launched is equal to the number of parquet files. Do you
> have any idea on how to deal with this situation?
>
> Thanks a lot
> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>
>> It seems there are too many distinct groups processed in a single task,
>> which triggers the problem.
>>
>> How many files does your dataset have, and how large is each file? It seems
>> your query will be executed in two stages: table scan and map-side
>> aggregation in the first stage, and the final round of reduce-side
>> aggregation in the second stage. Can you take a look at the number of tasks
>> launched in these two stages?
>>
>> Thanks,
>>
>> Yin
>>
>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <johngou...@gmail.com>
>> wrote:
>>
>>> Hi there, I set the executor memory to 8g but it didn't help
>>>
>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>
>>>> You should probably increase executor memory by setting
>>>> "spark.executor.memory".
>>>>
>>>> The full list of available configurations can be found here:
>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>
>>>> Cheng
>>>>
>>>>
>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I was trying the new DataFrame API with some basic operations on a
>>>>> parquet dataset.
>>>>> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
>>>>> standalone cluster mode.
>>>>> The code is the following:
>>>>>
>>>>> import org.apache.spark.sql.functions.sum
>>>>> val people = sqlContext.parquetFile("/data.parquet")
>>>>> val res = people.groupBy("name", "date")
>>>>>   .agg(sum("power"), sum("supply"))
>>>>>   .take(10)
>>>>> res.foreach(println) // print each Row, not the array reference
>>>>>
>>>>> The dataset consists of 16 billion entries.
>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>> exceeded
>>>>>
>>>>> My configuration is:
>>>>>
>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>> spark.driver.memory    6g
>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>> spark.shuffle.manager    sort
>>>>>
>>>>> Any idea how I can work around this?
>>>>>
>>>>> Thanks a lot
>>>>>
>>>>
>>>>
>>>
>>
