This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 85c9e8c  [SPARK-32092][ML][PYSPARK] Fix parameters not being copied in 
CrossValidatorModel.copy(), read() and write()
85c9e8c is described below

commit 85c9e8c54c30b69c39075e97cd3cac295be09303
Author: Louiszr <zxhs...@gmail.com>
AuthorDate: Sat Aug 22 09:27:31 2020 -0500

    [SPARK-32092][ML][PYSPARK] Fix parameters not being copied in 
CrossValidatorModel.copy(), read() and write()
    
    ### What changes were proposed in this pull request?
    
    Changed the definitions of 
`CrossValidatorModel.copy()/_to_java()/_from_java()` so that exposed parameters 
(i.e. parameters with `get()` methods) are copied in these methods.
    
    ### Why are the changes needed?
    
    Parameters are copied in the respective Scala interface for 
`CrossValidatorModel.copy()`.
    It fits the semantics to persist parameters when calling 
`CrossValidatorModel.save()` and `CrossValidatorModel.load()` so that the user 
gets the same model back after saving and loading it. Not copying across 
`numFolds` also causes bugs like Array index out of bound and losing sub-models 
because this parameter will always default to 3 (as described in the JIRA 
ticket).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tests for `CrossValidatorModel.copy()` and `save()`/`load()` are updated so 
that they check parameters before and after function calls.
    
    Closes #29445 from Louiszr/master.
    
    Authored-by: Louiszr <zxhs...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit d9eb06ea37cab185f1e49c641313be9707270252)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 python/pyspark/ml/tests/test_tuning.py | 131 ++++++++++++++++++++++++++++++---
 python/pyspark/ml/tuning.py            |  67 +++++++++++++----
 2 files changed, 172 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/ml/tests/test_tuning.py 
b/python/pyspark/ml/tests/test_tuning.py
index 6bcc3f9..b250740 100644
--- a/python/pyspark/ml/tests/test_tuning.py
+++ b/python/pyspark/ml/tests/test_tuning.py
@@ -89,15 +89,50 @@ class CrossValidatorTests(SparkSessionTestCase):
         grid = (ParamGridBuilder()
                 .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                 .build())
-        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, 
evaluator=evaluator)
+        cv = CrossValidator(
+            estimator=iee,
+            estimatorParamMaps=grid,
+            evaluator=evaluator,
+            collectSubModels=True,
+            numFolds=2
+        )
         cvCopied = cv.copy()
-        self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)
+        for param in [
+            lambda x: x.getEstimator().uid,
+            # SPARK-32092: CrossValidator.copy() needs to copy all existing 
params
+            lambda x: x.getNumFolds(),
+            lambda x: x.getFoldCol(),
+            lambda x: x.getCollectSubModels(),
+            lambda x: x.getParallelism(),
+            lambda x: x.getSeed()
+        ]:
+            self.assertEqual(param(cv), param(cvCopied))
 
         cvModel = cv.fit(dataset)
         cvModelCopied = cvModel.copy()
         for index in range(len(cvModel.avgMetrics)):
             self.assertTrue(abs(cvModel.avgMetrics[index] - 
cvModelCopied.avgMetrics[index])
                             < 0.0001)
