This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 85c9e8c [SPARK-32092][ML][PYSPARK] Fix parameters not being copied in CrossValidatorModel.copy(), read() and write() 85c9e8c is described below commit 85c9e8c54c30b69c39075e97cd3cac295be09303 Author: Louiszr <zxhs...@gmail.com> AuthorDate: Sat Aug 22 09:27:31 2020 -0500 [SPARK-32092][ML][PYSPARK] Fix parameters not being copied in CrossValidatorModel.copy(), read() and write() ### What changes were proposed in this pull request? Changed the definitions of `CrossValidatorModel.copy()/_to_java()/_from_java()` so that exposed parameters (i.e. parameters with `get()` methods) are copied in these methods. ### Why are the changes needed? Parameters are copied in the respective Scala interface for `CrossValidatorModel.copy()`. It fits the semantics to persist parameters when calling `CrossValidatorModel.save()` and `CrossValidatorModel.load()` so that the user gets the same model by saving and loading it after. Not copying across `numFolds` also causes bugs such as an array-index-out-of-bounds error and the loss of sub-models, because this parameter will always default to 3 (as described in the JIRA ticket). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tests for `CrossValidatorModel.copy()` and `save()`/`load()` are updated so that they check parameters before and after function calls. Closes #29445 from Louiszr/master. 
Authored-by: Louiszr <zxhs...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> (cherry picked from commit d9eb06ea37cab185f1e49c641313be9707270252) Signed-off-by: Sean Owen <sro...@gmail.com> --- python/pyspark/ml/tests/test_tuning.py | 131 ++++++++++++++++++++++++++++++--- python/pyspark/ml/tuning.py | 67 +++++++++++++---- 2 files changed, 172 insertions(+), 26 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 6bcc3f9..b250740 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -89,15 +89,50 @@ class CrossValidatorTests(SparkSessionTestCase): grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) - cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) + cv = CrossValidator( + estimator=iee, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True, + numFolds=2 + ) cvCopied = cv.copy() - self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid) + for param in [ + lambda x: x.getEstimator().uid, + # SPARK-32092: CrossValidator.copy() needs to copy all existing params + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getCollectSubModels(), + lambda x: x.getParallelism(), + lambda x: x.getSeed() + ]: + self.assertEqual(param(cv), param(cvCopied)) cvModel = cv.fit(dataset) cvModelCopied = cvModel.copy() for index in range(len(cvModel.avgMetrics)): self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index]) < 0.0001) + # SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params + for param in [ + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getSeed() + ]: + self.assertEqual(param(cvModel), param(cvModelCopied)) + + cvModel.avgMetrics[0] = 'foo' + self.assertNotEqual( + cvModelCopied.avgMetrics[0], + 'foo', + "Changing the original avgMetrics should not affect the copied model" + ) + 
cvModel.subModels[0] = 'foo' + self.assertNotEqual( + cvModelCopied.subModels[0], + 'foo', + "Changing the original subModels should not affect the copied model" + ) def test_fit_minimize_metric(self): dataset = self.spark.createDataFrame([ @@ -166,16 +201,39 @@ class CrossValidatorTests(SparkSessionTestCase): lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() evaluator = BinaryClassificationEvaluator() - cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + cv = CrossValidator( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True, + numFolds=4, + seed=42 + ) cvModel = cv.fit(dataset) lrModel = cvModel.bestModel - cvModelPath = temp_path + "/cvModel" - lrModel.save(cvModelPath) - loadedLrModel = LogisticRegressionModel.load(cvModelPath) + lrModelPath = temp_path + "/lrModel" + lrModel.save(lrModelPath) + loadedLrModel = LogisticRegressionModel.load(lrModelPath) self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + # SPARK-32092: Saving and then loading CrossValidatorModel should not change the params + cvModelPath = temp_path + "/cvModel" + cvModel.save(cvModelPath) + loadedCvModel = CrossValidatorModel.load(cvModelPath) + for param in [ + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getSeed(), + lambda x: len(x.subModels) + ]: + self.assertEqual(param(cvModel), param(loadedCvModel)) + + self.assertTrue(all( + loadedCvModel.isSet(param) for param in loadedCvModel.params + )) + def test_save_load_simple_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( @@ -451,16 +509,35 @@ class TrainValidationSplitTests(SparkSessionTestCase): lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() evaluator = BinaryClassificationEvaluator() - tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + tvs 
= TrainValidationSplit( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True, + seed=42 + ) tvsModel = tvs.fit(dataset) lrModel = tvsModel.bestModel - tvsModelPath = temp_path + "/tvsModel" - lrModel.save(tvsModelPath) - loadedLrModel = LogisticRegressionModel.load(tvsModelPath) + lrModelPath = temp_path + "/lrModel" + lrModel.save(lrModelPath) + loadedLrModel = LogisticRegressionModel.load(lrModelPath) self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + tvsModelPath = temp_path + "/tvsModel" + tvsModel.save(tvsModelPath) + loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath) + for param in [ + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvsModel), param(loadedTvsModel)) + + self.assertTrue(all( + loadedTvsModel.isSet(param) for param in loadedTvsModel.params + )) + def test_save_load_simple_estimator(self): # This tests saving and loading the trained model only. 
# Save/load for TrainValidationSplit will be added later: SPARK-13786 @@ -662,11 +739,30 @@ class TrainValidationSplitTests(SparkSessionTestCase): grid = ParamGridBuilder() \ .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \ .build() - tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) + tvs = TrainValidationSplit( + estimator=iee, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True + ) tvsModel = tvs.fit(dataset) tvsCopied = tvs.copy() tvsModelCopied = tvsModel.copy() + for param in [ + lambda x: x.getCollectSubModels(), + lambda x: x.getParallelism(), + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvs), param(tvsCopied)) + + for param in [ + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvsModel), param(tvsModelCopied)) + self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid, "Copied TrainValidationSplit has the same uid of Estimator") @@ -678,6 +774,19 @@ class TrainValidationSplitTests(SparkSessionTestCase): self.assertEqual(tvsModel.validationMetrics[index], tvsModelCopied.validationMetrics[index]) + tvsModel.validationMetrics[0] = 'foo' + self.assertNotEqual( + tvsModelCopied.validationMetrics[0], + 'foo', + "Changing the original validationMetrics should not affect the copied model" + ) + tvsModel.subModels[0] = 'foo' + self.assertNotEqual( + tvsModelCopied.subModels[0], + 'foo', + "Changing the original subModels should not affect the copied model" + ) + if __name__ == "__main__": from pyspark.ml.tests.test_tuning import * diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index e564ff7..91f34ef 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -479,9 +479,9 @@ class CrossValidatorModel(Model, _CrossValidatorParams, MLReadable, MLWritable): if extra is None: extra = dict() bestModel = self.bestModel.copy(extra) - avgMetrics = self.avgMetrics - subModels = 
self.subModels - return CrossValidatorModel(bestModel, avgMetrics, subModels) + avgMetrics = list(self.avgMetrics) + subModels = [model.copy() for model in self.subModels] + return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra) @since("2.3.0") def write(self): @@ -505,8 +505,17 @@ class CrossValidatorModel(Model, _CrossValidatorParams, MLReadable, MLWritable): avgMetrics = _java2py(sc, java_stage.avgMetrics()) estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage) - py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)._set(estimator=estimator) - py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator) + py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "numFolds": java_stage.getNumFolds(), + "foldCol": java_stage.getFoldCol(), + "seed": java_stage.getSeed(), + } + for param_name, param_val in params.items(): + py_stage = py_stage._set(**{param_name: param_val}) if java_stage.hasSubModels(): py_stage.subModels = [[JavaParams._from_java(sub_model) @@ -530,9 +539,18 @@ class CrossValidatorModel(Model, _CrossValidatorParams, MLReadable, MLWritable): _py2java(sc, self.avgMetrics)) estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl() - _java_obj.set("evaluator", evaluator) - _java_obj.set("estimator", estimator) - _java_obj.set("estimatorParamMaps", epms) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "numFolds": self.getNumFolds(), + "foldCol": self.getFoldCol(), + "seed": self.getSeed(), + } + for param_name, param_val in params.items(): + java_param = _java_obj.getParam(param_name) + pair = java_param.w(param_val) + _java_obj.set(pair) if self.subModels is not None: java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models] @@ -818,8 +836,11 @@ class 
TrainValidationSplitModel(Model, _TrainValidationSplitParams, MLReadable, extra = dict() bestModel = self.bestModel.copy(extra) validationMetrics = list(self.validationMetrics) - subModels = self.subModels - return TrainValidationSplitModel(bestModel, validationMetrics, subModels) + subModels = [model.copy() for model in self.subModels] + return self._copyValues( + TrainValidationSplitModel(bestModel, validationMetrics, subModels), + extra=extra + ) @since("2.3.0") def write(self): @@ -847,8 +868,16 @@ class TrainValidationSplitModel(Model, _TrainValidationSplitParams, MLReadable, cls)._from_java_impl(java_stage) # Create a new instance of this stage. py_stage = cls(bestModel=bestModel, - validationMetrics=validationMetrics)._set(estimator=estimator) - py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator) + validationMetrics=validationMetrics) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "trainRatio": java_stage.getTrainRatio(), + "seed": java_stage.getSeed(), + } + for param_name, param_val in params.items(): + py_stage = py_stage._set(**{param_name: param_val}) if java_stage.hasSubModels(): py_stage.subModels = [JavaParams._from_java(sub_model) @@ -871,9 +900,17 @@ class TrainValidationSplitModel(Model, _TrainValidationSplitParams, MLReadable, _py2java(sc, self.validationMetrics)) estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl() - _java_obj.set("evaluator", evaluator) - _java_obj.set("estimator", estimator) - _java_obj.set("estimatorParamMaps", epms) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "trainRatio": self.getTrainRatio(), + "seed": self.getSeed(), + } + for param_name, param_val in params.items(): + java_param = _java_obj.getParam(param_name) + pair = java_param.w(param_val) + _java_obj.set(pair) if self.subModels is not None: java_sub_models = [sub_model._to_java() for sub_model in 
self.subModels] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org