Ah, that makes perfect sense.  Thanks for the concise explanation!

On Thu, Aug 7, 2014 at 9:14 PM, Xiangrui Meng <men...@gmail.com> wrote:

> ratings.map{ case Rating(u,m,r) => {
>     val pred = model.predict(u, m)
>     (r - pred)*(r - pred)
>   }
> }.mean()
>
> The code doesn't work because the userFeatures and productFeatures
> stored in the model are RDDs. Your closure tried to serialize them into
> the task and call `model.predict` on an executor, which won't work
> because `model.predict` can only be called on the driver. We should
> make this clear in the doc. You should use what Burak suggested:
>
> val predictions = model.predict(data.map(x => (x.user, x.product)))
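>
> If you then want the training MSE, you can key both the predictions and
> the original ratings by (user, product), join them, and average. A rough
> sketch along the same lines as Burak's workaround below (`data` here is
> your RDD[Rating]):
>
> // Key predictions and actual ratings by (user, product), join, average.
> val predPairs = predictions.map(p => ((p.user, p.product), p.rating))
> val truePairs = data.map(r => ((r.user, r.product), r.rating))
> val mse = predPairs.join(truePairs)
>   .values
>   .map { case (pred, actual) => (pred - actual) * (pred - actual) }
>   .mean()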
>
> Best,
> Xiangrui
>
> On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz <bya...@stanford.edu> wrote:
> > Hi Jay,
> >
> > I've had the same problem you've been having in Question 1 with a
> > synthetic dataset. I thought I wasn't producing the dataset well enough.
> > This seems to be a bug. I will open a JIRA for it.
> >
> > Instead of using:
> >
> > ratings.map{ case Rating(u,m,r) => {
> >     val pred = model.predict(u, m)
> >     (r - pred)*(r - pred)
> >   }
> > }.mean()
> >
> > you can use something like:
> >
> > val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
> > val predictionsAndRatings: RDD[(Double, Double)] = predictions.map { x =>
> >   // implicitPrefs: true if the model was trained with ALS.trainImplicit;
> >   // in that case, clamp predictions into [0, 1] before comparing.
> >   def mapPredictedRating(r: Double) =
> >     if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
> >   ((x.user, x.product), mapPredictedRating(x.rating))
> > }.join(data.map(x => ((x.user, x.product), x.rating))).values
> >
> > math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
> >
> > This workaround worked for me.
> >
> > Regarding your question 2, it will be best if you do a special filtering
> > of the dataset so that every user and product in the test set also
> > appears in the training set.
> > If we don't have any training data for a user, there is no way to predict
> > how they would like a product.
> > That filtering takes a lot of work though. I can share some code on that
> > too if you like; a rough sketch is below.
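> >
> > A minimal sketch of that filtering, assuming `training` and `test` are
> > both RDD[Rating] (the variable names are mine, not from your code):
> >
> > // Keep only test ratings whose user AND product appear in training.
> > val trainedUsers = training.map(_.user).distinct().map(u => (u, ()))
> > val trainedProducts = training.map(_.product).distinct().map(p => (p, ()))
> > val filteredTest = test
> >   .map(r => (r.user, r))
> >   .join(trainedUsers)                          // drops unseen users
> >   .map { case (_, (r, _)) => (r.product, r) }
> >   .join(trainedProducts)                       // drops unseen products
> >   .map { case (_, (r, _)) => r }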
> >
> > Best,
> > Burak
> >
> > ----- Original Message -----
> > From: "Jay Hutfles" <jayhutf...@gmail.com>
> > To: user@spark.apache.org
> > Sent: Thursday, August 7, 2014 1:06:33 PM
> > Subject: questions about MLLib recommendation models
> >
> > I have a few questions regarding a collaborative filtering model, and was
> > hoping for some recommendations (no pun intended...)
> >
> > *Setup*
> >
> > I have a csv file with user/movie/ratings named unimaginatively
> > 'movies.csv'.  Here are the contents:
> >
> > 0,0,5
> > 0,1,5
> > 0,2,0
> > 0,3,0
> > 1,0,5
> > 1,3,0
> > 2,1,4
> > 2,2,0
> > 3,0,0
> > 3,1,0
> > 3,2,5
> > 3,3,4
> > 4,0,0
> > 4,1,0
> > 4,2,5
> >
> > I then load it into an RDD with a nice command like
> >
> > val ratings = sc.textFile("movies.csv").map(_.split(',') match {
> >   case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
> > })
> >
> > So far so good.  I'm even okay building a model for predicting the absent
> > values in the matrix with
> >
> > val rank = 10
> > val iters = 20
> > val model = ALS.train(ratings, rank, iters)
> >
> > I can then use the model to predict any user/movie rating without
> > trouble, like
> >
> > model.predict(2, 0)
> >
> > *Question 1: *
> >
> > If I were to calculate, say, the mean squared error of the training set
> > (or, as in my next question, a test set), this doesn't work:
> >
> > ratings.map{ case Rating(u,m,r) => {
> >     val pred = model.predict(u, m)
> >     (r - pred)*(r - pred)
> >   }
> > }.mean()
> >
> > Actually, any action on an RDD created by mapping over the RDD[Rating]
> > with a model prediction fails, like
> >
> > ratings.map{ case Rating(u, m, _) => model.predict(u, m) }.collect
> >
> > I get errors due to a "scala.MatchError: null".  Here's the exact
> > verbiage:
> >
> >
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task
> > 26150.0:1 failed 1 times, most recent failure: Exception failure in TID
> > 7091 on host localhost: scala.MatchError: null
> >   org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
> >   org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
> >   $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
> >   $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
> >   scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >   scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >   scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >   scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> >   scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> >   scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> >   scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> >   scala.collection.AbstractIterator.to(Iterator.scala:1157)
> >   scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> >   scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> >   scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> >   scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> >   org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> >   org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> >   org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> >   org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> >   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >   org.apache.spark.scheduler.Task.run(Task.scala:51)
> >   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >   java.lang.Thread.run(Thread.java:744)
> >
> > I think I'm missing something, since I can build up a Scala collection of
> > the exact (user, movie) tuples I'm testing, map over that with the model
> > prediction, and it works fine.  But if I map over the RDD[Rating], it
> > doesn't.  Am I doing something obviously wrong?
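> >
> > For reference, the driver-side version that does work looks roughly like
> > this (a sketch; the local pairs are hard-coded to match the data above):
> >
> > // Works: the collection lives on the driver, so model.predict is
> > // called where the model's factor RDDs are reachable.
> > val pairs = Seq((2, 0), (0, 3), (4, 1))
> > val localPreds = pairs.map { case (u, m) => model.predict(u, m) }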
> >
> > *Question 2:*
> >
> > I have a much larger data set, and instead of running the ALS algorithm
> > on the whole set, it seems prudent to use the kFold method in
> > org.apache.spark.mllib.util.MLUtils to generate training/testing splits.
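> >
> > Roughly like this (the fold count and seed here are arbitrary choices of
> > mine):
> >
> > import org.apache.spark.mllib.util.MLUtils
> >
> > // kFold returns an Array of (training, validation) RDD pairs.
> > val folds = MLUtils.kFold(ratings, numFolds = 10, seed = 42)
> > for ((training, test) <- folds) {
> >   val model = ALS.train(training, rank, iters)
> >   // ... evaluate the model against `test` ...
> > }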
> >
> > It's rather sparse data, and there are cases where the test set contains
> > users and movies that are not present in any Rating in the training set.
> > When it encounters these, the model shouts at me:
> >
> > java.util.NoSuchElementException: next on empty iterator
> >
> > Is it the case that the Alternating Least Squares method doesn't create
> > models which predict values for untrained users/products?  My high-level
> > understanding of the ALS implementation suggests that the calculations
> > depend on at least one rating for each user, and at least one for each
> > movie.  Is that true?
> >
> > If so, should I simply filter out entries from the test set which have
> > users or movies absent from the training set?  Or is kFold not an
> > appropriate way to generate test data for collaborative filtering?
> >
> > Actually, I should have probably just asked, "What is the best way to do
> > testing for recommendation models?"  Leave it nice and general...
> >
> > Thanks in advance.  Sorry for the long ramble.
> >    Jay
> >
> >
>
