ratings.map{ case Rating(u,m,r) => {
    val pred = model.predict(u, m)
    (r - pred)*(r - pred)
  }
}.mean()

The code doesn't work because the userFeatures and productFeatures
stored in the model are RDDs. Calling `model.predict` inside
`ratings.map` serializes them into the task closure and runs the call
on an executor, which won't work because `model.predict(user, product)`
can only be called on the driver. We should make this clear in the doc.
You should use what Burak suggested:

val predictions = model.predict(data.map(x => (x.user, x.product)))

Best,
Xiangrui

On Thu, Aug 7, 2014 at 1:20 PM, Burak Yavuz <bya...@stanford.edu> wrote:
> Hi Jay,
>
> I've had the same problem you've been having in Question 1 with a synthetic 
> dataset. I thought I wasn't producing the dataset well enough. This seems to
> be a bug. I will open a JIRA for it.
>
> Instead of using:
>
> ratings.map{ case Rating(u,m,r) => {
>     val pred = model.predict(u, m)
>     (r - pred)*(r - pred)
>   }
> }.mean()
>
> you can use something like:
>
> val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
> val predictionsAndRatings: RDD[(Double, Double)] = predictions.map{ x =>
>   def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
>   ((x.user, x.product), mapPredictedRating(x.rating))
> }.join(data.map(x => ((x.user, x.product), x.rating))).values
>
> math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
>
> This workaround worked for me.
>
> Regarding your question 2, it will be best if you filter the dataset so
> that the model is trained on every user and product you want to predict for.
> If the model wasn't trained on any data for a user, there is no way to
> predict how he would like a product.
> That filtering takes a lot of work though. I can share some code on that too
> if you like.
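The filtering described above might look roughly like the sketch below. It uses plain Scala collections so it runs without Spark; the `Rating` case class is a stand-in for `org.apache.spark.mllib.recommendation.Rating`, and `TestSetFilter` and `restrictToTrained` are hypothetical names, not part of MLlib. On RDDs the same idea would need joins or broadcast id sets instead of in-memory `Set`s.

```scala
// Stand-in for org.apache.spark.mllib.recommendation.Rating (assumption:
// defined locally so the sketch does not need Spark on the classpath).
final case class Rating(user: Int, product: Int, rating: Double)

object TestSetFilter {
  /** Keep only test ratings whose user AND product both occur in the training set. */
  def restrictToTrained(train: Seq[Rating], test: Seq[Rating]): Seq[Rating] = {
    val trainedUsers = train.map(_.user).toSet
    val trainedProducts = train.map(_.product).toSet
    test.filter(r => trainedUsers.contains(r.user) && trainedProducts.contains(r.product))
  }
}
```

Test ratings dropped this way are simply not scored, so the reported error only covers pairs the model can actually predict.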
>
> Best,
> Burak
>
> ----- Original Message -----
> From: "Jay Hutfles" <jayhutf...@gmail.com>
> To: user@spark.apache.org
> Sent: Thursday, August 7, 2014 1:06:33 PM
> Subject: questions about MLLib recommendation models
>
> I have a few questions regarding a collaborative filtering model, and was
> hoping for some recommendations (no pun intended...)
>
> *Setup*
>
> I have a csv file with user/movie/ratings named unimaginatively
> 'movies.csv'.  Here are the contents:
>
> 0,0,5
> 0,1,5
> 0,2,0
> 0,3,0
> 1,0,5
> 1,3,0
> 2,1,4
> 2,2,0
> 3,0,0
> 3,1,0
> 3,2,5
> 3,3,4
> 4,0,0
> 4,1,0
> 4,2,5
>
> I then load it into an RDD with a nice command like
>
> val ratings = sc.textFile("movies.csv").map(_.split(',') match {
>   case Array(u, m, r) => Rating(u.toInt, m.toInt, r.toDouble)
> })
>
> So far so good.  I'm even okay building a model for predicting the absent
> values in the matrix with
>
> val rank = 10
> val iters = 20
> val model = ALS.train(ratings, rank, iters)
>
> I can then use the model to predict any user/movie rating without trouble,
> like
>
> model.predict(2, 0)
>
> *Question 1: *
>
> If I were to calculate, say, the mean squared error of the training set (or
> to my next question, a test set), this doesn't work:
>
> ratings.map{ case Rating(u,m,r) => {
>     val pred = model.predict(u, m)
>     (r - pred)*(r - pred)
>   }
> }.mean()
>
> Actually, any action on RDDs created by mapping over the RDD[Rating] with a
> model prediction  fails, like
>
> ratings.map{ case Rating(u, m, _) => model.predict(u, m) }.collect
>
> I get errors due to a "scala.MatchError: null".  Here's the exact verbiage:
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 26150.0:1 failed 1 times, most recent failure: Exception failure in TID
> 7091 on host localhost: scala.MatchError: null
>         org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
>         org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:43)
>         $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
>         $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>         org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>         org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
>         org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
>
> I think I'm missing something, since I can build up a scala collection of
> the exact (user, movie) tuples I'm testing, map over that with the model
> prediction, and it works fine.  But if I map over the RDD[Rating], it
> doesn't.  Am I doing something obviously wrong?
>
> *Question 2:*
>
> I have a much larger data set, and instead of running the ALS algorithm on
> the whole set, it seems prudent to use the kFold method in
> org.apache.spark.mllib.util.MLUtils to generate training/testing splits.
>
> It's rather sparse data, and there are cases where the test set has
> users or movies that are not present in any Ratings in the training set.
> When encountering these, the model shouts at me:
>
> java.util.NoSuchElementException: next on empty iterator
>
> Is it the case that the Alternating Least Squares method doesn't create
> models which predict values for untrained users/products?  My high-level
> understanding of the ALS implementation makes it seem understandable that
> the calculations depend on at least one rating for each user, and at least
> one for each movie.  Is that true?
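On the question above: a matrix factorization model scores a (user, movie) pair as the dot product of the two learned factor vectors, so an id that never appeared in training has no vector to look up. The sketch below illustrates that under simplifying assumptions; `FactorModelSketch` and its in-memory `Map`s are hypothetical stand-ins for the model's feature RDDs, not MLlib's actual implementation.

```scala
object FactorModelSketch {
  // id -> learned factor vector (assumption: held in memory for illustration)
  type Factors = Map[Int, Array[Double]]

  /** Dot product of the user's and product's factor vectors;
    * None when either id has no learned vector. */
  def predict(userFactors: Factors, productFactors: Factors,
              user: Int, product: Int): Option[Double] =
    for {
      u <- userFactors.get(user)
      p <- productFactors.get(product)
    } yield u.zip(p).map { case (a, b) => a * b }.sum
}
```

Returning an `Option` makes the missing-vector case explicit instead of surfacing it as an exception from an empty lookup.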
>
> If so, should I simply filter out entries from the test set which have
> users or movies absent from the training set?  Or is kFold not an
> appropriate way to generate test data for collaborative filtering?
>
> Actually, I should have probably just asked, "What is the best way to do
> testing for recommendation models?"  Leave it nice and general...
>
> Thanks in advance.  Sorry for the long ramble.
>    Jay
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
