It seems that the error happens before the ALS iterations even start. Could you try `ratings.first()` right after `ratings = newrdd.map(lambda l: Rating(int(l[1]),int(l[2]),l[4])).partitionBy(50)`? -Xiangrui
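A minimal sketch of that probe, assuming the same `newrdd` of parsed input lines from the session quoted below; `first()` runs a job over just this RDD, so a failure in the map or partitioning step surfaces here instead of deep inside `trainImplicit`:

    from pyspark.mllib.recommendation import Rating

    # Same construction as in the session quoted below. first() forces
    # evaluation of the lineage up to this point, which localizes the error.
    ratings = newrdd.map(lambda l: Rating(int(l[1]), int(l[2]), l[4])).partitionBy(50)
    print(ratings.first())  # expect a single Rating(user, product, rating)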
On Fri, Jun 26, 2015 at 2:28 PM, Ayman Farahat <ayman.fara...@yahoo.com> wrote:
> I tried something similar and got an error.
> I had 10 executors with 8 cores each.
>
> >>> ratings = newrdd.map(lambda l: Rating(int(l[1]),int(l[2]),l[4])).partitionBy(50)
> >>> mypart = ratings.getNumPartitions()
> >>> mypart
> 50
> >>> numIterations = 10
> >>> rank = 100
> >>> model = ALS.trainImplicit(ratings, rank, numIterations)
> [Stage 4:>             (0 + 59) / 210][Stage 6:>             (0 + 21) / 210]
> [Stage 4:===========> (169 + 41) / 210][Stage 6:=>          (24 + 39) / 210]
> [Stage 6:=============================================>     (178 + 32) / 210]
> [Stage 7:>                                                    (0 + 80) / 200]
> 15/06/26 21:25:11 ERROR TaskSetManager: Task 35 in stage 7.0 failed 4 times; aborting job
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 200, in trainImplicit
>     model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 181, in _prepare
>     first = ratings.first()
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1283, in first
>     rs = self.take(1)
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1265, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p, True)
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/context.py", line 897, in runJob
>     allowLocal)
>   File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 7.0 failed 4 times, most recent failure: Lost task 35.3 in stage 7.0 (TID 1213, gsbl407n02.blue.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 111, in main
>     process()
>   File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 106, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
>     for obj in iterator:
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1669, in add_shuffle_key
> ValueError: too many values to unpack
>
>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:722)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
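The `ValueError: too many values to unpack` raised in `add_shuffle_key` points at the `partitionBy(50)` call rather than at ALS itself: PySpark's `partitionBy` is defined for RDDs of (key, value) pairs and unpacks each element into exactly two values, while a `Rating` has three fields. A hedged sketch of two ways around it, assuming the same `newrdd`:

    from pyspark.mllib.recommendation import Rating

    # Option 1: repartition() reshuffles into 50 partitions without
    # requiring key-value pairs, so it works directly on an RDD of Ratings.
    ratings = newrdd.map(lambda l: Rating(int(l[1]), int(l[2]), l[4])).repartition(50)

    # Option 2: if hash-partitioning by user id is actually wanted, build a
    # (user, fields) pair RDD, partition it, then map back to Ratings.
    # The final map drops the partitioner object but keeps the 50 partitions.
    ratings = (newrdd.map(lambda l: (int(l[1]), l))
                     .partitionBy(50)
                     .map(lambda kv: Rating(int(kv[1][1]), int(kv[1][2]), kv[1][4])))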
> On Jun 26, 2015, at 12:43 PM, Ravi Mody <rmody...@gmail.com> wrote:
>> I set the number of partitions on the input dataset at 50. The number of CPU cores I'm using is 84 (7 executors, 12 cores).
>>
>> I'll look into getting a full stack trace. Any idea what my errors mean, and why increasing memory makes them go away? Thanks.
>>
>> On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>> Please see my comments inline. It would be helpful if you could attach the full stack trace. -Xiangrui
>>>
>>> On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody <rmody...@gmail.com> wrote:
>>>> 1. These are my settings:
>>>> rank = 100
>>>> iterations = 12
>>>> users = ~20M
>>>> items = ~2M
>>>> training examples = ~500M-1B (I'm running into the issue even with 500M training examples)
>>>
>>> Did you set the number of blocks? If you didn't, could you check how many partitions you have in the ratings RDD? Setting a large number of blocks would increase the shuffle size. If you have enough RAM, try setting the number of blocks to the number of CPU cores or less.
>>>
>>>> 2. The memory storage never seems to go too high. The user blocks may go up to ~10 GB, and each executor will have a few GB used out of 30 free GB. Everything seems small compared to the amount of memory I'm using.
>>>
>>> This looks correct.
>>>
>>>> 3. I think I have a lot of disk space - is this on the executors or the driver? Is there a way to know whether the error is coming from disk space?
>>>
>>> You can see the shuffle data size for each iteration in the WebUI. Usually it would throw an out-of-disk-space exception instead of the message you posted, but it is worth checking.
>>>
>>>> 4. I'm not changing checkpointing settings, but I think checkpointing defaults to every 10 iterations? One notable thing is that the crashes often start on or after the 9th iteration, so it may be related to checkpointing. But this could just be a coincidence.
>>>
>>> If you didn't set checkpointDir in SparkContext, the checkpointInterval setting in ALS has no effect.
>>>
>>>> Thanks!
>>>>
>>>> On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat <ayman.fara...@yahoo.com> wrote:
>>>>> Was there any resolution to that problem? I am also hitting it with PySpark 1.4: 380 million observations, 100 factors, and 5 iterations.
>>>>> Thanks
>>>>> Ayman
>>>>>
>>>>> On Jun 23, 2015, at 6:20 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>>>> It shouldn't be hard to handle 1 billion ratings in 1.3. I just need more information to guess what happened:
>>>>>>
>>>>>> 1. Could you share the ALS settings, e.g., number of blocks, rank, and number of iterations, as well as the number of users/items in your dataset?
>>>>>> 2. If you monitor the progress in the WebUI, how much data is stored in memory and how much is shuffled per iteration?
>>>>>> 3. Do you have enough disk space for the shuffle files?
>>>>>> 4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS?
>>>>>>
>>>>>> Best,
>>>>>> Xiangrui
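Questions 1 and 4 above map onto two concrete knobs. A hedged sketch of how they could be set from PySpark 1.3/1.4, assuming an existing SparkContext `sc` and `ratings` RDD and using Ravi's figures (rank 100, 12 iterations, 84 cores); the HDFS path is hypothetical:

    from pyspark.mllib.recommendation import ALS

    # Per Xiangrui's comment above, checkpointing inside ALS only takes
    # effect once a checkpoint directory is set on the SparkContext.
    sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  # hypothetical path

    # Keeping blocks at or below the number of CPU cores (84 in Ravi's
    # setup) limits the shuffle size, per the advice above.
    model = ALS.trainImplicit(ratings, rank=100, iterations=12, blocks=84)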
>>>>>> On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody <rmody...@gmail.com> wrote:
>>>>>>> Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly large datasets (1+ billion input records). As I grow my dataset, I often run into issues with many failed stages and dropped executors, ultimately leading to the whole application failing. The errors are like "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19" and "org.apache.spark.shuffle.FetchFailedException: Failed to connect to...". These occur during flatMap, mapPartitions, and aggregate stages. I know that increasing memory fixes the issue, but most of the time my executors are only using a tiny portion of their allocated memory (<10%). Often the stages run fine until the last iteration or two of ALS, but this could just be a coincidence.
>>>>>>>
>>>>>>> I've tried tweaking a lot of settings, but it's time-consuming to do this through guess-and-check. Right now I have these set:
>>>>>>> spark.shuffle.memoryFraction = 0.3
>>>>>>> spark.storage.memoryFraction = 0.65
>>>>>>> spark.executor.heartbeatInterval = 600000
>>>>>>>
>>>>>>> I'm sure these settings aren't optimal - any idea what could be causing my errors, and in which direction I can push these settings to get more out of my memory? I'm currently using 240 GB of memory (on 7 executors) for a 1 billion record dataset, which seems like too much. Thanks!
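For reference, a minimal sketch of how the three settings above can be applied programmatically; the property names are standard Spark configuration keys and the values are the ones quoted:

    from pyspark import SparkConf, SparkContext

    # The three settings from the message above; these can equally be
    # passed to spark-submit as --conf key=value flags.
    conf = (SparkConf()
            .set("spark.shuffle.memoryFraction", "0.3")
            .set("spark.storage.memoryFraction", "0.65")
            .set("spark.executor.heartbeatInterval", "600000"))
    sc = SparkContext(conf=conf)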