spark.sql.shuffle.partitions only control the number of tasks in the second
stage (the number of reducers). For your case, I'd say that the number of
tasks in the first state (number of mappers) will be the number of files
you have.

Actually, have you changed "spark.executor.memory" (it controls the memory
for an executor of your application)? I did not see it in your original
email. The difference between worker memory and executor memory can be
found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html),

SPARK_WORKER_MEMORY
Total amount of memory to allow Spark applications to use on the machine,
e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
application's individual memory is configured using its
spark.executor.memory property.


On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas <johngou...@gmail.com>
wrote:

> Actually I realized that the correct way is:
>
> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>
> but I am still experiencing the same behavior/error.
>
> On 20 March 2015 at 16:04, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
>> Hi Yin,
>>
>> the way I set the configuration is:
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>
>> it is the correct way right?
>> In the mapPartitions task (the first task which is launched), I get again
>> the same number of tasks and again the same error. :(
>>
>> Thanks a lot!
>>
>> On 19 March 2015 at 17:40, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>
>>> Hi Yin,
>>>
>>> thanks a lot for that! Will give it a shot and let you know.
>>>
>>> On 19 March 2015 at 16:30, Yin Huai <yh...@databricks.com> wrote:
>>>
>>>> Was the OOM thrown during the execution of first stage (map) or the
>>>> second stage (reduce)? If it was the second stage, can you increase the
>>>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>>
>>>> This setting controls the number of reduces Spark SQL will use and the
>>>> default is 200. Maybe there are too many distinct values and the memory
>>>> pressure on every task (of those 200 reducers) is pretty high. You can
>>>> start with 400 and increase it until the OOM disappears. Hopefully this
>>>> will help.
>>>>
>>>> Thanks,
>>>>
>>>> Yin
>>>>
>>>>
>>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas <johngou...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Yin,
>>>>>
>>>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
>>>>> The number of tasks launched is equal to the number of parquet files. Do
>>>>> you have any idea on how to deal with this situation?
>>>>>
>>>>> Thanks a lot
>>>>> On 18 Mar 2015 17:35, "Yin Huai" <yh...@databricks.com> wrote:
>>>>>
>>>>>> Seems there are too many distinct groups processed in a task, which
>>>>>> trigger the problem.
>>>>>>
>>>>>> How many files do your dataset have and how large is a file? Seems
>>>>>> your query will be executed with two stages, table scan and map-side
>>>>>> aggregation in the first stage and the final round of reduce-side
>>>>>> aggregation in the second stage. Can you take a look at the numbers of
>>>>>> tasks launched in these two stages?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yin
>>>>>>
>>>>>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>>>>>> johngou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi there, I set the executor memory to 8g but it didn't help
>>>>>>>
>>>>>>> On 18 March 2015 at 13:59, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>>>>
>>>>>>>> You should probably increase executor memory by setting
>>>>>>>> "spark.executor.memory".
>>>>>>>>
>>>>>>>> Full list of available configurations can be found here
>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>
>>>>>>>> Cheng
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>>>>>>
>>>>>>>>> Hi there,
>>>>>>>>>
>>>>>>>>> I was trying the new DataFrame API with some basic operations on a
>>>>>>>>> parquet dataset.
>>>>>>>>> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
>>>>>>>>> a standalone cluster mode.
>>>>>>>>> The code is the following:
>>>>>>>>>
>>>>>>>>> val people = sqlContext.parquetFile("/data.parquet");
>>>>>>>>> val res = people.groupBy("name","date").
>>>>>>>>> agg(sum("power"),sum("supply")).take(10);
>>>>>>>>> System.out.println(res);
>>>>>>>>>
>>>>>>>>> The dataset consists of 16 billion entries.
>>>>>>>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>>>>>>>> exceeded
>>>>>>>>>
>>>>>>>>> My configuration is:
>>>>>>>>>
>>>>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>>>>> spark.driver.memory    6g
>>>>>>>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>>>>>>>> spark.shuffle.manager    sort
>>>>>>>>>
>>>>>>>>> Any idea how can I workaround this?
>>>>>>>>>
>>>>>>>>> Thanks a lot
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>

Reply via email to