Great. The upgrade helped. I still need some input:

1) Are there any log files of the Spark job execution?
2) Where can I read about tuning / parameter configuration? For example: --num-executors 12 --driver-memory 4g --executor-memory 2g. What is the meaning of those parameters?

Thanks
Oleg.
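For reference, these spark-submit flags control the resources the job requests from YARN: --num-executors is the number of executor containers to launch, --executor-memory is the heap size of each executor, and --driver-memory is the heap size of the driver process. (On YARN, aggregated executor logs can usually be retrieved after the job with: yarn logs -applicationId <application id>.) Below is a minimal sketch of the same settings expressed as Spark configuration properties; the property names are the standard ones, but the values are only illustrative, not a tuning recommendation.

    from pyspark import SparkConf, SparkContext

    # Illustrative only: the same resources the flags above request from YARN,
    # written as Spark configuration properties.
    conf = (SparkConf()
            .setAppName("CAD")
            .set("spark.executor.instances", "12")  # --num-executors
            .set("spark.executor.memory", "2g")     # --executor-memory
            .set("spark.executor.cores", "4"))      # --executor-cores
    # --driver-memory is normally passed on the spark-submit command line,
    # because the driver JVM is already running by the time this code executes.
    sc = SparkContext(conf=conf)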
On Thu, Sep 18, 2014 at 12:15 AM, Davies Liu <dav...@databricks.com> wrote:
> Maybe the Python worker uses too much memory during groupByKey();
> groupByKey() with a larger numPartitions can help.
>
> Also, can you upgrade your cluster to 1.1? It can spill the data
> to disk if memory cannot hold all the data during groupByKey().
>
> Also, if there is a hot key with dozens of millions of values, the PR [1]
> can help; it actually helped someone with large datasets (3T).
>
> Davies
>
> [1] https://github.com/apache/spark/pull/1977
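A minimal, self-contained sketch of the suggestion above (this is not Oleg's job; the partition count of 100 is an arbitrary example, and the right value depends on the data size and cluster):

    from pyspark import SparkContext

    sc = SparkContext(appName="groupByKey-numPartitions-demo")
    pairs = sc.parallelize([(i % 10, i) for i in range(1000)])
    # An explicit numPartitions spreads the grouped values over more reduce
    # tasks, so each Python worker holds less data in memory at a time.
    grouped = pairs.groupByKey(numPartitions=100)
    print(grouped.getNumPartitions())  # -> 100
    sc.stop()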
> On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >
> > Sure, I'll post to the mailing list.
> >
> > groupByKey(self, numPartitions=None)
> >
> > Group the values for each key in the RDD into a single sequence.
> > Hash-partitions the resulting RDD with numPartitions partitions.
> >
> > So instead of using the default I'll provide numPartitions, but what is the
> > best practice for calculating the number of partitions? And how is the number
> > of partitions related to my original problem?
> >
> > Thanks
> > Oleg.
> >
> > http://spark.apache.org/docs/1.0.2/api/python/frames.html
> >
> > On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <eric.d.fried...@gmail.com> wrote:
> >>
> >> Look at the API for textFile and groupByKey. Please don't take threads
> >> off list. Other people have the same questions.
> >>
> >> ----
> >> Eric Friedman
> >>
> >> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >>
> >> Can you please explain how to configure partitions?
> >> Thanks
> >> Oleg
> >>
> >> On Wednesday, September 17, 2014, Eric Friedman <eric.d.fried...@gmail.com> wrote:
> >>>
> >>> Yeah, you need to increase partitions. You only have one on your text
> >>> file. On groupByKey you're getting the pyspark default, which is too low.
> >>>
> >>> ----
> >>> Eric Friedman
> >>>
> >>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >>>
> >>> This is a very good question :-).
> >>>
> >>> Here is my code:
> >>>
> >>> sc = SparkContext(appName="CAD")
> >>> lines = sc.textFile(sys.argv[1], 1)
> >>> result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc))
> >>> result.saveAsTextFile(sys.argv[2])
> >>>
> >>> Should I configure partitioning manually? Where should I configure it?
> >>> Where can I read about partitioning best practices?
> >>>
> >>> Thanks
> >>> Oleg.
> >>>
> >>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <eric.d.fried...@gmail.com> wrote:
> >>>>
> >>>> How many partitions do you have in your input rdd? Are you
> >>>> specifying numPartitions in subsequent calls to groupByKey/reduceByKey?
> >>>>
> >>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>> I am executing pyspark on YARN.
> >>>> I successfully processed the initial dataset, but now I have grown it 10 times larger.
> >>>>
> >>>> During execution I keep getting this error:
> >>>> 14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
> >>>>
> >>>> Tasks fail and are resubmitted again:
> >>>>
> >>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
> >>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
> >>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64
> >>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
> >>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91
> >>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86
> >>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
> >>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92
> >>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73
> >>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84
> >>>>
> >>>> QUESTION:
> >>>> How can I debug / tune the problem?
> >>>> What can cause such behavior?
> >>>> I have a 5-machine cluster with 32 GB RAM.
> >>>> Dataset: 3 GB.
> >>>>
> >>>> Command for execution:
> >>>>
> >>>> /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/inpuut.csv /output/cad_model_500_2
> >>>>
> >>>> Where can I find a description of these parameters?
> >>>> --num-executors 12
> >>>> --driver-memory 4g
> >>>> --executor-memory 2g
> >>>>
> >>>> What parameters should be used for tuning?
> >>>>
> >>>> Thanks
> >>>> Oleg.
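Pulling the advice in this thread together, here is a rough sketch of the driver script with explicit partitioning at both the textFile and groupByKey steps. doSplit and my_custom_function are only placeholders for the real functions shipped in tad.zip, and the partition counts are illustrative rather than tuned values:

    import sys

    from pyspark import SparkContext

    def doSplit(line):
        # Placeholder for the real doSplit from tad.zip: emit a (key, value) pair.
        key, _, value = line.partition(",")
        return (key, value)

    def my_custom_function(values):
        # Placeholder for the real per-key aggregation from tad.zip.
        return len(list(values))

    if __name__ == "__main__":
        sc = SparkContext(appName="CAD")
        # Ask for more input splits than the single partition requested before;
        # the second argument to textFile is a minimum number of partitions.
        lines = sc.textFile(sys.argv[1], 64)
        # An explicit numPartitions for groupByKey gives each reduce task, and
        # therefore each Python worker, a smaller slice of the data to hold.
        result = (lines.map(doSplit)
                       .groupByKey(numPartitions=400)
                       .mapValues(my_custom_function))
        result.saveAsTextFile(sys.argv[2])
        sc.stop()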