Hi,

Today I tried again with the following code, but it didn't work. Could you please tell me about your running environment?
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext

sc = SparkContext()
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)

I used spark-ec2 to create a cluster with 5 slaves (I made some modifications to spark_ec2.py, but the cluster is launched and configured correctly). I found that the job fails when one slave node tries to take the second task of "count at ALS.scala:314". I will take a look at the logs and try to find the problem; I also plan to run the serializer check sketched in the P.S. below.

Best
Gen
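P.S. Since the failure happens inside Kryo deserialization (the ArrayStoreException in the quoted logs is thrown while Kryo reads OutLinkBlock), a minimal check I plan to run is to fall back to Java serialization and see whether trainImplicit with iterations > 1 still fails. This is only a diagnostic sketch, not a fix; "spark.serializer" and JavaSerializer are standard Spark settings, and the rest is the same toy data as above:

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

# Use Java serialization instead of Kryo; spark-ec2 clusters may set
# Kryo in spark-defaults.conf, so override it explicitly here.
conf = SparkConf().set("spark.serializer",
                       "org.apache.spark.serializer.JavaSerializer")
sc = SparkContext(conf=conf)

ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])

# If this call succeeds with iterations > 1 while the Kryo run fails,
# the problem is serializer-specific rather than in ALS itself.
model = ALS.trainImplicit(ratings, 1, 2)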
Davies Liu-2 wrote
> I can run the following code against Spark 1.1:
>
> sc = SparkContext()
> r1 = (1, 1, 1.0)
> r2 = (1, 2, 2.0)
> r3 = (2, 1, 2.0)
> ratings = sc.parallelize([r1, r2, r3])
> model = ALS.trainImplicit(ratings, 1)
>
> Davies
>
> On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu <davies@> wrote:
>> Could you post the code that has the problem with pyspark? Thanks!
>>
>> Davies
>>
>> On Thu, Oct 16, 2014 at 12:27 PM, Gen <gen.tang86@> wrote:
>>> I tried the same data with Scala and it works pretty well, so it seems
>>> to be a problem in pyspark. In the console, it shows the following logs
>>> (some scheduler warnings were printed in the middle of the traceback;
>>> I list them first):
>>>
>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage
>>> 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage
>>> 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
>>>
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 1, in <module>
>>>   File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
>>>     ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
>>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.trainImplicitALSModel.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
>>> in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage
>>> 975.0 (TID 1651, ip-172-31-35-237.ec2.internal):
>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException:
>>> scala.collection.mutable.HashSet
>>> Serialization trace:
>>> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>>>     com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>     com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>>     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>>     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>     org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>>>     org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>     org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>>>     org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>>     org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>>>     scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>     org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>     org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>     org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>     org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>     org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>>>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     java.lang.Thread.run(Thread.java:745)
>>> Driver stacktrace:
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 18.2 in stage
>>> 975.0 (TID 1652, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
>>> 14/10/16 19:22:44 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 975.0,
>>> whose tasks have all completed, from pool
>>>
>>> Gen wrote
>>>> Hi,
>>>>
>>>> I am trying to use the ALS.trainImplicit method in
>>>> pyspark.mllib.recommendation, but it didn't work, so I tried the example
>>>> from the Python API documentation:
>>>>
>>>> r1 = (1, 1, 1.0)
>>>> r2 = (1, 2, 2.0)
>>>> r3 = (2, 1, 2.0)
>>>> ratings = sc.parallelize([r1, r2, r3])
>>>> model = ALS.trainImplicit(ratings, 1)
>>>>
>>>> That didn't work either. After searching on Google, I found that there are
>>>> only two overloads of ALS.trainImplicit in the Scala source, so I tried
>>>>
>>>> model = ALS.trainImplicit(ratings, 1, 1)
>>>>
>>>> and it worked. But if I set iterations to anything other than 1, for example
>>>>
>>>> model = ALS.trainImplicit(ratings, 1, 2)
>>>>
>>>> or
>>>>
>>>> model = ALS.trainImplicit(ratings, 4, 2)
>>>>
>>>> it generates an error.
>>>> The error information is as follows:
>>>>
>>>> count at ALS.scala:314
>>>>
>>>> Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times,
>>>> most recent failure: Lost task 6.3 in stage 189.0 (TID 626,
>>>> ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException:
>>>> java.lang.ArrayStoreException: scala.collection.mutable.HashSet
>>>> Serialization trace:
>>>> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>>>> [snip: the executor stack trace is identical, frame for frame, to the one
>>>> quoted above, from FieldSerializer$ObjectField.read down to
>>>> java.lang.Thread.run]
>>>> Driver stacktrace:
>>>>
>>>> It is really strange, because "count at ALS.scala:314" is already outside
>>>> the loop over iterations. Any idea?
>>>> Thanks a lot in advance.
>>>> FYI: I am using Spark 1.1.0, and ALS.train() works well in all of these cases.
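P.P.S. To make the quoted calls unambiguous: the second and third positional
arguments of ALS.trainImplicit are rank and iterations (the parameter names
rank, iterations, lambda_, blocks, alpha are visible in the traceback above).
A sketch with the keywords spelled out, assuming "ratings" is the small RDD
defined earlier; the default values shown are my reading of the 1.1.0 Python
API, so treat them as assumptions:

from pyspark.mllib.recommendation import ALS

# Works on my cluster: a single ALS iteration.
model = ALS.trainImplicit(ratings, rank=1, iterations=1)

# Fails on my cluster with the Kryo ArrayStoreException, regardless of rank:
model = ALS.trainImplicit(ratings, rank=1, iterations=2,
                          lambda_=0.01, blocks=-1, alpha=0.01)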