Hi everyone,

According to Xiangrui Meng (I believe he is the author of ALS), this problem is caused by Kryo serialization:
/In PySpark 1.1, we switched to Kryo serialization by default. However, the ALS code requires special registration with Kryo in order to work. The error happens when there is not enough memory and ALS needs to store ratings or in/out blocks to disk. I will work on this issue./

He also provided a workaround for this problem:

{code}
bin/pyspark --master local-cluster[2,1,512] \
  --conf 'spark.kryo.registrator=org.apache.spark.examples.mllib.MovieLensALS$ALSRegistrator' \
  --jars lib/spark-examples-1.1.0-hadoop2.4.0.jar
{code}

Note that you should replace the master URL and the filename of the examples jar to match your own setup.

I created a topic in JIRA, https://issues.apache.org/jira/browse/SPARK-3990 . You can go there for more information or contribute a fix for this problem.

Cheers
Gen

Gen wrote
> Hi,
>
> I am trying to use the ALS.trainImplicit method in
> pyspark.mllib.recommendation, but it doesn't work. So I tried the
> example from the Python API documentation:
> /
> r1 = (1, 1, 1.0)
> r2 = (1, 2, 2.0)
> r3 = (2, 1, 2.0)
> ratings = sc.parallelize([r1, r2, r3])
> model = ALS.trainImplicit(ratings, 1)
> /
> That didn't work either. After searching on Google, I found that there
> are only two overloads of ALS.trainImplicit in the Scala code. So I tried
> /
> model = ALS.trainImplicit(ratings, 1, 1)
> /
> and it worked. But if I set the number of iterations to anything other
> than 1, for example
> /
> model = ALS.trainImplicit(ratings, 1, 2)
> /
> or
> /
> model = ALS.trainImplicit(ratings, 4, 2)
> /
> it generates an error.
> The information is as follows:
>
> count at ALS.scala:314
>
> Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times,
> most recent failure: Lost task 6.3 in stage 189.0 (TID 626,
> ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException:
> java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>
> It is really strange, because the count at ALS.scala:314 is already
> outside the loop over iterations. Any idea?
> Thanks a lot in advance.
>
> FYI: I am using Spark 1.1.0, and ALS.train() works well in all of these
> cases.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595p16845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
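P.S. For anyone who would rather not retype the flags on every launch: the same workaround can also go into conf/spark-defaults.conf (or be set on a SparkConf before the context is created). A minimal sketch only; the jar path below is a placeholder for wherever the examples jar lives in your installation:

{code}
# Register the MLlib ALS classes with Kryo. The registrator class ships in
# the examples jar, so that jar must be distributed to the executors too.
spark.kryo.registrator  org.apache.spark.examples.mllib.MovieLensALS$ALSRegistrator
spark.jars              /path/to/lib/spark-examples-1.1.0-hadoop2.4.0.jar
{code}

This is equivalent to passing --conf and --jars on the bin/pyspark command line as shown above.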