Maybe the Python worker use too much memory during groupByKey(),
groupByKey() with larger numPartitions can help.

Also, can you upgrade your cluster to 1.1? It can spilling the data
into disks if the memory can not hold all the data during groupByKey().

Also, If there is hot key with dozens of millions of values, the PR [1]
can help it, it actually helped someone with large datasets (3T).

Davies

[1] https://github.com/apache/spark/pull/1977

On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>
> Sure, I'll post to the mail list.
>
> groupByKey(self, numPartitions=None)
>
> source code
>
> Group the values for each key in the RDD into a single sequence. 
> Hash-partitions the resulting RDD with into numPartitions partitions.
>
>
> So instead of using default I'll provide numPartitions , but what is the best 
> practice to calculate the number of partitions? and how number of partitions 
> related to my original problem?
>
>
> Thanks
>
> Oleg.
>
>
> http://spark.apache.org/docs/1.0.2/api/python/frames.html
>
>
>
> On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <eric.d.fried...@gmail.com> 
> wrote:
>>
>> Look at the API for text file and groupByKey. Please don't take threads off 
>> list. Other people have the same questions.
>>
>> ----
>> Eric Friedman
>>
>> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>>
>> Can hou please explain how to configure partitions?
>> Thanks
>> Oleg
>>
>> On Wednesday, September 17, 2014, Eric Friedman <eric.d.fried...@gmail.com> 
>> wrote:
>>>
>>> Yeah, you need to increase partitions. You only have one on your text file. 
>>> On groupByKey you're getting the pyspark default, which is too low.
>>>
>>> ----
>>> Eric Friedman
>>>
>>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>>>
>>> This is very good question :-).
>>>
>>> Here is my code:
>>>
>>> sc = SparkContext(appName="CAD")
>>>     lines = sc.textFile(sys.argv[1], 1)
>>>     result = lines.map(doSplit).groupByKey().mapValues(lambda vc: 
>>> my_custom_function(vc))
>>>     result.saveAsTextFile(sys.argv[2])
>>>
>>> Should I configure partitioning manually ? Where should I configure it? 
>>> Where can I read about partitioning best practices?
>>>
>>> Thanks
>>> Oleg.
>>>
>>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <eric.d.fried...@gmail.com> 
>>> wrote:
>>>>
>>>> How many partitions do you have in your input rdd?  Are you specifying 
>>>> numPartitions in subsequent calls to groupByKey/reduceByKey?
>>>>
>>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>>>>
>>>> Hi ,
>>>>   I am execution pyspark on yarn.
>>>> I have successfully executed initial dataset but now I growed it 10 times 
>>>> more.
>>>>
>>>> during execution I got all the time this error:
>>>>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost 
>>>> executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
>>>>
>>>>  tasks are failed a resubmitted again:
>>>>
>>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 
>>>> 29, 32, 33, 48, 75, 86, 91, 93, 94
>>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 
>>>> 93
>>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 
>>>> 27, 39, 51, 64
>>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
>>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 
>>>> 42, 61, 67, 77, 81, 91
>>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 
>>>> 29, 34, 40, 46, 67, 69, 86
>>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 
>>>> 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 
>>>> 85, 89
>>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 
>>>> 79, 92
>>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 
>>>> 24, 31, 43, 65, 73
>>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
>>>> at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 
>>>> 72, 75, 84
>>>>
>>>>
>>>>
>>>> QUESTION:
>>>>    how to debug / tune the problem.
>>>> What can cause to such behavior?
>>>> I have 5 machine cluster with 32 GB ram.
>>>>  Dataset - 3G.
>>>>
>>>> command for execution:
>>>>
>>>>      
>>>> /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit 
>>>> --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g 
>>>> --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py 
>>>>  /input/tad/inpuut.csv  /output/cad_model_500_2
>>>>
>>>>
>>>> Where can I find description of the parameters?
>>>> --num-executors 12
>>>> --driver-memory 4g
>>>> --executor-memory 2g
>>>>
>>>> What parameters should be used for tuning?
>>>>
>>>> Thanks
>>>> Oleg.
>>>>
>>>>
>>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to