Repository: spark Updated Branches: refs/heads/master fe7b219ae -> 98057583d
[SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel This PR adds methods `recommendForUserSubset` and `recommendForItemSubset` to `ALSModel`. These allow recommending for a specified set of user / item ids rather than for every user / item (as in the `recommendForAllX` methods). The subset methods take a `DataFrame` as input, containing ids in the column specified by the param `userCol` or `itemCol`. The model will generate recommendations for each _unique_ id in this input dataframe. ## How was this patch tested? New unit tests in `ALSSuite` and Python doctests in `ALS`. Ran updated examples locally. Author: Nick Pentreath <ni...@za.ibm.com> Closes #18748 from MLnick/als-recommend-df. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98057583 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98057583 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98057583 Branch: refs/heads/master Commit: 98057583dd2787c0e396c2658c7dd76412f86936 Parents: fe7b219 Author: Nick Pentreath <ni...@za.ibm.com> Authored: Mon Oct 9 10:42:33 2017 +0200 Committer: Nick Pentreath <ni...@za.ibm.com> Committed: Mon Oct 9 10:42:33 2017 +0200 ---------------------------------------------------------------------- .../spark/examples/ml/JavaALSExample.java | 9 ++ examples/src/main/python/ml/als_example.py | 9 ++ .../apache/spark/examples/ml/ALSExample.scala | 9 ++ .../apache/spark/ml/recommendation/ALS.scala | 48 +++++++++ .../spark/ml/recommendation/ALSSuite.scala | 100 +++++++++++++++++-- python/pyspark/ml/recommendation.py | 38 +++++++ 6 files changed, 205 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java index fe4d6bc..27052be 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java @@ -118,9 +118,18 @@ public class JavaALSExample { Dataset<Row> userRecs = model.recommendForAllUsers(10); // Generate top 10 user recommendations for each movie Dataset<Row> movieRecs = model.recommendForAllItems(10); + + // Generate top 10 movie recommendations for a specified set of users + Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3); + Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10); + // Generate top 10 user recommendations for a specified set of movies + Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3); + Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10); // $example off$ userRecs.show(); movieRecs.show(); + userSubsetRecs.show(); + movieSubSetRecs.show(); spark.stop(); } http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/python/ml/als_example.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py index 1672d55..8b7ec9c 100644 --- a/examples/src/main/python/ml/als_example.py +++ b/examples/src/main/python/ml/als_example.py @@ -60,8 +60,17 @@ if __name__ == "__main__": userRecs = model.recommendForAllUsers(10) # Generate top 10 user recommendations for each movie movieRecs = model.recommendForAllItems(10) + + # Generate top 10 movie recommendations for a specified set of users + users = ratings.select(als.getUserCol()).distinct().limit(3) + userSubsetRecs = model.recommendForUserSubset(users, 10) + # Generate top 10 user recommendations for a specified set of movies + movies = ratings.select(als.getItemCol()).distinct().limit(3) + movieSubSetRecs = model.recommendForItemSubset(movies, 10) # $example off$ userRecs.show() movieRecs.show() + userSubsetRecs.show() + movieSubSetRecs.show() spark.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala index 07b15df..8091838 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala @@ -80,9 +80,18 @@ object ALSExample { val userRecs = model.recommendForAllUsers(10) // Generate top 10 user recommendations for each movie val movieRecs = model.recommendForAllItems(10) + + // Generate top 10 movie recommendations for a specified set of users + val users = ratings.select(als.getUserCol).distinct().limit(3) + val userSubsetRecs = model.recommendForUserSubset(users, 10) + // Generate top 10 user recommendations for a specified set of movies + val movies = ratings.select(als.getItemCol).distinct().limit(3) + val movieSubSetRecs = model.recommendForItemSubset(movies, 10) // $example off$ userRecs.show() movieRecs.show() + userSubsetRecs.show() + movieSubSetRecs.show() spark.stop() } http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 3d5fd17..a884366 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -345,6 +345,21 @@ class ALSModel private[ml] ( } /** + * Returns top `numItems` items recommended for each user id in the input data set. Note that if + * there are duplicate ids in the input dataset, only one set of recommendations per unique id + * will be returned. + * @param dataset a Dataset containing a column of user ids. The column name must match `userCol`. + * @param numItems max number of recommendations for each user. + * @return a DataFrame of (userCol: Int, recommendations), where recommendations are + * stored as an array of (itemCol: Int, rating: Float) Rows. + */ + @Since("2.3.0") + def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = { + val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol)) + recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems) + } + + /** * Returns top `numUsers` users recommended for each item, for all items. * @param numUsers max number of recommendations for each item * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are @@ -356,6 +371,39 @@ class ALSModel private[ml] ( } /** + * Returns top `numUsers` users recommended for each item id in the input data set. Note that if + * there are duplicate ids in the input dataset, only one set of recommendations per unique id + * will be returned. + * @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`. + * @param numUsers max number of recommendations for each item. + * @return a DataFrame of (itemCol: Int, recommendations), where recommendations are + * stored as an array of (userCol: Int, rating: Float) Rows. + */ + @Since("2.3.0") + def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = { + val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol)) + recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers) + } + + /** + * Returns a subset of a factor DataFrame limited to only those unique ids contained + * in the input dataset. + * @param dataset input Dataset containing id column to user to filter factors. + * @param factors factor DataFrame to filter. + * @param column column name containing the ids in the input dataset. + * @return DataFrame containing factors only for those ids present in both the input dataset and + * the factor DataFrame. + */ + private def getSourceFactorSubset( + dataset: Dataset[_], + factors: DataFrame, + column: String): DataFrame = { + factors + .join(dataset.select(column), factors("id") === dataset(column), joinType = "left_semi") + .select(factors("id"), factors("features")) + } + + /** * Makes recommendations for all users (or items). * * Note: the previous approach used for computing top-k recommendations http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index ac73191..addcd21 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -723,9 +723,9 @@ class ALSSuite val numUsers = model.userFactors.count val numItems = model.itemFactors.count val expected = Map( - 0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)), - 1 -> Array((3, 39f), (5, 33f), (4, 26f), (6, 16f)), - 2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f)) + 0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)), + 1 -> Seq((3, 39f), (5, 33f), (4, 26f), (6, 16f)), + 2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f)) ) Seq(2, 4, 6).foreach { k => @@ -743,10 +743,10 @@ class ALSSuite val numUsers = model.userFactors.count val numItems = model.itemFactors.count val expected = Map( - 3 -> Array((0, 54f), (2, 51f), (1, 39f)), - 4 -> Array((0, 44f), (2, 30f), (1, 26f)), - 5 -> Array((2, 45f), (0, 42f), (1, 33f)), - 6 -> Array((0, 28f), (2, 18f), (1, 16f)) + 3 -> Seq((0, 54f), (2, 51f), (1, 39f)), + 4 -> Seq((0, 44f), (2, 30f), (1, 26f)), + 5 -> Seq((2, 45f), (0, 42f), (1, 33f)), + 6 -> Seq((0, 28f), (2, 18f), (1, 16f)) ) Seq(2, 3, 4).foreach { k => @@ -759,9 +759,93 @@ class ALSSuite } } + test("recommendForUserSubset with k <, = and > num_items") { + val spark = this.spark + import spark.implicits._ + val model = getALSModel + val numItems = model.itemFactors.count + val expected = Map( + 0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)), + 2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f)) + ) + val userSubset = expected.keys.toSeq.toDF("user") + val numUsersSubset = userSubset.count + + Seq(2, 4, 6).foreach { k => + val n = math.min(k, numItems).toInt + val expectedUpToN = expected.mapValues(_.slice(0, n)) + val topItems = model.recommendForUserSubset(userSubset, k) + assert(topItems.count() == numUsersSubset) + assert(topItems.columns.contains("user")) + checkRecommendations(topItems, expectedUpToN, "item") + } + } + + test("recommendForItemSubset with k <, = and > num_users") { + val spark = this.spark + import spark.implicits._ + val model = getALSModel + val numUsers = model.userFactors.count + val expected = Map( + 3 -> Seq((0, 54f), (2, 51f), (1, 39f)), + 6 -> Seq((0, 28f), (2, 18f), (1, 16f)) + ) + val itemSubset = expected.keys.toSeq.toDF("item") + val numItemsSubset = itemSubset.count + + Seq(2, 3, 4).foreach { k => + val n = math.min(k, numUsers).toInt + val expectedUpToN = expected.mapValues(_.slice(0, n)) + val topUsers = model.recommendForItemSubset(itemSubset, k) + assert(topUsers.count() == numItemsSubset) + assert(topUsers.columns.contains("item")) + checkRecommendations(topUsers, expectedUpToN, "user") + } + } + + test("subset recommendations eliminate duplicate ids, returns same results as unique ids") { + val spark = this.spark + import spark.implicits._ + val model = getALSModel + val k = 2 + + val users = Seq(0, 1).toDF("user") + val dupUsers = Seq(0, 1, 0, 1).toDF("user") + val singleUserRecs = model.recommendForUserSubset(users, k) + val dupUserRecs = model.recommendForUserSubset(dupUsers, k) + .as[(Int, Seq[(Int, Float)])].collect().toMap + assert(singleUserRecs.count == dupUserRecs.size) + checkRecommendations(singleUserRecs, dupUserRecs, "item") + + val items = Seq(3, 4, 5).toDF("item") + val dupItems = Seq(3, 4, 5, 4, 5).toDF("item") + val singleItemRecs = model.recommendForItemSubset(items, k) + val dupItemRecs = model.recommendForItemSubset(dupItems, k) + .as[(Int, Seq[(Int, Float)])].collect().toMap + assert(singleItemRecs.count == dupItemRecs.size) + checkRecommendations(singleItemRecs, dupItemRecs, "user") + } + + test("subset recommendations on full input dataset equivalent to recommendForAll") { + val spark = this.spark + import spark.implicits._ + val model = getALSModel + val k = 2 + + val userSubset = model.userFactors.withColumnRenamed("id", "user").drop("features") + val userSubsetRecs = model.recommendForUserSubset(userSubset, k) + val allUserRecs = model.recommendForAllUsers(k).as[(Int, Seq[(Int, Float)])].collect().toMap + checkRecommendations(userSubsetRecs, allUserRecs, "item") + + val itemSubset = model.itemFactors.withColumnRenamed("id", "item").drop("features") + val itemSubsetRecs = model.recommendForItemSubset(itemSubset, k) + val allItemRecs = model.recommendForAllItems(k).as[(Int, Seq[(Int, Float)])].collect().toMap + checkRecommendations(itemSubsetRecs, allItemRecs, "user") + } + private def checkRecommendations( topK: DataFrame, - expected: Map[Int, Array[(Int, Float)]], + expected: Map[Int, Seq[(Int, Float)]], dstColName: String): Unit = { val spark = this.spark import spark.implicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/98057583/python/pyspark/ml/recommendation.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py index bcfb368..e8bcbe4 100644 --- a/python/pyspark/ml/recommendation.py +++ b/python/pyspark/ml/recommendation.py @@ -90,6 +90,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha >>> item_recs.where(item_recs.item == 2)\ .select("recommendations.user", "recommendations.rating").collect() [Row(user=[2, 1, 0], rating=[4.901..., 3.981..., -0.138...])] + >>> user_subset = df.where(df.user == 2) + >>> user_subset_recs = model.recommendForUserSubset(user_subset, 3) + >>> user_subset_recs.select("recommendations.item", "recommendations.rating").first() + Row(item=[2, 1, 0], rating=[4.901..., 1.056..., -1.501...]) + >>> item_subset = df.where(df.item == 0) + >>> item_subset_recs = model.recommendForItemSubset(item_subset, 3) + >>> item_subset_recs.select("recommendations.user", "recommendations.rating").first() + Row(user=[0, 1, 2], rating=[3.910..., 2.625..., -1.501...]) >>> als_path = temp_path + "/als" >>> als.save(als_path) >>> als2 = ALS.load(als_path) @@ -414,6 +422,36 @@ class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable): """ return self._call_java("recommendForAllItems", numUsers) + @since("2.3.0") + def recommendForUserSubset(self, dataset, numItems): + """ + Returns top `numItems` items recommended for each user id in the input data set. Note that + if there are duplicate ids in the input dataset, only one set of recommendations per unique + id will be returned. + + :param dataset: a Dataset containing a column of user ids. The column name must match + `userCol`. + :param numItems: max number of recommendations for each user + :return: a DataFrame of (userCol, recommendations), where recommendations are + stored as an array of (itemCol, rating) Rows. + """ + return self._call_java("recommendForUserSubset", dataset, numItems) + + @since("2.3.0") + def recommendForItemSubset(self, dataset, numUsers): + """ + Returns top `numUsers` users recommended for each item id in the input data set. Note that + if there are duplicate ids in the input dataset, only one set of recommendations per unique + id will be returned. + + :param dataset: a Dataset containing a column of item ids. The column name must match + `itemCol`. + :param numUsers: max number of recommendations for each item + :return: a DataFrame of (itemCol, recommendations), where recommendations are + stored as an array of (userCol, rating) Rows. + """ + return self._call_java("recommendForItemSubset", dataset, numUsers) + if __name__ == "__main__": import doctest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org