Repository: spark
Updated Branches:
  refs/heads/master 89b943438 -> 7583681e6


[SPARK-10188] [PYSPARK] Pyspark CrossValidator with RMSE selects incorrect model

* Added isLargerBetter() method to Pyspark Evaluator to match the Scala version.
* JavaEvaluator delegates isLargerBetter() to underlying Scala object.
* Added check for isLargerBetter() in CrossValidator to determine whether to 
use argmin or argmax.
* Added test cases for where smaller is better (RMSE) and larger is better 
(R-Squared).

(This contribution is my original work and I license the work to the
project under Spark's open source license)

Author: noelsmith <m...@noelsmith.com>

Closes #8399 from noel-smith/pyspark-rmse-xval-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7583681e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7583681e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7583681e

Branch: refs/heads/master
Commit: 7583681e6b0824d7eed471dc4d8fa0b2addf9ffc
Parents: 89b9434
Author: noelsmith <m...@noelsmith.com>
Authored: Thu Aug 27 23:59:30 2015 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Thu Aug 27 23:59:30 2015 -0700

----------------------------------------------------------------------
 python/pyspark/ml/evaluation.py | 12 +++++
 python/pyspark/ml/tests.py      | 87 ++++++++++++++++++++++++++++++++++++
 python/pyspark/ml/tuning.py     |  6 ++-
 3 files changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7583681e/python/pyspark/ml/evaluation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 6b0a9ff..cb3b079 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -66,6 +66,14 @@ class Evaluator(Params):
         else:
             raise ValueError("Params must be a param map but got %s." % type(params))
 
+    def isLargerBetter(self):
+        """
+        Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
+        (True, default) or minimized (False).
+        A given evaluator may support multiple metrics which may be maximized or minimized.
+        """
+        return True
+
 
 @inherit_doc
 class JavaEvaluator(Evaluator, JavaWrapper):
@@ -85,6 +93,10 @@ class JavaEvaluator(Evaluator, JavaWrapper):
         self._transfer_params_to_java()
         return self._java_obj.evaluate(dataset._jdf)
 
+    def isLargerBetter(self):
+        self._transfer_params_to_java()
+        return self._java_obj.isLargerBetter()
+
 
 @inherit_doc
 class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol):

http://git-wip-us.apache.org/repos/asf/spark/blob/7583681e/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index c151d21..60e4237 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -32,11 +32,14 @@ else:
 
 from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
 from pyspark.sql import DataFrame, SQLContext
+from pyspark.sql.functions import rand
+from pyspark.ml.evaluation import RegressionEvaluator
 from pyspark.ml.param import Param, Params
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.util import keyword_only
 from pyspark.ml import Estimator, Model, Pipeline, Transformer
 from pyspark.ml.feature import *
+from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
 from pyspark.mllib.linalg import DenseVector
 
 
@@ -264,5 +267,89 @@ class FeatureTests(PySparkTestCase):
         self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])
 
 
+class HasInducedError(Params):
+
+    def __init__(self):
+        super(HasInducedError, self).__init__()
+        self.inducedError = Param(self, "inducedError",
+                                  "Uniformly-distributed error added to feature")
+
+    def getInducedError(self):
+        return self.getOrDefault(self.inducedError)
+
+
+class InducedErrorModel(Model, HasInducedError):
+
+    def __init__(self):
+        super(InducedErrorModel, self).__init__()
+
+    def _transform(self, dataset):
+        return dataset.withColumn("prediction",
+                                  dataset.feature + (rand(0) * self.getInducedError()))
+
+
+class InducedErrorEstimator(Estimator, HasInducedError):
+
+    def __init__(self, inducedError=1.0):
+        super(InducedErrorEstimator, self).__init__()
+        self._set(inducedError=inducedError)
+
+    def _fit(self, dataset):
+        model = InducedErrorModel()
+        self._copyValues(model)
+        return model
+
+
+class CrossValidatorTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        cvModel = cv.fit(dataset)
+        bestModel = cvModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        cvModel = cv.fit(dataset)
+        bestModel = cvModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/spark/blob/7583681e/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index dcfee6a..cae7788 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -223,7 +223,11 @@ class CrossValidator(Estimator):
                 # TODO: duplicate evaluator to take extra params from input
                 metric = eva.evaluate(model.transform(validation, epm[j]))
                 metrics[j] += metric
-        bestIndex = np.argmax(metrics)
+
+        if eva.isLargerBetter():
+            bestIndex = np.argmax(metrics)
+        else:
+            bestIndex = np.argmin(metrics)
         bestModel = est.fit(dataset, epm[bestIndex])
         return CrossValidatorModel(bestModel)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to