Re: GraphX: ShortestPaths does not terminate on a grid graph
I think this is a separate issue with how EdgeRDDImpl partitions edges. If you can merge this change in and rebuild, it should work: https://github.com/apache/spark/pull/4136/files

If you can't, I just called the Graph.partitionBy() method right after constructing my graph, but before performing any operations on it. That way, the EdgeRDDImpl class doesn't have to use the default partitioner.

Hope this helps!
Jay

On Tue Feb 03 2015 at 12:35:14 AM NicolasC nicolas.ch...@inria.fr wrote:

On 01/29/2015 08:31 PM, Ankur Dave wrote:
> Thanks for the reminder. I just created a PR: https://github.com/apache/spark/pull/4273
> Ankur

Hello,

Thanks for the patch. I applied it to Pregel.scala (in the Spark 1.2.0 sources) and rebuilt Spark. During execution, at the 25th iteration of Pregel, checkpointing is performed and then the following exception is thrown:

    Exception in thread "main" org.apache.spark.SparkException:
    Checkpoint RDD CheckpointRDD[521] at reduce at VertexRDDImpl.scala:80(0)
    has different number of partitions than original RDD
    VertexRDD ZippedPartitionsRDD2[518] at zipPartitions at VertexRDDImpl.scala:170(2)
        at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1279)
        at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281)
        at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1281)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1351)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:867)
        at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:80)
        at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:155)
        at org.apache.spark.graphx.lib.ShortestPaths$.run(ShortestPaths.scala:69)

Pregel.scala:155 is the following line in the Pregel loop:

    activeMessages = messages.count()
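[Editor's note] A minimal sketch of the partitionBy workaround Jay describes, for readers hitting the same thing. The edge-list path, the landmark vertex, and the choice of PartitionStrategy.RandomVertexCut are illustrative assumptions, not part of the original advice:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.ShortestPaths

    // Repartition the edges explicitly right after constructing the graph,
    // so EdgeRDDImpl never falls back to the default partitioner.
    val graph = GraphLoader.edgeListFile(sc, "edges.txt")  // placeholder path
      .partitionBy(PartitionStrategy.RandomVertexCut)      // strategy choice is illustrative

    // Only then run graph algorithms on it.
    val result = ShortestPaths.run(graph, Seq(0L))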
Re: GraphX: ShortestPaths does not terminate on a grid graph
Just curious, is this set to be merged at some point?

On Thu Jan 22 2015 at 4:34:46 PM Ankur Dave ankurd...@gmail.com wrote:

At 2015-01-22 02:06:37 -0800, NicolasC nicolas.ch...@inria.fr wrote:
> I try to execute a simple program that runs the ShortestPaths algorithm
> (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph. I use
> Spark 1.2.0 downloaded from spark.apache.org. This program runs for more
> than 2 hours when the grid size is 70x70 as above, and is then killed by
> the resource manager of the cluster (Torque). After 5-6 minutes of
> execution, the Spark master UI does not even respond. For a grid size of
> 30x30, the program terminates in about 20 seconds, and for a grid size
> of 50x50 it finishes in about 80 seconds. The problem appears for a grid
> size of 70x70 and above.

Unfortunately this problem is due to a Spark bug. In later iterations of iterative algorithms, the lineage maintained for fault tolerance grows long and causes Spark to consume increasing amounts of resources for scheduling and task serialization.

The workaround is to checkpoint the graph periodically, which writes it to stable storage and interrupts the lineage chain before it grows too long. If you're able to recompile Spark, you can do this by applying the patch to GraphX at the end of this mail and, before running graph algorithms, calling sc.setCheckpointDir("/tmp") to set the checkpoint directory as desired.

Ankur

=== patch begins here ===

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..1fbbb87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null
     var i = 0
+    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+    val checkpointFrequency = 25
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
       // get to send messages. We must cache messages so it can be materialized on the next line,
       // allowing us to uncache the previous iteration.
       messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
+
+      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+        logInfo("Checkpointing in iteration " + i)
+        g.vertices.checkpoint()
+        g.edges.checkpoint()
+        messages.checkpoint()
+      }
+
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
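[Editor's note] A minimal sketch of how the workaround would be driven from user code, assuming the patched build. The checkpoint path is an example, and the 70x70 grid mirrors the original report:

    import org.apache.spark.graphx.lib.ShortestPaths
    import org.apache.spark.graphx.util.GraphGenerators

    // Set a checkpoint directory before running any graph algorithm, so the
    // patched Pregel loop can truncate the lineage every 25 iterations.
    // On a cluster, point this at shared stable storage such as HDFS.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val grid = GraphGenerators.gridGraph(sc, 70, 70)
    val result = ShortestPaths.run(grid, Seq(0L))  // landmark 0L is illustrative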
spark-shell bug with RDD distinct?
Found a problem in the spark-shell, but can't confirm that it's related to open issues on Spark's JIRA page. I was wondering if anyone could help identify whether this is an issue or whether it's already being addressed.

Test (in spark-shell):

    case class Person(name: String, age: Int)
    val peopleList = List(Person("Alice", 35), Person("Bob", 47),
                          Person("Alice", 35), Person("Bob", 15))
    val peopleRDD = sc.parallelize(peopleList)
    assert(peopleList.distinct.size == peopleRDD.distinct.count)

At first I thought it was related to issue SPARK-2620 (https://issues.apache.org/jira/browse/SPARK-2620), which says case classes can't be used as keys in spark-shell due to how case classes are compiled by the REPL. It lists .reduceByKey, .groupByKey and .distinct as being affected. But the associated pull request for adding tests to cover this (https://github.com/apache/spark/pull/1588) was closed.

Is this something I just have to live with when using the REPL? Or is this covered by something bigger that's being addressed?

Thanks in advance
-Jay
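[Editor's note] If this is indeed SPARK-2620 (REPL-compiled case classes breaking equality), one workaround is to avoid case classes as RDD elements in the shell and use plain tuples, which are standard library classes and unaffected. A minimal sketch with the same data:

    // Same people as above, as tuples instead of a REPL-defined case class.
    // Tuple equality and hashing work normally, so distinct() behaves as expected.
    val peopleTuples = sc.parallelize(List(("Alice", 35), ("Bob", 47),
                                           ("Alice", 35), ("Bob", 15)))
    assert(peopleTuples.distinct().count() == 3)  // ("Alice",35), ("Bob",47), ("Bob",15)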
GraphX Pregel halting condition
I'm trying to implement a graph algorithm that does a form of path searching. Once a certain criterion is met on any path in the graph, I want to halt the rest of the iterations. But I can't see how to do that with the Pregel API, since a vertex can't know the state of an arbitrary other vertex (if they're not adjacent). Is there a common pattern for doing something like this?

I was thinking of using a custom accumulator where the zero is false and the addInPlace is a boolean OR. Each vertex (as part of its vprog) could add to the accumulator, and once a path is found which meets the condition, the accumulator would then have a value of true. But since workers can't read accumulators, I don't see how to use that when deciding whether to iterate again. That is, unless I reimplement the Pregel class with the added check when iterating...

Any suggestions? Thanks in advance!
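[Editor's note] A minimal sketch of the "reimplement the Pregel loop" route under stated assumptions: graph, vprog, sendMsg, mergeMsg and maxIterations are placeholders for the user's own values, and the driver checks the accumulator between supersteps, where its value is reliable, rather than on the workers:

    import org.apache.spark.AccumulatorParam
    import org.apache.spark.graphx._

    // Boolean-OR accumulator: stays false until some vertex reports success.
    // Re-executed tasks may add more than once, which is harmless for an OR.
    object BoolOrParam extends AccumulatorParam[Boolean] {
      def zero(initial: Boolean): Boolean = false
      def addInPlace(a: Boolean, b: Boolean): Boolean = a || b
    }
    val found = sc.accumulator(false)(BoolOrParam)
    val maxIterations = 100  // placeholder bound

    // Hand-rolled Pregel-style loop: vprog may do `found += true` when the
    // halting criterion is met; the driver reads found.value each iteration.
    var g = graph.cache()
    var messages = g.aggregateMessages(sendMsg, mergeMsg).cache()
    var i = 0
    while (messages.count() > 0 && i < maxIterations && !found.value) {
      g = g.joinVertices(messages)(vprog).cache()
      messages = g.aggregateMessages(sendMsg, mergeMsg).cache()
      i += 1
    }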
Re: questions about MLLib recommendation models
Ah, that makes perfect sense. Thanks for the concise explanation!

On Thu, Aug 7, 2014 at 9:14 PM, Xiangrui Meng men...@gmail.com wrote:

> ratings.map { case Rating(u, m, r) =>
>   val pred = model.predict(u, m)
>   (r - pred) * (r - pred)
> }.mean()

The code doesn't work because the userFeatures and productFeatures stored in the model are RDDs. You tried to serialize them into the task closure and execute `model.predict` on an executor, which won't work because `model.predict` can only be called on the driver. We should make this clear in the doc. You should use what Burak suggested:

    val predictions = model.predict(data.map(x => (x.user, x.product)))

Best,
Xiangrui

On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz bya...@stanford.edu wrote:

Hi Jay,

I've had the same problem you've been having in Question 1 with a synthetic dataset. I thought I wasn't producing the dataset well enough. This seems to be a bug. I will open a JIRA for it.

Instead of using:

    ratings.map { case Rating(u, m, r) =>
      val pred = model.predict(u, m)
      (r - pred) * (r - pred)
    }.mean()

you can use something like:

    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings: RDD[(Double, Double)] = predictions.map { x =>
      def mapPredictedRating(r: Double) =
        if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
      ((x.user, x.product), mapPredictedRating(x.rating))
    }.join(data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())

This workaround worked for me.

Regarding your Question 2, it would be best if you do a special filtering of the dataset so that you only test on users and products you have trained on. If we don't have any data trained on a user, there is no way to predict how he would like a product. That filtering takes a lot of work, though. I can share some code on that too if you like.

Best,
Burak

----- Original Message -----
From: Jay Hutfles jayhutf...@gmail.com
To: user@spark.apache.org
Sent: Thursday, August 7, 2014 1:06:33 PM
Subject: questions about MLLib recommendation models

I have a few questions regarding a collaborative filtering model, and was hoping for some recommendations (no pun intended...)

*Setup*

I have a csv file with user/movie/ratings named unimaginatively 'movies.csv'. Here are the contents:

    0,0,5
    0,1,5
    0,2,0
    0,3,0
    1,0,5
    1,3,0
    2,1,4
    2,2,0
    3,0,0
    3,1,0
    3,2,5
    3,3,4
    4,0,0
    4,1,0
    4,2,5

I then load it into an RDD with a nice command like

    val ratings = sc.textFile("movies.csv").map(_.split(',') match {
      case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
    })

So far so good. I'm even okay building a model for predicting the absent values in the matrix with

    val rank = 10
    val iters = 20
    val model = ALS.train(ratings, rank, iters)

I can then use the model to predict any user/movie rating without trouble, like

    model.predict(2, 0)

*Question 1:*

If I were to calculate, say, the mean squared error of the training set (or, to my next question, a test set), this doesn't work:

    ratings.map { case Rating(u, m, r) =>
      val pred = model.predict(u, m)
      (r - pred) * (r - pred)
    }.mean()

Actually, any action on RDDs created by mapping over the RDD[Rating] with a model prediction fails, like

    ratings.map { case Rating(u, m, _) => model.predict(u, m) }.collect

I get errors due to a scala.MatchError: null.
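[Editor's note] Putting the two suggestions together, a minimal sketch of the training-set MSE from Question 1 computed the supported way: batch-predict on the driver, then join predictions back to the actual ratings. This is Burak's pattern with the implicit-preference clamping omitted; `ratings` and `model` are the objects from the original post:

    // Batch-predict for every (user, product) pair in the ratings RDD,
    // then join predictions to actual ratings and average the squared error.
    val predictions = model.predict(ratings.map(r => (r.user, r.product)))
    val predsAndActuals = predictions
      .map(p => ((p.user, p.product), p.rating))
      .join(ratings.map(r => ((r.user, r.product), r.rating)))
      .values
    val mse = predsAndActuals.map { case (pred, actual) =>
      (pred - actual) * (pred - actual)
    }.mean()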
Here's the exact verbiage:

    org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 26150.0:1 failed 1 times, most recent failure:
    Exception failure in TID 7091 on host localhost: scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
        org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
        $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:18)
        $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:18)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray
questions about MLLib recommendation models
I have a few questions regarding a collaborative filtering model, and was hoping for some recommendations (no pun intended...)

*Setup*

I have a csv file with user/movie/ratings named unimaginatively 'movies.csv'. Here are the contents:

    0,0,5
    0,1,5
    0,2,0
    0,3,0
    1,0,5
    1,3,0
    2,1,4
    2,2,0
    3,0,0
    3,1,0
    3,2,5
    3,3,4
    4,0,0
    4,1,0
    4,2,5

I then load it into an RDD with a nice command like

    val ratings = sc.textFile("movies.csv").map(_.split(',') match {
      case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
    })

So far so good. I'm even okay building a model for predicting the absent values in the matrix with

    val rank = 10
    val iters = 20
    val model = ALS.train(ratings, rank, iters)

I can then use the model to predict any user/movie rating without trouble, like

    model.predict(2, 0)

*Question 1:*

If I were to calculate, say, the mean squared error of the training set (or, to my next question, a test set), this doesn't work:

    ratings.map { case Rating(u, m, r) =>
      val pred = model.predict(u, m)
      (r - pred) * (r - pred)
    }.mean()

Actually, any action on RDDs created by mapping over the RDD[Rating] with a model prediction fails, like

    ratings.map { case Rating(u, m, _) => model.predict(u, m) }.collect

I get errors due to a scala.MatchError: null. Here's the exact verbiage:

    org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 26150.0:1 failed 1 times, most recent failure:
    Exception failure in TID 7091 on host localhost: scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
        org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
        $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:18)
        $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:18)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)

I think I'm missing something, since I can build up a scala collection of the exact (user, movie) tuples I'm testing, map over that with the model prediction, and it works fine. But if I map over the RDD[Rating], it doesn't. Am I doing something obviously wrong?
*Question 2:*

I have a much larger data set, and instead of running the ALS algorithm on the whole set, it seems prudent to use the kFold method in org.apache.spark.mllib.util.MLUtils to generate training/testing splits. It's rather sparse data, and there are cases where the test set has both users and movies that are not present in any Ratings in the training set. When encountering these, the model shouts at me:

    java.util.NoSuchElementException: next on empty iterator

Is it the case that the Alternating Least Squares method doesn't create models which predict values for untrained users/products? My high-level understanding of the ALS implementation makes it seem understandable that the calculations depend on at least one rating for each user, and at least one for each movie. Is that true? If so, should I simply filter out entries from the test set which have users or movies absent from the training set (see the sketch after this mail)? Or is kFold not an appropriate way to generate test data for collaborative filtering?

Actually, I should probably have just asked, "What is the best way to do testing for recommendation models?" Leave it nice and general...

Thanks in advance. Sorry for the long ramble.
Jay
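[Editor's note] A minimal sketch of the filtering Burak describes above, assuming `training` and `test` are RDD[Rating] splits (e.g. from MLUtils.kFold): keep only test ratings whose user and product both appear in the training set.

    import org.apache.spark.SparkContext._  // pair RDD implicits on older Spark versions
    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.rdd.RDD

    def filterToTrained(training: RDD[Rating], test: RDD[Rating]): RDD[Rating] = {
      // Distinct (id, ()) keys for every user and product seen during training.
      val trainUsers = training.map(r => (r.user, ())).distinct()
      val trainProducts = training.map(r => (r.product, ())).distinct()
      test
        .map(r => (r.user, r)).join(trainUsers).values.map(_._1)        // drop unseen users
        .map(r => (r.product, r)).join(trainProducts).values.map(_._1)  // drop unseen products
    }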