Hi, 

Today I tried again with the following code, but it still didn't work.
Could you please tell me what environment you are running it in?

from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext

sc = SparkContext()
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)
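
To make it easier to compare environments, something like the following could be run on both sides and the output pasted into the thread. It is only a sketch: the helper name describe_environment is made up for illustration, and it assumes that pyspark may or may not be importable (and may not expose a __version__ attribute in older releases), so it falls back gracefully in both cases.

```python
import platform
import sys


def describe_environment():
    """Collect version details worth posting alongside a bug report."""
    info = {
        "python": platform.python_version(),
        "platform": platform.platform(),
        "executable": sys.executable,
    }
    try:
        # pyspark may not be importable outside a Spark installation.
        import pyspark
        # Assumption: some releases do not define __version__, so use a default.
        info["pyspark"] = getattr(pyspark, "__version__", "unknown")
    except ImportError:
        info["pyspark"] = "not importable"
    return info


if __name__ == "__main__":
    for key, value in sorted(describe_environment().items()):
        print("%s: %s" % (key, value))
```

The Spark version itself can also be read from a running context with sc.version.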

I used spark-ec2 to create a cluster with 5 slaves (I made some modifications
to spark_ec.py, but the cluster launched and was configured correctly).
I found that the job fails when one slave node tries to take its second
task in "count at ALS.scala:314". I will take a look at the logs and try to
find the problem.
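
For going through the logs, a small script along these lines could count the "Lost task" messages per host, to see whether the failures cluster on particular slaves. This is a rough sketch: the regular expression is written against the TaskSetManager lines quoted further down in this thread, and the function name lost_tasks_by_host is just an illustrative choice.

```python
import re
from collections import Counter

# Matches e.g. "Lost task 4.3 in stage 975.0 (TID 1653, ip-...ec2.internal)"
LOST_TASK = re.compile(
    r"Lost task (?P<task>\d+)\.(?P<attempt>\d+) in stage (?P<stage>\d+\.\d+) "
    r"\(TID (?P<tid>\d+), (?P<host>[^)]+)\)"
)


def lost_tasks_by_host(lines):
    """Count 'Lost task' log messages per executor host."""
    counts = Counter()
    for line in lines:
        match = LOST_TASK.search(line)
        if match:
            counts[match.group("host")] += 1
    return counts


# Sample lines taken from the console output quoted in this thread
# (re-joined onto single lines, as they would appear in the real log).
sample = [
    "14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage "
    "975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)",
    "14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage "
    "975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)",
    "14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 18.2 in stage "
    "975.0 (TID 1652, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)",
    "14/10/16 19:22:44 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 975.0, "
    "whose tasks have all completed, from pool",
]

if __name__ == "__main__":
    for host, count in lost_tasks_by_host(sample).most_common():
        print("%s: %d" % (host, count))
```

On a real run, the lines would come from the driver log file instead of the sample list; where that file lives depends on how the cluster was set up.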

Best
Gen


Davies Liu-2 wrote
> I can run the following code against Spark 1.1
> 
> sc = SparkContext()
> r1 = (1, 1, 1.0)
> r2 = (1, 2, 2.0)
> r3 = (2, 1, 2.0)
> ratings = sc.parallelize([r1, r2, r3])
> model = ALS.trainImplicit(ratings, 1)
> 
> Davies
> 
> On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu <davies@> wrote:
>> Could you post the code that has the problem with pyspark? Thanks!
>>
>> Davies
>>
>> On Thu, Oct 16, 2014 at 12:27 PM, Gen <gen.tang86@> wrote:
>>> I tried the same data with Scala, and it works well.
>>> So it seems to be a problem with pyspark.
>>> The console shows the following log:
>>>
>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 1, in <module>
>>>   File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
>>>     ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
>>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.trainImplicitALSModel.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 6
>>> in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in
>>> stage
>>> 975.0 (TID 1651, ip-172-31-35-237.ec2.internal):
>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException:
>>> scala.collection.mutable.HashSet
>>> Serialization trace:
>>> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>        
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>>        
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>>>
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>        
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>>>
>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>>
>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>>>
>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>        
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>
>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>        
>>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>
>>> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>
>>> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>        
>>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>        
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>>>        
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         java.lang.Thread.run(Thread.java:745)
>>> Driver stacktrace:
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>>>         at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>>         at scala.Option.foreach(Option.scala:236)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>>>         at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>         at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 18.2 in
>>>>>> stage
>>>>>> 975.0 (TID 1652, ip-172-31-35-241.ec2.internal): TaskKilled (killed
>>>>>> intentionally)
>>> 14/10/16 19:22:44 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
>>> 975.0,
>>> whose tasks have all completed, from pool
>>>
>>>
>>>
>>>
>>> Gen wrote
>>>> Hi,
>>>>
>>>> I am trying to use the ALS.trainImplicit method in
>>>> pyspark.mllib.recommendation, but it didn't work. So I tried the
>>>> example from the Python API documentation:
>>>>
>>>> r1 = (1, 1, 1.0)
>>>> r2 = (1, 2, 2.0)
>>>> r3 = (2, 1, 2.0)
>>>> ratings = sc.parallelize([r1, r2, r3])
>>>> model = ALS.trainImplicit(ratings, 1)
>>>>
>>>> It didn't work either. After searching on Google, I found that there
>>>> are only two overloads of ALS.trainImplicit in the Scala source. So I
>>>> tried
>>>>
>>>> model = ALS.trainImplicit(ratings, 1, 1)
>>>>
>>>> and it worked. But if I set the number of iterations to anything
>>>> other than 1, for example
>>>>
>>>> model = ALS.trainImplicit(ratings, 1, 2)
>>>>
>>>> or
>>>>
>>>> model = ALS.trainImplicit(ratings, 4, 2)
>>>>
>>>> it fails with an error. The details are as follows:
>>>>
>>>> count at ALS.scala:314
>>>>
>>>> Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times,
>>>> most recent failure: Lost task 6.3 in stage 189.0 (TID 626,
>>>> ip-172-31-35-239.ec2.internal):
>>>> com.esotericsoftware.kryo.KryoException:
>>>> java.lang.ArrayStoreException: scala.collection.mutable.HashSet
>>>> Serialization trace:
>>>> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>>>>
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>        
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>        
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>>>        
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>>>        
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>
>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>>>>
>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>        
>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>
>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>
>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>>>>
>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>>>
>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>>>>
>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>        
>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>
>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>        
>>>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>
>>>> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>
>>>> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>        
>>>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>        
>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>>>>        
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         java.lang.Thread.run(Thread.java:745)
>>>> Driver stacktrace:
>>>>
>>>> It is really strange, because count at ALS.scala:314 is already outside
>>>> the iteration loop. Any ideas?
>>>> Thanks a lot in advance.
>>>>
>>>> FYI: I used Spark 1.1.0, and ALS.train() works well in all
>>>> cases.
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595p16607.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595p16681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