+        # SPARK-32092: CrossValidatorModel.copy() needs to copy all existing 
params
+        for param in [
+            lambda x: x.getNumFolds(),
+            lambda x: x.getFoldCol(),
+            lambda x: x.getSeed()
+        ]:
+            self.assertEqual(param(cvModel), param(cvModelCopied))
+
+        cvModel.avgMetrics[0] = 'foo'
+        self.assertNotEqual(
+            cvModelCopied.avgMetrics[0],
+            'foo',
+            "Changing the original avgMetrics should not affect the copied 
model"
+        )
+        cvModel.subModels[0] = 'foo'
+        self.assertNotEqual(
+            cvModelCopied.subModels[0],
+            'foo',
+            "Changing the original subModels should not affect the copied 
model"
+        )
 
     def test_fit_minimize_metric(self):
         dataset = self.spark.createDataFrame([
@@ -166,16 +201,39 @@ class CrossValidatorTests(SparkSessionTestCase):
         lr = LogisticRegression()
         grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
         evaluator = BinaryClassificationEvaluator()
-        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, 
evaluator=evaluator)
+        cv = CrossValidator(
+            estimator=lr,
+            estimatorParamMaps=grid,
+            evaluator=evaluator,
+            collectSubModels=True,
+            numFolds=4,
+            seed=42
+        )
         cvModel = cv.fit(dataset)
         lrModel = cvModel.bestModel
 
-        cvModelPath = temp_path + "/cvModel"
-        lrModel.save(cvModelPath)
-        loadedLrModel = LogisticRegressionModel.load(cvModelPath)
+        lrModelPath = temp_path + "/lrModel"
+        lrModel.save(lrModelPath)
+        loadedLrModel = LogisticRegressionModel.load(lrModelPath)
         self.assertEqual(loadedLrModel.uid, lrModel.uid)
         self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
 
+        # SPARK-32092: Saving and then loading CrossValidatorModel should not 
change the params
+        cvModelPath = temp_path + "/cvModel"
+        cvModel.save(cvModelPath)
+        loadedCvModel = CrossValidatorModel.load(cvModelPath)
+        for param in [
+            lambda x: x.getNumFolds(),
+            lambda x: x.getFoldCol(),
+            lambda x: x.getSeed(),
+            lambda x: len(x.subModels)
+        ]:
+            self.assertEqual(param(cvModel), param(loadedCvModel))
+
+        self.assertTrue(all(
+            loadedCvModel.isSet(param) for param in loadedCvModel.params
+        ))
+
     def test_save_load_simple_estimator(self):
         temp_path = tempfile.mkdtemp()
         dataset = self.spark.createDataFrame(
@@ -451,16 +509,35 @@ class TrainValidationSplitTests(SparkSessionTestCase):
         lr = LogisticRegression()
         grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
         evaluator = BinaryClassificationEvaluator()
-        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, 
evaluator=evaluator)
+        tvs = TrainValidationSplit(
+            estimator=lr,
+            estimatorParamMaps=grid,
+            evaluator=evaluator,
+            collectSubModels=True,
+            seed=42
+        )
         tvsModel = tvs.fit(dataset)
         lrModel = tvsModel.bestModel
 
-        tvsModelPath = temp_path + "/tvsModel"
-        lrModel.save(tvsModelPath)
-        loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
+        lrModelPath = temp_path + "/lrModel"
+        lrModel.save(lrModelPath)
+        loadedLrModel = LogisticRegressionModel.load(lrModelPath)
         self.assertEqual(loadedLrModel.uid, lrModel.uid)
         self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
 
+        tvsModelPath = temp_path + "/tvsModel"
+        tvsModel.save(tvsModelPath)
+        loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath)
+        for param in [
+            lambda x: x.getSeed(),
+            lambda x: x.getTrainRatio(),
+        ]:
+            self.assertEqual(param(tvsModel), param(loadedTvsModel))
+
+        self.assertTrue(all(
+            loadedTvsModel.isSet(param) for param in loadedTvsModel.params
+        ))
+
     def test_save_load_simple_estimator(self):
         # This tests saving and loading the trained model only.
         # Save/load for TrainValidationSplit will be added later: SPARK-13786
@@ -662,11 +739,30 @@ class TrainValidationSplitTests(SparkSessionTestCase):
         grid = ParamGridBuilder() \
             .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
             .build()
-        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, 
evaluator=evaluator)
+        tvs = TrainValidationSplit(
+            estimator=iee,
+            estimatorParamMaps=grid,
+            evaluator=evaluator,
+            collectSubModels=True
+        )
         tvsModel = tvs.fit(dataset)
         tvsCopied = tvs.copy()
         tvsModelCopied = tvsModel.copy()
 
+        for param in [
+            lambda x: x.getCollectSubModels(),
+            lambda x: x.getParallelism(),
+            lambda x: x.getSeed(),
+            lambda x: x.getTrainRatio(),
+        ]:
+            self.assertEqual(param(tvs), param(tvsCopied))
+
+        for param in [
+            lambda x: x.getSeed(),
+            lambda x: x.getTrainRatio(),
+        ]:
+            self.assertEqual(param(tvsModel), param(tvsModelCopied))
+
         self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
                          "Copied TrainValidationSplit has the same uid of 
Estimator")
 
@@ -678,6 +774,19 @@ class TrainValidationSplitTests(SparkSessionTestCase):
             self.assertEqual(tvsModel.validationMetrics[index],
                              tvsModelCopied.validationMetrics[index])
 
+        tvsModel.validationMetrics[0] = 'foo'
+        self.assertNotEqual(
+            tvsModelCopied.validationMetrics[0],
+            'foo',
+            "Changing the original validationMetrics should not affect the 
copied model"
+        )
+        tvsModel.subModels[0] = 'foo'
+        self.assertNotEqual(
+            tvsModelCopied.subModels[0],
+            'foo',
+            "Changing the original subModels should not affect the copied 
model"
+        )
+
 
 if __name__ == "__main__":
     from pyspark.ml.tests.test_tuning import *
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index e564ff7..91f34ef 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -479,9 +479,9 @@ class CrossValidatorModel(Model, _CrossValidatorParams, 
MLReadable, MLWritable):
         if extra is None:
             extra = dict()
         bestModel = self.bestModel.copy(extra)
