This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 887279c [SPARK-24102][ML][MLLIB][PYSPARK][FOLLOWUP] Added weight column to pyspark API for regression evaluator and metrics 887279c is described below commit 887279cc46a99f7e0c268199cb17faa71d011ef5 Author: Ilya Matiach <il...@microsoft.com> AuthorDate: Tue Mar 26 09:06:04 2019 -0500 [SPARK-24102][ML][MLLIB][PYSPARK][FOLLOWUP] Added weight column to pyspark API for regression evaluator and metrics ## What changes were proposed in this pull request? Followup to PR https://github.com/apache/spark/pull/17085. This PR adds the optional weight column to the PySpark API (RegressionEvaluator and RegressionMetrics), matching the weight support already added to the Scala API. The PR also reverts a parameter rename on the Scala side (predAndObsWithOptWeight back to predictionAndObservations), for consistency with a similar change in another PR, as noted here: https://github.com/apache/spark/pull/17084#discussion_r259648639 ## How was this patch tested? This patch adds Python doctests covering the changes to the PySpark API. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #24197 from imatiach-msft/ilmat/regressor-eval-python. 
Authored-by: Ilya Matiach <il...@microsoft.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../spark/mllib/evaluation/RegressionMetrics.scala | 19 +++++++++++++------ python/pyspark/ml/evaluation.py | 21 ++++++++++++++------- python/pyspark/mllib/evaluation.py | 16 ++++++++++++---- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala index 5250479..3cf8584 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala @@ -22,19 +22,19 @@ import org.apache.spark.internal.Logging import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} /** * Evaluator for regression. * - * @param predAndObsWithOptWeight an RDD of either (prediction, observation, weight) + * @param predictionAndObservations an RDD of either (prediction, observation, weight) * or (prediction, observation) pairs * @param throughOrigin True if the regression is through the origin. For example, in linear * regression, it will be true without fitting intercept. 
*/ @Since("1.2.0") class RegressionMetrics @Since("2.0.0") ( - predAndObsWithOptWeight: RDD[_ <: Product], throughOrigin: Boolean) + predictionAndObservations: RDD[_ <: Product], throughOrigin: Boolean) extends Logging { @Since("1.2.0") @@ -47,13 +47,20 @@ class RegressionMetrics @Since("2.0.0") ( * prediction and observation */ private[mllib] def this(predictionAndObservations: DataFrame) = - this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) + this(predictionAndObservations.rdd.map { + case Row(prediction: Double, label: Double, weight: Double) => + (prediction, label, weight) + case Row(prediction: Double, label: Double) => + (prediction, label, 1.0) + case other => + throw new IllegalArgumentException(s"Expected Row of tuples, got $other") + }) /** * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors. */ private lazy val summary: MultivariateStatisticalSummary = { - val summary: MultivariateStatisticalSummary = predAndObsWithOptWeight.map { + val summary: MultivariateStatisticalSummary = predictionAndObservations.map { case (prediction: Double, observation: Double, weight: Double) => (Vectors.dense(observation, observation - prediction), weight) case (prediction: Double, observation: Double) => @@ -70,7 +77,7 @@ class RegressionMetrics @Since("2.0.0") ( private lazy val SStot = summary.variance(0) * (summary.weightSum - 1) private lazy val SSreg = { val yMean = summary.mean(0) - predAndObsWithOptWeight.map { + predictionAndObservations.map { case (prediction: Double, _: Double, weight: Double) => math.pow(prediction - yMean, 2) * weight case (prediction: Double, _: Double) => math.pow(prediction - yMean, 2) diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 0f70860..8aca74d 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -190,13 +190,13 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction 
@inherit_doc -class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, +class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, HasWeightCol, JavaMLReadable, JavaMLWritable): """ .. note:: Experimental - Evaluator for Regression, which expects two input - columns: prediction and label. + Evaluator for Regression, which expects input columns prediction, label + and an optional weight column. >>> scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5), ... (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)] @@ -214,6 +214,13 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, >>> evaluator2 = RegressionEvaluator.load(re_path) >>> str(evaluator2.getPredictionCol()) 'raw' + >>> scoreAndLabelsAndWeight = [(-28.98343821, -27.0, 1.0), (20.21491975, 21.5, 0.8), + ... (-25.98418959, -22.0, 1.0), (30.69731842, 33.0, 0.6), (74.69283752, 71.0, 0.2)] + >>> dataset = spark.createDataFrame(scoreAndLabelsAndWeight, ["raw", "label", "weight"]) + ... + >>> evaluator = RegressionEvaluator(predictionCol="raw", weightCol="weight") + >>> evaluator.evaluate(dataset) + 2.740... .. 
versionadded:: 1.4.0 """ @@ -227,10 +234,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, @keyword_only def __init__(self, predictionCol="prediction", labelCol="label", - metricName="rmse"): + metricName="rmse", weightCol=None): """ __init__(self, predictionCol="prediction", labelCol="label", \ - metricName="rmse") + metricName="rmse", weightCol=None) """ super(RegressionEvaluator, self).__init__() self._java_obj = self._new_java_obj( @@ -256,10 +263,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, @keyword_only @since("1.4.0") def setParams(self, predictionCol="prediction", labelCol="label", - metricName="rmse"): + metricName="rmse", weightCol=None): """ setParams(self, predictionCol="prediction", labelCol="label", \ - metricName="rmse") + metricName="rmse", weightCol=None) Sets params for regression evaluator. """ kwargs = self._input_kwargs diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 171c62c..30032b3 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -95,8 +95,7 @@ class RegressionMetrics(JavaModelWrapper): """ Evaluator for regression. - :param predictionAndObservations: an RDD of (prediction, - observation) pairs. + :param predictionAndObservations: an RDD of prediction, observation and optional weight. >>> predictionAndObservations = sc.parallelize([ ... (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]) @@ -111,6 +110,11 @@ class RegressionMetrics(JavaModelWrapper): 0.61... >>> metrics.r2 0.94... + >>> predictionAndObservationsWithOptWeight = sc.parallelize([ + ... (2.5, 3.0, 0.5), (0.0, -0.5, 1.0), (2.0, 2.0, 0.3), (8.0, 7.0, 0.9)]) + >>> metrics = RegressionMetrics(predictionAndObservationsWithOptWeight) + >>> metrics.rootMeanSquaredError + 0.68... .. 
versionadded:: 1.4.0 """ @@ -118,9 +122,13 @@ class RegressionMetrics(JavaModelWrapper): def __init__(self, predictionAndObservations): sc = predictionAndObservations.ctx sql_ctx = SQLContext.getOrCreate(sc) - df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([ + numCol = len(predictionAndObservations.first()) + schema = StructType([ StructField("prediction", DoubleType(), nullable=False), - StructField("observation", DoubleType(), nullable=False)])) + StructField("observation", DoubleType(), nullable=False)]) + if numCol == 3: + schema.add("weight", DoubleType(), False) + df = sql_ctx.createDataFrame(predictionAndObservations, schema=schema) java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics java_model = java_class(df._jdf) super(RegressionMetrics, self).__init__(java_model) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org