Ah, that makes perfect sense. Thanks for the concise explanation!
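For anyone who lands on this thread later: the join-and-compare logic Burak suggests below can be sketched with plain Scala collections standing in for the RDDs. The ratings and predictions here are made up purely for illustration; on real data the same join happens via RDD.join on (user, product) keys.

```scala
// Sketch of the join-and-compare RMSE computation, with in-memory Maps
// standing in for RDDs. All numbers below are hypothetical.
val ratings: Map[(Int, Int), Double] =
  Map((0, 0) -> 5.0, (0, 1) -> 5.0, (1, 0) -> 4.0)
val predictions: Map[(Int, Int), Double] =
  Map((0, 0) -> 4.8, (0, 1) -> 4.9, (1, 0) -> 4.2)

// Join on the (user, product) key, mirroring RDD.join(...).values
val predictionsAndRatings: Seq[(Double, Double)] =
  ratings.toSeq.flatMap { case (k, r) => predictions.get(k).map(p => (p, r)) }

// Root mean squared error over the joined pairs
val rmse = math.sqrt(
  predictionsAndRatings.map { case (p, r) => (p - r) * (p - r) }.sum /
    predictionsAndRatings.size)

println(f"RMSE = $rmse%.4f")
```

The point of the join is that only (user, product) pairs present in both sides contribute to the error, which sidesteps calling model.predict inside a task closure.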
On Thu, Aug 7, 2014 at 9:14 PM, Xiangrui Meng <men...@gmail.com> wrote:
> ratings.map { case Rating(u, m, r) =>
>   val pred = model.predict(u, m)
>   (r - pred) * (r - pred)
> }.mean()
>
> The code doesn't work because the userFeatures and productFeatures
> stored in the model are RDDs. You tried to serialize them into the
> task closure and execute `model.predict` on an executor, which won't
> work because `model.predict` can only be called on the driver. We
> should make this clear in the docs. You should use what Burak
> suggested:
>
> val predictions = model.predict(data.map(x => (x.user, x.product)))
>
> Best,
> Xiangrui
>
> On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz <bya...@stanford.edu> wrote:
> > Hi Jay,
> >
> > I've had the same problem you've been having in Question 1 with a
> > synthetic dataset. I thought I wasn't producing the dataset well
> > enough. This seems to be a bug. I will open a JIRA for it.
> >
> > Instead of using:
> >
> > ratings.map { case Rating(u, m, r) =>
> >   val pred = model.predict(u, m)
> >   (r - pred) * (r - pred)
> > }.mean()
> >
> > you can use something like:
> >
> > val predictions: RDD[Rating] =
> >   model.predict(data.map(x => (x.user, x.product)))
> > val predictionsAndRatings: RDD[(Double, Double)] = predictions.map { x =>
> >   def mapPredictedRating(r: Double) =
> >     if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
> >   ((x.user, x.product), mapPredictedRating(x.rating))
> > }.join(data.map(x => ((x.user, x.product), x.rating))).values
> >
> > math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
> >
> > This workaround worked for me.
> >
> > Regarding your question 2, it is best if you do a special filtering of
> > the dataset so that you only test on users and products you trained
> > on. If we don't have any data trained on a user, there is no way to
> > predict how he would like a product. That filtering takes a lot of
> > work, though. I can share some code on that too if you like.
> >
> > Best,
> > Burak
> >
> > ----- Original Message -----
> > From: "Jay Hutfles" <jayhutf...@gmail.com>
> > To: user@spark.apache.org
> > Sent: Thursday, August 7, 2014 1:06:33 PM
> > Subject: questions about MLLib recommendation models
> >
> > I have a few questions regarding a collaborative filtering model, and
> > was hoping for some recommendations (no pun intended...)
> >
> > *Setup*
> >
> > I have a csv file of user/movie/ratings named, unimaginatively,
> > 'movies.csv'. Here are the contents:
> >
> > 0,0,5
> > 0,1,5
> > 0,2,0
> > 0,3,0
> > 1,0,5
> > 1,3,0
> > 2,1,4
> > 2,2,0
> > 3,0,0
> > 3,1,0
> > 3,2,5
> > 3,3,4
> > 4,0,0
> > 4,1,0
> > 4,2,5
> >
> > I then load it into an RDD with a nice command like:
> >
> > val ratings = sc.textFile("movies.csv").map(_.split(',') match {
> >   case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
> > })
> >
> > So far so good. I'm even okay building a model for predicting the
> > absent values in the matrix with:
> >
> > val rank = 10
> > val iters = 20
> > val model = ALS.train(ratings, rank, iters)
> >
> > I can then use the model to predict any user/movie rating without
> > trouble, like:
> >
> > model.predict(2, 0)
> >
> > *Question 1:*
> >
> > If I were to calculate, say, the mean squared error of the training
> > set (or, for my next question, a test set), this doesn't work:
> >
> > ratings.map { case Rating(u, m, r) =>
> >   val pred = model.predict(u, m)
> >   (r - pred) * (r - pred)
> > }.mean()
> >
> > Actually, any action on an RDD created by mapping over the RDD[Rating]
> > with a model prediction fails, like:
> >
> > ratings.map { case Rating(u, m, _) => model.predict(u, m) }.collect
> >
> > I get errors due to a "scala.MatchError: null".
Here's the exact verbiage:
> >
> > org.apache.spark.SparkException: Job aborted due to stage failure:
> > Task 26150.0:1 failed 1 times, most recent failure: Exception failure
> > in TID 7091 on host localhost: scala.MatchError: null
> >   org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
> >   org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
> >   $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
> >   $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
> >   scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >   scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >   scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >   scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> >   scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> >   scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> >   scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> >   scala.collection.AbstractIterator.to(Iterator.scala:1157)
> >   scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> >   scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> >   scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> >   scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> >   org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> >   org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> >   org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> >   org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> >   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >   org.apache.spark.scheduler.Task.run(Task.scala:51)
> >   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >   java.lang.Thread.run(Thread.java:744)
> >
> > I think I'm missing something, since I can build up a Scala collection
> > of the exact (user, movie) tuples I'm testing, map over that with the
> > model prediction, and it works fine. But if I map over the
> > RDD[Rating], it doesn't. Am I doing something obviously wrong?
> >
> > *Question 2:*
> >
> > I have a much larger data set, and instead of running the ALS
> > algorithm on the whole set, it seems prudent to use the kFold method
> > in org.apache.spark.mllib.util.MLUtils to generate training/test
> > splits.
> >
> > It's rather sparse data, and there are cases where the test set has
> > both users and movies that are not present in any Rating in the
> > training set. When encountering these, the model shouts at me:
> >
> > java.util.NoSuchElementException: next on empty iterator
> >
> > Is it the case that the alternating least squares method doesn't
> > create models which predict values for untrained users/products? My
> > high-level understanding of the ALS implementation makes it seem
> > plausible that the calculations depend on at least one rating for
> > each user and at least one for each movie. Is that true?
> >
> > If so, should I simply filter out entries from the test set whose
> > users or movies are absent from the training set? Or is kFold not an
> > appropriate way to generate test data for collaborative filtering?
> >
> > Actually, I should probably have just asked, "What is the best way to
> > test recommendation models?" Leave it nice and general...
> >
> > Thanks in advance. Sorry for the long ramble.
> >
> > Jay
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
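As a postscript on the filtering Burak mentions for question 2: the idea of keeping only test entries that ALS had at least one training example for can be sketched in plain Scala, without Spark. The Rating case class and the data below are hypothetical stand-ins; on an RDD the same effect needs broadcast sets or joins rather than in-memory filters.

```scala
// Drop test entries whose user or product never appears in the
// training set, since ALS has no factors to score them with.
// All data below is made up for illustration.
case class Rating(user: Int, product: Int, rating: Double)

val training = Seq(Rating(0, 0, 5.0), Rating(0, 1, 5.0), Rating(1, 1, 4.0))
val test     = Seq(Rating(0, 1, 4.0), Rating(2, 0, 3.0), Rating(1, 3, 2.0))

// Users and products the model has actually seen
val trainedUsers    = training.map(_.user).toSet
val trainedProducts = training.map(_.product).toSet

// Keep only test ratings the model can score
val scorableTest = test.filter(r =>
  trainedUsers.contains(r.user) && trainedProducts.contains(r.product))

println(scorableTest)
```

Here Rating(2, 0, 3.0) is dropped because user 2 was never trained, and Rating(1, 3, 2.0) because product 3 was never trained.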