-        avgMetrics = self.avgMetrics
-        subModels = self.subModels
-        return CrossValidatorModel(bestModel, avgMetrics, subModels)
+        avgMetrics = list(self.avgMetrics)
+        subModels = [model.copy() for model in self.subModels]
+        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, 
subModels), extra=extra)
 
     @since("2.3.0")
     def write(self):
@@ -505,8 +505,17 @@ class CrossValidatorModel(Model, _CrossValidatorParams, 
MLReadable, MLWritable):
         avgMetrics = _java2py(sc, java_stage.avgMetrics())
         estimator, epms, evaluator = super(CrossValidatorModel, 
cls)._from_java_impl(java_stage)
 
-        py_stage = cls(bestModel=bestModel, 
avgMetrics=avgMetrics)._set(estimator=estimator)
-        py_stage = 
py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
+        py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)
+        params = {
+            "evaluator": evaluator,
+            "estimator": estimator,
+            "estimatorParamMaps": epms,
+            "numFolds": java_stage.getNumFolds(),
+            "foldCol": java_stage.getFoldCol(),
+            "seed": java_stage.getSeed(),
+        }
+        for param_name, param_val in params.items():
+            py_stage = py_stage._set(**{param_name: param_val})
 
         if java_stage.hasSubModels():
             py_stage.subModels = [[JavaParams._from_java(sub_model)
@@ -530,9 +539,18 @@ class CrossValidatorModel(Model, _CrossValidatorParams, 
MLReadable, MLWritable):
                                              _py2java(sc, self.avgMetrics))
         estimator, epms, evaluator = super(CrossValidatorModel, 
self)._to_java_impl()
 
-        _java_obj.set("evaluator", evaluator)
-        _java_obj.set("estimator", estimator)
-        _java_obj.set("estimatorParamMaps", epms)
+        params = {
+            "evaluator": evaluator,
+            "estimator": estimator,
+            "estimatorParamMaps": epms,
+            "numFolds": self.getNumFolds(),
+            "foldCol": self.getFoldCol(),
+            "seed": self.getSeed(),
+        }
+        for param_name, param_val in params.items():
+            java_param = _java_obj.getParam(param_name)
+            pair = java_param.w(param_val)
+            _java_obj.set(pair)
 
         if self.subModels is not None:
             java_sub_models = [[sub_model._to_java() for sub_model in 
fold_sub_models]
@@ -818,8 +836,11 @@ class TrainValidationSplitModel(Model, 
_TrainValidationSplitParams, MLReadable,
             extra = dict()
         bestModel = self.bestModel.copy(extra)
         validationMetrics = list(self.validationMetrics)
-        subModels = self.subModels
-        return TrainValidationSplitModel(bestModel, validationMetrics, 
subModels)
+        subModels = [model.copy() for model in self.subModels]
+        return self._copyValues(
+            TrainValidationSplitModel(bestModel, validationMetrics, subModels),
+            extra=extra
+        )
 
     @since("2.3.0")
     def write(self):
@@ -847,8 +868,16 @@ class TrainValidationSplitModel(Model, 
_TrainValidationSplitParams, MLReadable,
                                            cls)._from_java_impl(java_stage)
         # Create a new instance of this stage.
         py_stage = cls(bestModel=bestModel,
-                       
validationMetrics=validationMetrics)._set(estimator=estimator)
-        py_stage = 
py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
+                       validationMetrics=validationMetrics)
+        params = {
+            "evaluator": evaluator,
+            "estimator": estimator,
+            "estimatorParamMaps": epms,
+            "trainRatio": java_stage.getTrainRatio(),
+            "seed": java_stage.getSeed(),
+        }
+        for param_name, param_val in params.items():
+            py_stage = py_stage._set(**{param_name: param_val})
 
         if java_stage.hasSubModels():
             py_stage.subModels = [JavaParams._from_java(sub_model)
@@ -871,9 +900,17 @@ class TrainValidationSplitModel(Model, 
_TrainValidationSplitParams, MLReadable,
             _py2java(sc, self.validationMetrics))
         estimator, epms, evaluator = super(TrainValidationSplitModel, 
self)._to_java_impl()
 
-        _java_obj.set("evaluator", evaluator)
-        _java_obj.set("estimator", estimator)
-        _java_obj.set("estimatorParamMaps", epms)
+        params = {
+            "evaluator": evaluator,
+            "estimator": estimator,
+            "estimatorParamMaps": epms,
+            "trainRatio": self.getTrainRatio(),
+            "seed": self.getSeed(),
+        }
+        for param_name, param_val in params.items():
+            java_param = _java_obj.getParam(param_name)
+            pair = java_param.w(param_val)
+            _java_obj.set(pair)
 
         if self.subModels is not None:
             java_sub_models = [sub_model._to_java() for sub_model in 
self.subModels]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to