Actually, I realized that the correct way is:

sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.
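
A quick way to verify that the value actually took effect (a sketch, using the
same Spark 1.x SQLContext as in the messages below):

println(sqlContext.getConf("spark.sql.shuffle.partitions"))
// running SET with just the key returns the current setting
sqlContext.sql("set spark.sql.shuffle.partitions").show()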

On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:

> Hi Yin,
>
> the way I set the configuration is:
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>
> That is the correct way, right?
> In the mapPartitions stage (the first stage that is launched), I again get
> the same number of tasks and the same error. :(
>
> Thanks a lot!
>
> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
>> Hi Yin,
>>
>> thanks a lot for that! Will give it a shot and let you know.
>>
>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>
>>> Was the OOM thrown during the execution of the first stage (map) or the
>>> second stage (reduce)? If it was the second stage, can you increase the
>>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>
>>> This setting controls the number of reducers Spark SQL will use, and the
>>> default is 200. Maybe there are too many distinct values and the memory
>>> pressure on every task (of those 200 reducers) is pretty high. You can
>>> start with 400 and increase it until the OOM disappears. Hopefully this
>>> will help.
>>>
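>>> For example, something like this (a sketch; 400 is illustrative, people is
>>> the DataFrame from the original post below, and sum comes from
>>> org.apache.spark.sql.functions):
>>>
>>> sqlContext.setConf("spark.sql.shuffle.partitions", "400")
>>> // the new value applies to jobs launched after this call
>>> people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
>>>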
>>> Thanks,
>>>
>>> Yin
>>>
>>>
>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yin,
>>>>
>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
>>>> The number of tasks launched is equal to the number of parquet files. Do
>>>> you have any idea how to deal with this situation?
>>>>
>>>> Thanks a lot
>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>
>>>>> It seems there are too many distinct groups processed in a task, which
>>>>> triggers the problem.
>>>>>
>>>>> How many files does your dataset have, and how large is each file? It
>>>>> seems your query will be executed in two stages: table scan and map-side
>>>>> aggregation in the first stage, and the final round of reduce-side
>>>>> aggregation in the second stage. Can you take a look at the number of
>>>>> tasks launched in these two stages?
>>>>>
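>>>>> To see the two stages, you can also print the physical plan (a sketch,
>>>>> using the query from the original post; explain() shows the partial
>>>>> map-side and the final reduce-side aggregation operators):
>>>>>
>>>>> people.groupBy("name", "date")
>>>>>   .agg(sum("power"), sum("supply"))
>>>>>   .explain()
>>>>>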
>>>>> Thanks,
>>>>>
>>>>> Yin
>>>>>
>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>>>>> johngou...@gmail.com> wrote:
>>>>>
>>>>>> Hi there, I set the executor memory to 8g, but it didn't help.
>>>>>>
>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>>>
>>>>>>> You should probably increase executor memory by setting
>>>>>>> "spark.executor.memory".
>>>>>>>
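>>>>>>> For example, in conf/spark-defaults.conf (the 8g value is illustrative):
>>>>>>>
>>>>>>> spark.executor.memory    8g
>>>>>>>
>>>>>>> or equivalently when launching the shell:
>>>>>>>
>>>>>>> ./bin/spark-shell --executor-memory 8g
>>>>>>>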
>>>>>>> A full list of the available configurations can be found here:
>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>
>>>>>>> Cheng
>>>>>>>
>>>>>>>
>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> I was trying the new DataFrame API with some basic operations on a
>>>>>>>> parquet dataset.
>>>>>>>> I have 7 nodes with 12 cores each and 8GB RAM allocated to each
>>>>>>>> worker, in standalone cluster mode.
>>>>>>>> The code is the following:
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions.sum  // needed for the sum() aggregate
>>>>>>>>
>>>>>>>> val people = sqlContext.parquetFile("/data.parquet")
>>>>>>>> val res = people.groupBy("name", "date")
>>>>>>>>   .agg(sum("power"), sum("supply"))
>>>>>>>>   .take(10)
>>>>>>>> res.foreach(println)  // print each Row; println(res) would only show the array reference
>>>>>>>>
>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>>>>> exceeded
>>>>>>>>
>>>>>>>> My configuration is:
>>>>>>>>
>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>> spark.driver.memory    6g
>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>> spark.shuffle.manager    sort
>>>>>>>>
>>>>>>>> Any idea how I can work around this?
>>>>>>>>
>>>>>>>> Thanks a lot
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
