This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 887279c  [SPARK-24102][ML][MLLIB][PYSPARK][FOLLOWUP] Added weight column to pyspark API for regression evaluator and metrics
887279c is described below

commit 887279cc46a99f7e0c268199cb17faa71d011ef5
Author: Ilya Matiach <il...@microsoft.com>
AuthorDate: Tue Mar 26 09:06:04 2019 -0500

    [SPARK-24102][ML][MLLIB][PYSPARK][FOLLOWUP] Added weight column to pyspark API for regression evaluator and metrics
    
    ## What changes were proposed in this pull request?
    Follow-up to PR https://github.com/apache/spark/pull/17085.
    This PR adds the weight column to the PySpark side, which was already added to the Scala API.
    The PR also undoes a name change on the Scala side, corresponding to a change in another similar PR, as noted here:
    https://github.com/apache/spark/pull/17084#discussion_r259648639
    
    ## How was this patch tested?
    
    This patch adds Python tests for the changes to the PySpark API.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24197 from imatiach-msft/ilmat/regressor-eval-python.
    
    Authored-by: Ilya Matiach <il...@microsoft.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/mllib/evaluation/RegressionMetrics.scala  | 19 +++++++++++++------
 python/pyspark/ml/evaluation.py                     | 21 ++++++++++++++-------
 python/pyspark/mllib/evaluation.py                  | 16 ++++++++++++----
 3 files changed, 39 insertions(+), 17 deletions(-)
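
For context, a minimal sketch of how the new weightCol parameter on RegressionEvaluator can be exercised from PySpark once this change is applied. This is illustrative only: it assumes an active SparkSession named spark, and the sample values are made up rather than taken from the test suite.

    from pyspark.ml.evaluation import RegressionEvaluator

    # Illustrative (prediction, label, weight) rows.
    rows = [(-28.98, -27.0, 1.0), (20.21, 21.5, 0.8),
            (-25.98, -22.0, 1.0), (30.70, 33.0, 0.6), (74.69, 71.0, 0.2)]
    dataset = spark.createDataFrame(rows, ["raw", "label", "weight"])

    # weightCol is the parameter this commit exposes on the Python side;
    # omitting it keeps the previous unweighted behaviour.
    evaluator = RegressionEvaluator(predictionCol="raw", weightCol="weight")
    print(evaluator.evaluate(dataset))  # rmse by default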

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
index 5250479..3cf8584 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
@@ -22,19 +22,19 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
 
 /**
  * Evaluator for regression.
  *
- * @param predAndObsWithOptWeight an RDD of either (prediction, observation, weight)
+ * @param predictionAndObservations an RDD of either (prediction, observation, weight)
 *                                                    or (prediction, observation) pairs
 * @param throughOrigin True if the regression is through the origin. For example, in linear
  *                      regression, it will be true without fitting intercept.
  */
 @Since("1.2.0")
 class RegressionMetrics @Since("2.0.0") (
-    predAndObsWithOptWeight: RDD[_ <: Product], throughOrigin: Boolean)
+    predictionAndObservations: RDD[_ <: Product], throughOrigin: Boolean)
     extends Logging {
 
   @Since("1.2.0")
@@ -47,13 +47,20 @@ class RegressionMetrics @Since("2.0.0") (
    *                                  prediction and observation
    */
   private[mllib] def this(predictionAndObservations: DataFrame) =
-    this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
+    this(predictionAndObservations.rdd.map {
+      case Row(prediction: Double, label: Double, weight: Double) =>
+        (prediction, label, weight)
+      case Row(prediction: Double, label: Double) =>
+        (prediction, label, 1.0)
+      case other =>
+        throw new IllegalArgumentException(s"Expected Row of tuples, got $other")
+    })
 
   /**
   * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors.
    */
   private lazy val summary: MultivariateStatisticalSummary = {
-    val summary: MultivariateStatisticalSummary = predAndObsWithOptWeight.map {
+    val summary: MultivariateStatisticalSummary = predictionAndObservations.map {
       case (prediction: Double, observation: Double, weight: Double) =>
         (Vectors.dense(observation, observation - prediction), weight)
       case (prediction: Double, observation: Double) =>
@@ -70,7 +77,7 @@ class RegressionMetrics @Since("2.0.0") (
   private lazy val SStot = summary.variance(0) * (summary.weightSum - 1)
   private lazy val SSreg = {
     val yMean = summary.mean(0)
-    predAndObsWithOptWeight.map {
+    predictionAndObservations.map {
       case (prediction: Double, _: Double, weight: Double) =>
         math.pow(prediction - yMean, 2) * weight
       case (prediction: Double, _: Double) => math.pow(prediction - yMean, 2)
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 0f70860..8aca74d 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -190,13 +190,13 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction
 
 
 @inherit_doc
-class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
+class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, HasWeightCol,
                           JavaMLReadable, JavaMLWritable):
     """
     .. note:: Experimental
 
-    Evaluator for Regression, which expects two input
-    columns: prediction and label.
+    Evaluator for Regression, which expects input columns prediction, label
+    and an optional weight column.
 
     >>> scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5),
     ...   (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)]
@@ -214,6 +214,13 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
     >>> evaluator2 = RegressionEvaluator.load(re_path)
     >>> str(evaluator2.getPredictionCol())
     'raw'
+    >>> scoreAndLabelsAndWeight = [(-28.98343821, -27.0, 1.0), (20.21491975, 21.5, 0.8),
+    ...   (-25.98418959, -22.0, 1.0), (30.69731842, 33.0, 0.6), (74.69283752, 71.0, 0.2)]
+    >>> dataset = spark.createDataFrame(scoreAndLabelsAndWeight, ["raw", "label", "weight"])
+    ...
+    >>> evaluator = RegressionEvaluator(predictionCol="raw", weightCol="weight")
+    >>> evaluator.evaluate(dataset)
+    2.740...
 
     .. versionadded:: 1.4.0
     """
@@ -227,10 +234,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
 
     @keyword_only
     def __init__(self, predictionCol="prediction", labelCol="label",
-                 metricName="rmse"):
+                 metricName="rmse", weightCol=None):
         """
         __init__(self, predictionCol="prediction", labelCol="label", \
-                 metricName="rmse")
+                 metricName="rmse", weightCol=None)
         """
         super(RegressionEvaluator, self).__init__()
         self._java_obj = self._new_java_obj(
@@ -256,10 +263,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
     @keyword_only
     @since("1.4.0")
     def setParams(self, predictionCol="prediction", labelCol="label",
-                  metricName="rmse"):
+                  metricName="rmse", weightCol=None):
         """
         setParams(self, predictionCol="prediction", labelCol="label", \
-                  metricName="rmse")
+                  metricName="rmse", weightCol=None)
         Sets params for regression evaluator.
         """
         kwargs = self._input_kwargs
diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py
index 171c62c..30032b3 100644
--- a/python/pyspark/mllib/evaluation.py
+++ b/python/pyspark/mllib/evaluation.py
@@ -95,8 +95,7 @@ class RegressionMetrics(JavaModelWrapper):
     """
     Evaluator for regression.
 
-    :param predictionAndObservations: an RDD of (prediction,
-                                      observation) pairs.
+    :param predictionAndObservations: an RDD of prediction, observation and optional weight.
 
     >>> predictionAndObservations = sc.parallelize([
     ...     (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
@@ -111,6 +110,11 @@ class RegressionMetrics(JavaModelWrapper):
     0.61...
     >>> metrics.r2
     0.94...
+    >>> predictionAndObservationsWithOptWeight = sc.parallelize([
+    ...     (2.5, 3.0, 0.5), (0.0, -0.5, 1.0), (2.0, 2.0, 0.3), (8.0, 7.0, 0.9)])
+    >>> metrics = RegressionMetrics(predictionAndObservationsWithOptWeight)
+    >>> metrics.rootMeanSquaredError
+    0.68...
 
     .. versionadded:: 1.4.0
     """
@@ -118,9 +122,13 @@ class RegressionMetrics(JavaModelWrapper):
     def __init__(self, predictionAndObservations):
         sc = predictionAndObservations.ctx
         sql_ctx = SQLContext.getOrCreate(sc)
-        df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
+        numCol = len(predictionAndObservations.first())
+        schema = StructType([
             StructField("prediction", DoubleType(), nullable=False),
-            StructField("observation", DoubleType(), nullable=False)]))
+            StructField("observation", DoubleType(), nullable=False)])
+        if numCol == 3:
+            schema.add("weight", DoubleType(), False)
+        df = sql_ctx.createDataFrame(predictionAndObservations, schema=schema)
         java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
         java_model = java_class(df._jdf)
         super(RegressionMetrics, self).__init__(java_model)
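
Similarly, a minimal sketch of the weighted path through the pyspark.mllib wrapper updated above. Again, this is only illustrative: it assumes an active SparkContext named sc, and the tuples are made-up sample data.

    from pyspark.mllib.evaluation import RegressionMetrics

    # (prediction, observation, weight) triples; two-element tuples still work
    # and are treated as having weight 1.0.
    weighted = sc.parallelize([(2.5, 3.0, 0.5), (0.0, -0.5, 1.0),
                               (2.0, 2.0, 0.3), (8.0, 7.0, 0.9)])
    metrics = RegressionMetrics(weighted)
    print(metrics.rootMeanSquaredError, metrics.r2)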

