ratings.map { case Rating(u, m, r) =>
  val pred = model.predict(u, m)
  (r - pred) * (r - pred)
}.mean()
The code doesn't work because the userFeatures and productFeatures stored in the model are RDDs. You tried to serialize them into the task closure and execute `model.predict` on an executor, which won't work because `model.predict` can only be called on the driver. We should make this clear in the doc.

You should use what Burak suggested:

val predictions = model.predict(data.map(x => (x.user, x.product)))

Best,
Xiangrui

On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz <bya...@stanford.edu> wrote:
> Hi Jay,
>
> I've had the same problem you've been having in Question 1 with a synthetic
> dataset. I thought I wasn't producing the dataset well enough. This seems to
> be a bug. I will open a JIRA for it.
>
> Instead of using:
>
> ratings.map { case Rating(u, m, r) =>
>   val pred = model.predict(u, m)
>   (r - pred) * (r - pred)
> }.mean()
>
> you can use something like:
>
> val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
> val predictionsAndRatings: RDD[(Double, Double)] = predictions.map { x =>
>   def mapPredictedRating(r: Double) =
>     if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
>   ((x.user, x.product), mapPredictedRating(x.rating))
> }.join(data.map(x => ((x.user, x.product), x.rating))).values
>
> math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
>
> This workaround worked for me.
>
> Regarding your question 2, it would be best if you do a special filtering of
> the dataset, so that you only test on users and products you actually
> trained on.
> If we don't have any data trained on a user, there is no way to predict how
> he would like a product.
> That filtering takes a lot of work though. I can share some code on that too
> if you like.
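[Editor's note: Burak's workaround can be assembled into one self-contained sketch. This is a hedged example, not the exact code from the thread; the names `rmse`, `model`, and `data` are illustrative, the explicit-feedback case is assumed (so the `implicitPrefs` clamping is dropped), and a live SparkContext is required.]

import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Compute RMSE without calling model.predict inside a task closure.
// The batch predict variant is driver-side API: it joins the requested
// (user, product) pairs against the model's feature RDDs instead of
// doing per-row lookups on executors.
def rmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
  val predictions: RDD[Rating] =
    model.predict(data.map(x => (x.user, x.product)))
  // Key both sides by (user, product) and join predictions to true ratings.
  val predictionsAndRatings: RDD[(Double, Double)] = predictions
    .map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
  math.sqrt(predictionsAndRatings.map { case (p, r) => (p - r) * (p - r) }.mean())
}

[End editor's note.]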
> Best,
> Burak
>
> ----- Original Message -----
> From: "Jay Hutfles" <jayhutf...@gmail.com>
> To: user@spark.apache.org
> Sent: Thursday, August 7, 2014 1:06:33 PM
> Subject: questions about MLLib recommendation models
>
> I have a few questions regarding a collaborative filtering model, and was
> hoping for some recommendations (no pun intended...)
>
> *Setup*
>
> I have a csv file with user/movie/rating triples, named unimaginatively
> 'movies.csv'. Here are the contents:
>
> 0,0,5
> 0,1,5
> 0,2,0
> 0,3,0
> 1,0,5
> 1,3,0
> 2,1,4
> 2,2,0
> 3,0,0
> 3,1,0
> 3,2,5
> 3,3,4
> 4,0,0
> 4,1,0
> 4,2,5
>
> I then load it into an RDD with a nice command like
>
> val ratings = sc.textFile("movies.csv").map(_.split(',') match {
>   case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
> })
>
> So far so good. I'm even okay building a model for predicting the absent
> values in the matrix with
>
> val rank = 10
> val iters = 20
> val model = ALS.train(ratings, rank, iters)
>
> I can then use the model to predict any user/movie rating without trouble,
> like
>
> model.predict(2, 0)
>
> *Question 1:*
>
> If I were to calculate, say, the mean squared error of the training set (or,
> per my next question, a test set), this doesn't work:
>
> ratings.map { case Rating(u, m, r) =>
>   val pred = model.predict(u, m)
>   (r - pred) * (r - pred)
> }.mean()
>
> Actually, any action on an RDD created by mapping over the RDD[Rating] with a
> model prediction fails, like
>
> ratings.map { case Rating(u, m, _) => model.predict(u, m) }.collect()
>
> I get errors due to a "scala.MatchError: null".
> Here's the exact verbiage:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 26150.0:1 failed 1 times, most recent failure: Exception failure in TID
> 7091 on host localhost: scala.MatchError: null
>     org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
>     org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
>     $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
>     $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
>     scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>     org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>     org.apache.spark.scheduler.Task.run(Task.scala:51)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:744)
>
> I think I'm missing something, since I can build up a Scala collection of
> the exact (user, movie) tuples I'm testing, map over that with the model
> prediction, and it works fine. But if I map over the RDD[Rating], it
> doesn't. Am I doing something obviously wrong?
>
> *Question 2:*
>
> I have a much larger data set, and instead of running the ALS algorithm on
> the whole set, it seems prudent to use the kFold method in
> org.apache.spark.mllib.util.MLUtils to generate training/testing splits.
>
> It's rather sparse data, and there are cases where the test set has both
> users and movies that are not present in any Ratings in the training set.
> When encountering these, the model shouts at me:
>
> java.util.NoSuchElementException: next on empty iterator
>
> Is it the case that the Alternating Least Squares method doesn't create
> models which predict values for untrained users/products? My high-level
> understanding of the ALS implementation makes it seem plausible that
> the calculations depend on at least one rating for each user, and at least
> one for each movie. Is that true?
>
> If so, should I simply filter out entries from the test set which have
> users or movies absent from the training set? Or is kFold not an
> appropriate way to generate test data for collaborative filtering?
>
> Actually, I should have probably just asked, "What is the best way to do
> testing for recommendation models?" Leave it nice and general...
>
> Thanks in advance. Sorry for the long ramble.
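[Editor's note: the filtering Burak alludes to can be sketched as follows. This is a hedged illustration, not the code he offered to share; the names `filterTestSet`, `training`, and `test` are assumptions, and the idea is simply to drop every test rating whose user or product never appears in the training set, so the model is never asked about an untrained id.]

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

// Keep only test ratings whose user AND product both occur in the training
// set, so model.predict never hits an id with no learned features.
def filterTestSet(training: RDD[Rating], test: RDD[Rating]): RDD[Rating] = {
  val trainedUsers = training.map(u => (u.user, ())).distinct()
  val trainedProducts = training.map(p => (p.product, ())).distinct()
  test
    .map(r => (r.user, r))
    .join(trainedUsers)                          // drop unseen users
    .map { case (_, (r, _)) => (r.product, r) }
    .join(trainedProducts)                       // drop unseen products
    .map { case (_, (r, _)) => r }
}

[End editor's note.]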
> Jay
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org