Github user gaborhermann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2838#discussion_r89503899

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
    @@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
         */
       def evaluate[Testing, PredictionValue](
           testing: DataSet[Testing],
    -      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
    -      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
    +      evaluateParameters: ParameterMap = ParameterMap.Empty)
    +      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
       : DataSet[(PredictionValue, PredictionValue)] = {
         FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
         evaluator.evaluateDataSet(this, evaluateParameters, testing)
       }
     }
    +
    +trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
    +  that: Self =>
    +
    +  def predictRankings(
    +      k: Int,
    +      users: DataSet[Int],
    +      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
    +      rankingPredictOperation: RankingPredictOperation[Self])
    +    : DataSet[(Int, Int, Int)] =
    +    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
    +
    +  def evaluateRankings(
    +      testing: DataSet[(Int, Int, Double)],
    +      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
    +      rankingPredictOperation: RankingPredictOperation[Self])
    +    : DataSet[(Int, Int, Int)] = {
    +    // TODO: do not hard-code the top K value of 100
    +    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
    +  }
    +}
    +
    +trait RankingPredictOperation[Instance] {
    +  def predictRankings(
    +      instance: Instance,
    +      k: Int,
    +      users: DataSet[Int],
    +      predictParameters: ParameterMap = ParameterMap.Empty)
    +    : DataSet[(Int, Int, Int)]
    +}
    +
    +/**
    + * Trait for providing auxiliary data for ranking evaluations.
    + *
    + * This is useful, e.g., for excluding items found in the training [[DataSet]]
    + * from the recommended top K items.
    + */
    +trait TrainingRatingsProvider {
    +
    +  def getTrainingData: DataSet[(Int, Int, Double)]
    +
    +  /**
    +   * Retrieves the training items.
    +   * Although these can be calculated from the training data, that requires a costly
    +   * [[DataSet.distinct]] operation, while in matrix factorization models the set of
    +   * items can be obtained more efficiently from the item factors.
    +   */
    +  def getTrainingItems: DataSet[Int] = {
    +    getTrainingData.map(_._2).distinct()
    +  }
    +}
    +
    +/**
    + * Ranking predictions for the most common case.
    + * If we can predict ratings, we can compute top K lists by sorting the predicted ratings.
    + */
    +class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider](
    +    val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
    +  extends RankingPredictOperation[Instance] {
    +
    +  private def getUserItemPairs(
    +      users: DataSet[Int],
    +      items: DataSet[Int],
    +      exclude: DataSet[(Int, Int)])
    +    : DataSet[(Int, Int)] = {
    +    users.cross(items)
    --- End diff --

    I completely agree. There are use cases where we would not want to produce rankings over all items. E.g., when recommending TV programs, we would only like to recommend currently running programs, while still training on all of them. We'll include an `items` DataSet parameter in the ranking predictions.

    (Btw. I believe the "Flink way" is to let the user configure as much as possible, but that's just my opinion :) )
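    A minimal sketch of how such an `items` parameter could look on the ranking operation (hypothetical: the parameter name, signature, and the `(user, item, rank)` output layout are assumptions for illustration, not the actual change merged in the PR):

        import org.apache.flink.api.scala.DataSet
        import org.apache.flink.ml.common.ParameterMap

        trait RankingPredictOperation[Instance] {
          // Rank only the given candidate items for each user, rather than
          // every item seen during training. In the TV-program example,
          // `items` would hold just the currently running programs, while
          // the model is still trained on the full rating history.
          def predictRankings(
              instance: Instance,
              k: Int,
              users: DataSet[Int],
              items: DataSet[Int],
              predictParameters: ParameterMap = ParameterMap.Empty)
            : DataSet[(Int, Int, Int)] // (user, item, rank) triples
        }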