[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-08-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16618


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-04-27 Thread daniloascione
Github user daniloascione commented on a diff in the pull request:

https://github.com/apache/spark/pull/16618#discussion_r113721386
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.{mean, sum}
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.types.DoubleType
+
+@Since("2.2.0")
+class RankingMetrics(
+  predictionAndObservations: DataFrame, predictionCol: String, labelCol: 
String)
+  extends Logging with Serializable {
+
+  /**
+   * Compute the Mean Percentile Rank (MPR) of all the queries.
+   *
+   * For each row, the percentile rank is the zero-based position of the label within the
+   * predicted sequence divided by the sequence length, or 1.0 when the label does not occur
+   * in the predictions. The result is the average of that value over all rows.
+   *
+   * See the following paper for detail ("Expected percentile rank" in the paper):
+   * Hu, Y., Y. Koren, and C. Volinsky. “Collaborative Filtering for Implicit Feedback Datasets.”
+   * In 2008 Eighth IEEE International Conference on Data Mining, 263–72, 2008.
+   * doi:10.1109/ICDM.2008.22.
+   *
+   * @return the mean percentile rank, or `Double.NaN` when there are no rows to average
+   */
+  lazy val meanPercentileRank: Double = {
+    // Percentile rank of `actual` within `predicted`; a label that was never predicted gets
+    // the worst possible rank (1.0).
+    val rank = udf((predicted: Seq[Any], actual: Any) => {
+      val position = predicted.indexOf(actual)
+      if (position == -1) 1.0 else position.toDouble / predicted.size
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    // A single mean() aggregation replaces the separate count() and sum() jobs.
+    val result = predictionAndObservations
+      .withColumn("rank", rank(predictionColumn, labelColumn))
+      .agg(mean("rank"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; report the metric as undefined (the value
+    // 0.0 / 0 would give) instead of failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Compute the average precision of all the queries, truncated at ranking position k.
+   *
+   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
+   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
+   * the size of the ground truth set is less than k.
+   *
+   * If a query has an empty ground truth set, zero will be used as precision together with
+   * a log warning.
+   *
+   * See the following paper for detail:
+   *
+   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
+   *
+   * @param k the position to compute the truncated precision, must be positive
+   * @return the average precision at the first k ranking positions, or `Double.NaN` when there
+   *         are no queries to average
+   */
+  @Since("2.2.0")
+  def precisionAt(k: Int): Double = {
+    require(k > 0, "ranking position k should be positive")
+
+    // Per-query precision: relevant items among the first k predictions, divided by k.
+    val precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
+      val actualSet = actual.toSet
+      if (actualSet.nonEmpty) {
+        // take(k) already stops at the end of a prediction list shorter than k.
+        predicted.take(k).count(actualSet.contains).toDouble / k
+      } else {
+        logWarning("Empty ground truth set, check input data")
+        0.0
+      }
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    val result = predictionAndObservations
+      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
+      .agg(mean("predictionAtK"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; return NaN rather than failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Returns the mean average precision (MAP) of all the queries.
+   * If a query has an empty ground truth se

[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-04-27 Thread daniloascione
Github user daniloascione commented on a diff in the pull request:

https://github.com/apache/spark/pull/16618#discussion_r113702013
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.param.{IntParam, Param, ParamMap, 
ParamValidators}
+import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{coalesce, col, collect_list, 
row_number, udf}
+import org.apache.spark.sql.types.LongType
+
+/**
+ * Evaluator for ranking.
+ */
+@Since("2.2.0")
+@Experimental
+final class RankingEvaluator @Since("2.2.0")(@Since("2.2.0") override val 
uid: String)
+  extends Evaluator with HasPredictionCol with HasLabelCol with 
DefaultParamsWritable {
+
+  /** Creates an evaluator whose uid comes from `Identifiable.randomUID("rankingEval")`. */
+  @Since("2.2.0")
+  def this() = this(Identifiable.randomUID("rankingEval"))
+
+  /**
+   * Param for the top-K cutoff. Must be positive.
+   * Default: 1
+   * @group param
+   */
+  @Since("2.2.0")
+  val k: IntParam = new IntParam(this, "k", "Top-K cutoff", ParamValidators.gt(0))
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getK: Int = $(k)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setK(value: Int): this.type = set(k, value)
+
+  setDefault(k -> 1)
+
+  /**
+   * Param for the metric name used in evaluation. Only `"mpr"` is accepted.
+   * Default: "mpr"
+   * @group param
+   */
+  @Since("2.2.0")
+  val metricName: Param[String] = {
+    val allowedParams = ParamValidators.inArray(Array("mpr"))
+    new Param(this, "metricName", "metric name in evaluation (mpr)", allowedParams)
+  }
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getMetricName: String = $(metricName)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setMetricName(value: String): this.type = set(metricName, value)
+
+  // Kept next to the declaration so the default is visible alongside the allowed values.
+  setDefault(metricName -> "mpr")
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setLabelCol(value: String): this.type = set(labelCol, value)
+
+  /**
+   * Param for query column name.
+   * Default: "query"
+   * @group param
+   */
+  @Since("2.2.0")
+  val queryCol: Param[String] = new Param[String](this, "queryCol", "query column name")
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getQueryCol: String = $(queryCol)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setQueryCol(value: String): this.type = set(queryCol, value)
+
+  setDefault(queryCol -> "query")
+
+  @Since("2.2.0")
+  override def evaluate(dataset: Dataset[_]): Double = {
+val schema = dataset.schema
+SchemaUtils.checkNumericType(schema, $(predictionCol))
+SchemaUtils.checkNumericType(schema, $(labelCol))
+SchemaUtils.checkNumericType(schema, $(queryCol))
+
+val w = 
Window.partitionBy(col($(queryCol))).orderBy(col($(predictionCol)).desc)
+
+val topAtk: DataFrame = dataset
+  .na.drop("all", Seq($(predictionCol)))
+  .select(col($(predictionCol)), col($(labelCol)).cast(LongType), 
col($(queryCol)))
+  .withColumn("rn", row_number().over(w)).where(col("rn") <= $(k))
+  .drop("rn")
+  .groupBy(col($(queryCol)))
+  .agg(collect_list($(labelCol)).as("topAtk"))
+
+val mapToEmptyArray_ = udf(() => Array.empty[Long])
+
+val predictionAndLabels: DataFrame = dataset
+  .join(topAtk, Seq($(queryCol)), "outer")
+  .withColumn("topAtk", coalesce(col("topAtk"), mapToEmptyArray_()))
+  .select($(labelCol), "topAtk")
--- End diff --

Yes, I agree. This is currently done in the previous step, when the topAtk 
DataFrame is calculated ([line 
101](https://github.com/apache/spa

[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-04-25 Thread ebernhardson
Github user ebernhardson commented on a diff in the pull request:

https://github.com/apache/spark/pull/16618#discussion_r113360277
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.{mean, sum}
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.types.DoubleType
+
+@Since("2.2.0")
+class RankingMetrics(
+  predictionAndObservations: DataFrame, predictionCol: String, labelCol: 
String)
+  extends Logging with Serializable {
+
+  /**
+   * Compute the Mean Percentile Rank (MPR) of all the queries.
+   *
+   * For each row, the percentile rank is the zero-based position of the label within the
+   * predicted sequence divided by the sequence length, or 1.0 when the label does not occur
+   * in the predictions. The result is the average of that value over all rows.
+   *
+   * See the following paper for detail ("Expected percentile rank" in the paper):
+   * Hu, Y., Y. Koren, and C. Volinsky. “Collaborative Filtering for Implicit Feedback Datasets.”
+   * In 2008 Eighth IEEE International Conference on Data Mining, 263–72, 2008.
+   * doi:10.1109/ICDM.2008.22.
+   *
+   * @return the mean percentile rank, or `Double.NaN` when there are no rows to average
+   */
+  lazy val meanPercentileRank: Double = {
+    // Percentile rank of `actual` within `predicted`; a label that was never predicted gets
+    // the worst possible rank (1.0).
+    val rank = udf((predicted: Seq[Any], actual: Any) => {
+      val position = predicted.indexOf(actual)
+      if (position == -1) 1.0 else position.toDouble / predicted.size
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    // A single mean() aggregation replaces the separate count() and sum() jobs.
+    val result = predictionAndObservations
+      .withColumn("rank", rank(predictionColumn, labelColumn))
+      .agg(mean("rank"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; report the metric as undefined (the value
+    // 0.0 / 0 would give) instead of failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Compute the average precision of all the queries, truncated at ranking position k.
+   *
+   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
+   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
+   * the size of the ground truth set is less than k.
+   *
+   * If a query has an empty ground truth set, zero will be used as precision together with
+   * a log warning.
+   *
+   * See the following paper for detail:
+   *
+   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
+   *
+   * @param k the position to compute the truncated precision, must be positive
+   * @return the average precision at the first k ranking positions, or `Double.NaN` when there
+   *         are no queries to average
+   */
+  @Since("2.2.0")
+  def precisionAt(k: Int): Double = {
+    require(k > 0, "ranking position k should be positive")
+
+    // Per-query precision: relevant items among the first k predictions, divided by k.
+    val precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
+      val actualSet = actual.toSet
+      if (actualSet.nonEmpty) {
+        // take(k) already stops at the end of a prediction list shorter than k.
+        predicted.take(k).count(actualSet.contains).toDouble / k
+      } else {
+        logWarning("Empty ground truth set, check input data")
+        0.0
+      }
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    val result = predictionAndObservations
+      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
+      .agg(mean("predictionAtK"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; return NaN rather than failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Returns the mean average precision (MAP) of all the queries.
+   * If a query has an empty ground truth set

[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-04-25 Thread ebernhardson
Github user ebernhardson commented on a diff in the pull request:

https://github.com/apache/spark/pull/16618#discussion_r113358325
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.param.{IntParam, Param, ParamMap, 
ParamValidators}
+import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{coalesce, col, collect_list, 
row_number, udf}
+import org.apache.spark.sql.types.LongType
+
+/**
+ * Evaluator for ranking.
+ */
+@Since("2.2.0")
+@Experimental
+final class RankingEvaluator @Since("2.2.0")(@Since("2.2.0") override val 
uid: String)
+  extends Evaluator with HasPredictionCol with HasLabelCol with 
DefaultParamsWritable {
+
+  /** Creates an evaluator whose uid comes from `Identifiable.randomUID("rankingEval")`. */
+  @Since("2.2.0")
+  def this() = this(Identifiable.randomUID("rankingEval"))
+
+  /**
+   * Param for the top-K cutoff. Must be positive.
+   * Default: 1
+   * @group param
+   */
+  @Since("2.2.0")
+  val k: IntParam = new IntParam(this, "k", "Top-K cutoff", ParamValidators.gt(0))
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getK: Int = $(k)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setK(value: Int): this.type = set(k, value)
+
+  setDefault(k -> 1)
+
+  /**
+   * Param for the metric name used in evaluation. Only `"mpr"` is accepted.
+   * Default: "mpr"
+   * @group param
+   */
+  @Since("2.2.0")
+  val metricName: Param[String] = {
+    val allowedParams = ParamValidators.inArray(Array("mpr"))
+    new Param(this, "metricName", "metric name in evaluation (mpr)", allowedParams)
+  }
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getMetricName: String = $(metricName)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setMetricName(value: String): this.type = set(metricName, value)
+
+  // Kept next to the declaration so the default is visible alongside the allowed values.
+  setDefault(metricName -> "mpr")
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setLabelCol(value: String): this.type = set(labelCol, value)
+
+  /**
+   * Param for query column name.
+   * Default: "query"
+   * @group param
+   */
+  @Since("2.2.0")
+  val queryCol: Param[String] = new Param[String](this, "queryCol", "query column name")
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getQueryCol: String = $(queryCol)
+
+  /** @group setParam */
+  @Since("2.2.0")
+  def setQueryCol(value: String): this.type = set(queryCol, value)
+
+  setDefault(queryCol -> "query")
+
+  @Since("2.2.0")
+  override def evaluate(dataset: Dataset[_]): Double = {
+val schema = dataset.schema
+SchemaUtils.checkNumericType(schema, $(predictionCol))
+SchemaUtils.checkNumericType(schema, $(labelCol))
+SchemaUtils.checkNumericType(schema, $(queryCol))
+
+val w = 
Window.partitionBy(col($(queryCol))).orderBy(col($(predictionCol)).desc)
+
+val topAtk: DataFrame = dataset
+  .na.drop("all", Seq($(predictionCol)))
+  .select(col($(predictionCol)), col($(labelCol)).cast(LongType), 
col($(queryCol)))
+  .withColumn("rn", row_number().over(w)).where(col("rn") <= $(k))
+  .drop("rn")
+  .groupBy(col($(queryCol)))
+  .agg(collect_list($(labelCol)).as("topAtk"))
+
+val mapToEmptyArray_ = udf(() => Array.empty[Long])
+
+val predictionAndLabels: DataFrame = dataset
+  .join(topAtk, Seq($(queryCol)), "outer")
+  .withColumn("topAtk", coalesce(col("topAtk"), mapToEmptyArray_()))
+  .select($(labelCol), "topAtk")
--- End diff --

Don't we also need to run an aggregation on the label column, roughly the 
same as the previous aggregation but using labelCol as the sort inst

[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

2017-04-25 Thread ebernhardson
Github user ebernhardson commented on a diff in the pull request:

https://github.com/apache/spark/pull/16618#discussion_r113355473
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.{mean, sum}
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.types.DoubleType
+
+@Since("2.2.0")
+class RankingMetrics(
+  predictionAndObservations: DataFrame, predictionCol: String, labelCol: 
String)
+  extends Logging with Serializable {
+
+  /**
+   * Compute the Mean Percentile Rank (MPR) of all the queries.
+   *
+   * For each row, the percentile rank is the zero-based position of the label within the
+   * predicted sequence divided by the sequence length, or 1.0 when the label does not occur
+   * in the predictions. The result is the average of that value over all rows.
+   *
+   * See the following paper for detail ("Expected percentile rank" in the paper):
+   * Hu, Y., Y. Koren, and C. Volinsky. “Collaborative Filtering for Implicit Feedback Datasets.”
+   * In 2008 Eighth IEEE International Conference on Data Mining, 263–72, 2008.
+   * doi:10.1109/ICDM.2008.22.
+   *
+   * @return the mean percentile rank, or `Double.NaN` when there are no rows to average
+   */
+  lazy val meanPercentileRank: Double = {
+    // Percentile rank of `actual` within `predicted`; a label that was never predicted gets
+    // the worst possible rank (1.0).
+    val rank = udf((predicted: Seq[Any], actual: Any) => {
+      val position = predicted.indexOf(actual)
+      if (position == -1) 1.0 else position.toDouble / predicted.size
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    // A single mean() aggregation replaces the separate count() and sum() jobs.
+    val result = predictionAndObservations
+      .withColumn("rank", rank(predictionColumn, labelColumn))
+      .agg(mean("rank"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; report the metric as undefined (the value
+    // 0.0 / 0 would give) instead of failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Compute the average precision of all the queries, truncated at ranking position k.
+   *
+   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
+   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
+   * the size of the ground truth set is less than k.
+   *
+   * If a query has an empty ground truth set, zero will be used as precision together with
+   * a log warning.
+   *
+   * See the following paper for detail:
+   *
+   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
+   *
+   * @param k the position to compute the truncated precision, must be positive
+   * @return the average precision at the first k ranking positions, or `Double.NaN` when there
+   *         are no queries to average
+   */
+  @Since("2.2.0")
+  def precisionAt(k: Int): Double = {
+    require(k > 0, "ranking position k should be positive")
+
+    // Per-query precision: relevant items among the first k predictions, divided by k.
+    val precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
+      val actualSet = actual.toSet
+      if (actualSet.nonEmpty) {
+        // take(k) already stops at the end of a prediction list shorter than k.
+        predicted.take(k).count(actualSet.contains).toDouble / k
+      } else {
+        logWarning("Empty ground truth set, check input data")
+        0.0
+      }
+    }, DoubleType)
+
+    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
+    val labelColumn: Column = predictionAndObservations.col(labelCol)
+
+    val result = predictionAndObservations
+      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
+      .agg(mean("predictionAtK"))
+      .first()
+
+    // mean() over zero rows yields SQL NULL; return NaN rather than failing inside getDouble.
+    if (result.isNullAt(0)) Double.NaN else result.getDouble(0)
+  }
+
+  /**
+   * Returns the mean average precision (MAP) of all the queries.
+   * If a query has an empty ground truth set