This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 81208992cb0 [SPARK-39446][MLLIB] Add relevance score for nDCG evaluation
81208992cb0 is described below

commit 81208992cb007e4addc6148600433da1f1a5c9ab
Author: uchiiii <uchiku...@gmail.com>
AuthorDate: Thu Jun 16 08:24:57 2022 -0500

    [SPARK-39446][MLLIB] Add relevance score for nDCG evaluation

    ### What changes were proposed in this pull request?
    - Add relevance scores to the nDCG evaluation in the `ndcgAt` function.
    - Extend the constructor interface of the `RankingMetrics` class.

    ### Why are the changes needed?
    - The precise definition of nDCG is given [on Wikipedia](https://en.wikipedia.org/wiki/Discounted_cumulative_gain), where a graded relevance score is used. The current nDCG implementation in Spark MLlib treats relevance as binary (0 or 1). This PR extends the `ndcgAt` function to support graded relevance scores.

    ### Does this PR introduce _any_ user-facing change?
    - Yes. The `RankingMetrics` constructor now accepts either `RDD[(Array[T], Array[T], Array[Double])]` or `RDD[(Array[T], Array[T])]`, whereas previously only `RDD[(Array[T], Array[T])]` was accepted. (A usage sketch follows the diff at the end of this message.)

    ### How was this patch tested?
    - One test was added in [mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala](https://github.com/apache/spark/pull/36843/files#diff-b85de09fb1428c03471c7c46305ebb9171964e9743d3ac5f4cd9bcb14bdcf6fd).

    Closes #36843 from uchiiii/add_relevance_to_ndcg.

    Authored-by: uchiiii <uchiku...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/mllib/evaluation/RankingMetrics.scala    | 78 +++++++++++++------
 .../mllib/evaluation/RankingMetricsSuite.scala     | 87 ++++++++++++++++------
 2 files changed, 120 insertions(+), 45 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
index 9e35ee2d60f..7fccff9a24e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
@@ -32,11 +32,23 @@ import org.apache.spark.rdd.RDD
  *
  * Java users should use `RankingMetrics$.of` to create a [[RankingMetrics]] instance.
  *
- * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs.
+ * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs
+ *                            or (predicted ranking, ground truth set,
+ *                            relevance value of ground truth set) triplets.
+ *                            Since 3.4.0, it supports nDCG evaluation with relevance values.
  */
 @Since("1.2.0")
-class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])])
-  extends Logging with Serializable {
+class RankingMetrics[T: ClassTag] @Since("3.4.0") (
+    predictionAndLabels: RDD[(Array[T], Array[T], Array[Double])])
+    extends Logging
+    with Serializable {
+
+  @Since("1.2.0")
+  def this(predictionAndLabelsWithoutRelevance: => RDD[(Array[T], Array[T])]) = {
+    this(predictionAndLabelsWithoutRelevance.map {
+      case (pred, lab) => (pred, lab, Array.empty[Double])
+    })
+  }
 
   /**
    * Compute the average precision of all the queries, truncated at ranking position k.
@@ -58,7 +70,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("1.2.0")
   def precisionAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       countRelevantItemRatio(pred, lab, k, k)
     }.mean()
   }
@@ -70,7 +82,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    */
   @Since("1.2.0")
   lazy val meanAveragePrecision: Double = {
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       val labSet = lab.toSet
       val k = math.max(pred.length, labSet.size)
       averagePrecision(pred, labSet, k)
@@ -87,7 +99,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("3.0.0")
   def meanAveragePrecisionAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       averagePrecision(pred, lab.toSet, k)
     }.mean()
   }
@@ -127,7 +139,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    * The discounted cumulative gain at position k is computed as:
    * sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
    * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
-   * implementation, the relevance value is binary.
+   * implementation, the relevance value is treated as binary if the relevance array is empty.
    * If a query has an empty ground truth set, zero will be used as ndcg together with
    * a log warning.
    *
@@ -142,8 +154,15 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("1.2.0")
   def ndcgAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, rel) =>
+      val useBinary = rel.isEmpty
       val labSet = lab.toSet
+      val relMap = lab.zip(rel).toMap
+      if (!useBinary && lab.size != rel.size) {
+        logWarning(
+          "# of ground truth set and # of relevance value set should be equal, " +
+            "check input data")
+      }
 
       if (labSet.nonEmpty) {
         val labSetSize = labSet.size
@@ -152,18 +171,32 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
         var dcg = 0.0
         var i = 0
         while (i < n) {
-          // Base of the log doesn't matter for calculating NDCG,
-          // if the relevance value is binary.
-          val gain = 1.0 / math.log(i + 2)
-          if (i < pred.length && labSet.contains(pred(i))) {
-            dcg += gain
-          }
-          if (i < labSetSize) {
-            maxDcg += gain
+          if (useBinary) {
+            // Base of the log doesn't matter for calculating NDCG,
+            // if the relevance value is binary.
+            val gain = 1.0 / math.log(i + 2)
+            if (i < pred.length && labSet.contains(pred(i))) {
+              dcg += gain
+            }
+            if (i < labSetSize) {
+              maxDcg += gain
+            }
+          } else {
+            if (i < pred.length) {
+              dcg += (math.pow(2.0, relMap.getOrElse(pred(i), 0.0)) - 1) / math.log(i + 2)
+            }
+            if (i < labSetSize) {
+              maxDcg += (math.pow(2.0, relMap.getOrElse(lab(i), 0.0)) - 1) / math.log(i + 2)
+            }
           }
           i += 1
         }
-        dcg / maxDcg
+        if (maxDcg == 0.0) {
+          logWarning("Maximum of relevance of ground truth set is zero, check input data")
+          0.0
+        } else {
+          dcg / maxDcg
+        }
       } else {
         logWarning("Empty ground truth set, check input data")
         0.0
@@ -191,7 +224,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("3.0.0")
   def recallAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       countRelevantItemRatio(pred, lab, k, lab.toSet.size)
     }.mean()
   }
@@ -207,10 +240,11 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    * @param denominator the denominator of ratio
    * @return relevant item ratio at the first k ranking positions
    */
-  private def countRelevantItemRatio(pred: Array[T],
-      lab: Array[T],
-      k: Int,
-      denominator: Int): Double = {
+  private def countRelevantItemRatio(
+      pred: Array[T],
+      lab: Array[T],
+      k: Int,
+      denominator: Int): Double = {
     val labSet = lab.toSet
     if (labSet.nonEmpty) {
       val n = math.min(pred.length, k)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
index 489eb15f4db..a10cb5c9a4e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
@@ -28,20 +28,20 @@ class RankingMetricsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val predictionAndLabels = sc.parallelize(
       Seq(
         (Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
         (Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
-        (Array(1, 2, 3, 4, 5), Array.empty[Int])
-      ), 2)
-    val eps = 1.0E-5
+        (Array(1, 2, 3, 4, 5), Array.empty[Int])),
+      2)
+    val eps = 1.0e-5
 
     val metrics = new RankingMetrics(predictionAndLabels)
     val map = metrics.meanAveragePrecision
 
-    assert(metrics.precisionAt(1) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(2) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(3) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(4) ~== 0.75/3 absTol eps)
-    assert(metrics.precisionAt(5) ~== 0.8/3 absTol eps)
-    assert(metrics.precisionAt(10) ~== 0.8/3 absTol eps)
-    assert(metrics.precisionAt(15) ~== 8.0/45 absTol eps)
+    assert(metrics.precisionAt(1) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(2) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(3) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(4) ~== 0.75 / 3 absTol eps)
+    assert(metrics.precisionAt(5) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(10) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(15) ~== 8.0 / 45 absTol eps)
 
     assert(map ~== 0.355026 absTol eps)
@@ -49,27 +49,68 @@ class RankingMetricsSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(metrics.meanAveragePrecisionAt(2) ~== 0.25 absTol eps)
     assert(metrics.meanAveragePrecisionAt(3) ~== 0.24074 absTol eps)
 
-    assert(metrics.ndcgAt(3) ~== 1.0/3 absTol eps)
+    assert(metrics.ndcgAt(3) ~== 1.0 / 3 absTol eps)
     assert(metrics.ndcgAt(5) ~== 0.328788 absTol eps)
     assert(metrics.ndcgAt(10) ~== 0.487913 absTol eps)
     assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps)
 
-    assert(metrics.recallAt(1) ~== 1.0/15 absTol eps)
-    assert(metrics.recallAt(2) ~== 8.0/45 absTol eps)
-    assert(metrics.recallAt(3) ~== 11.0/45 absTol eps)
-    assert(metrics.recallAt(4) ~== 11.0/45 absTol eps)
-    assert(metrics.recallAt(5) ~== 16.0/45 absTol eps)
-    assert(metrics.recallAt(10) ~== 2.0/3 absTol eps)
-    assert(metrics.recallAt(15) ~== 2.0/3 absTol eps)
+    assert(metrics.recallAt(1) ~== 1.0 / 15 absTol eps)
+    assert(metrics.recallAt(2) ~== 8.0 / 45 absTol eps)
+    assert(metrics.recallAt(3) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(4) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(5) ~== 16.0 / 45 absTol eps)
+    assert(metrics.recallAt(10) ~== 2.0 / 3 absTol eps)
+    assert(metrics.recallAt(15) ~== 2.0 / 3 absTol eps)
   }
 
-  test("MAP, NDCG, Recall with few predictions (SPARK-14886)") {
+  test("Ranking metrics: NDCG with relevance") {
     val predictionAndLabels = sc.parallelize(
       Seq(
-        (Array(1, 6, 2), Array(1, 2, 3, 4, 5)),
-        (Array.empty[Int], Array(1, 2, 3))
-      ), 2)
-    val eps = 1.0E-5
+        (
+          Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5),
+          Array(1, 2, 3, 4, 5),
+          Array(3.0, 2.0, 1.0, 1.0, 1.0)),
+        (Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3), Array(2.0, 0.0, 0.0)),
+        (Array(1, 2, 3, 4, 5), Array.empty[Int], Array.empty[Double])),
+      2)
+    val eps = 1.0e-5
+
+    val metrics = new RankingMetrics(predictionAndLabels)
+    val map = metrics.meanAveragePrecision
+
+    assert(metrics.precisionAt(1) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(2) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(3) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(4) ~== 0.75 / 3 absTol eps)
+    assert(metrics.precisionAt(5) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(10) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(15) ~== 8.0 / 45 absTol eps)
+
+    assert(map ~== 0.355026 absTol eps)
+
+    assert(metrics.meanAveragePrecisionAt(1) ~== 0.333334 absTol eps)
+    assert(metrics.meanAveragePrecisionAt(2) ~== 0.25 absTol eps)
+    assert(metrics.meanAveragePrecisionAt(3) ~== 0.24074 absTol eps)
+
+    assert(metrics.ndcgAt(3) ~== 0.511959 absTol eps)
+    assert(metrics.ndcgAt(5) ~== 0.487806 absTol eps)
+    assert(metrics.ndcgAt(10) ~== 0.518700 absTol eps)
+    assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps)
+
+    assert(metrics.recallAt(1) ~== 1.0 / 15 absTol eps)
+    assert(metrics.recallAt(2) ~== 8.0 / 45 absTol eps)
+    assert(metrics.recallAt(3) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(4) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(5) ~== 16.0 / 45 absTol eps)
+    assert(metrics.recallAt(10) ~== 2.0 / 3 absTol eps)
+    assert(metrics.recallAt(15) ~== 2.0 / 3 absTol eps)
+  }
+
+  test("MAP, NDCG, Recall with few predictions (SPARK-14886)") {
+    val predictionAndLabels = sc.parallelize(
+      Seq((Array(1, 6, 2), Array(1, 2, 3, 4, 5)), (Array.empty[Int], Array(1, 2, 3))),
+      2)
+    val eps = 1.0e-5
 
     val metrics = new RankingMetrics(predictionAndLabels)
 
     assert(metrics.precisionAt(1) ~== 0.5 absTol eps)
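Usage sketch for the change above: a minimal, self-contained example of the extended constructor. The object name and the local-mode `SparkContext` are placeholders, and it assumes a Spark build that includes this patch (3.4.0 or later); the input data here is illustrative, not taken from the tests.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.RankingMetrics

object RelevanceNdcgSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder local context; any existing SparkContext would do.
    val sc = new SparkContext("local[2]", "RelevanceNdcgSketch")

    // Triplet form: (predicted ranking, ground truth set, relevance of each ground truth item).
    // The relevance array is aligned element-wise with the ground truth array (lab.zip(rel)).
    val graded = new RankingMetrics(
      sc.parallelize(
        Seq(
          (Array(1, 6, 2, 7, 8), Array(1, 2, 3), Array(3.0, 2.0, 1.0)),
          (Array(4, 1, 5), Array(1, 2), Array(2.0, 0.0))),
        2))
    // Uses the graded gain (2^relevance - 1) / log(i + 2), with i the 0-based rank.
    println(s"graded nDCG@5 = ${graded.ndcgAt(5)}")

    // Pair form (the pre-3.4.0 interface): the relevance array is empty,
    // so ndcgAt falls back to binary gains.
    val binary = new RankingMetrics(
      sc.parallelize(Seq((Array(1, 6, 2), Array(1, 2, 3))), 2))
    println(s"binary nDCG@5 = ${binary.ndcgAt(5)}")

    sc.stop()
  }
}
```

Note that the legacy pair constructor takes its RDD by name (`=>`), which keeps the two constructors distinct after type erasure. The two forms behave identically for precision, MAP, and recall, which ignore the relevance array; only `ndcgAt` uses it.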