Hi everyone,

According to Xiangrui Meng (I think he is the author of ALS), this
problem is caused by Kryo serialization:

"In PySpark 1.1, we switched to Kryo serialization by default. However, ALS
code requires special registration with Kryo in order to work. The error
happens when there is not enough memory and ALS needs to store ratings or
in/out blocks to disk. I will work on this issue."

And he provided a workaround for this problem:

{code}
bin/pyspark --master local-cluster[2,1,512] \
  --conf 'spark.kryo.registrator=org.apache.spark.examples.mllib.MovieLensALS$ALSRegistrator' \
  --jars lib/spark-examples-1.1.0-hadoop2.4.0.jar
{code}
Note that you should replace the master URL and the path to the example jar
to match your setup.
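
If you prefer to do this from inside your Python script rather than on the
command line, a rough sketch of the equivalent configuration would look like
the following (the master, registrator class, and jar path are simply the
values from the command above; adapt them to your environment):

{code}
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

# Same settings as the command-line workaround above; master, registrator
# class, and jar path should be adapted to your cluster and file layout.
conf = (SparkConf()
        .setMaster("local-cluster[2,1,512]")
        .set("spark.kryo.registrator",
             "org.apache.spark.examples.mllib.MovieLensALS$ALSRegistrator")
        .set("spark.jars", "lib/spark-examples-1.1.0-hadoop2.4.0.jar"))
sc = SparkContext(conf=conf)

# With the registrator in place, trainImplicit should no longer fail when
# the number of iterations is greater than 1.
ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
model = ALS.trainImplicit(ratings, 4, 2)
{code}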

I created an issue in JIRA: https://issues.apache.org/jira/browse/SPARK-3990.
You can go there for more information or contribute a fix for this problem.

Cheers
Gen




Gen wrote
> Hi,
> 
> I am trying to use the ALS.trainImplicit method in
> pyspark.mllib.recommendation. However, it didn't work. So I tried the
> example from the Python API documentation, such as:
> r1 = (1, 1, 1.0) 
> r2 = (1, 2, 2.0) 
> r3 = (2, 1, 2.0) 
> ratings = sc.parallelize([r1, r2, r3]) 
> model = ALS.trainImplicit(ratings, 1) 
> 
> It didn't work either. After searching on Google, I found that there are
> only two overloads for ALS.trainImplicit in the Scala code. So I tried
> model = ALS.trainImplicit(ratings, 1, 1)
> and it worked. But if I set the number of iterations to anything other
> than 1, for example
> model = ALS.trainImplicit(ratings, 1, 2)
> or
> model = ALS.trainImplicit(ratings, 4, 2)
> it generated an error. The information is as follows:
> 
> count at ALS.scala:314
> 
> Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times,
> most recent failure: Lost task 6.3 in stage 189.0 (TID 626,
> ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException:
> java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>        
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>        
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>        
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>        
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>        
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>        
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>        
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>        
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>        
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>        
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>        
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>        
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>        
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>        
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>        
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>        
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 
> It is really strange, because count at ALS.scala:314 is already outside the
> loop of iterations. Any idea?
> Thanks a lot in advance.
> 
> FYI: I am using Spark 1.1.0, and ALS.train() works well in all of these
> cases.





