This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6c0c226d901 [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect 6c0c226d901 is described below commit 6c0c226d90192e54a4965b6d69905936619e20d6 Author: Weichen Xu <weichen...@databricks.com> AuthorDate: Mon Jun 19 21:36:21 2023 +0800 [SPARK-43982][ML][PYTHON][CONNECT] Implement pipeline estimator for ML on spark connect ### What changes were proposed in this pull request? Implement pipeline estimator for ML on spark connect ### Why are the changes needed? See Distributed ML <> spark connect project design doc: https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.x8uc4xogrzbk ### Does this PR introduce _any_ user-facing change? Yes. New estimator `pyspark.mlv2.pipeline.Pipeline` is added. ### How was this patch tested? Unit tests. Closes #41479 from WeichenXu123/mlv2-pipeline. Authored-by: Weichen Xu <weichen...@databricks.com> Signed-off-by: Weichen Xu <weichen...@databricks.com> --- python/pyspark/mlv2/__init__.py | 4 + python/pyspark/mlv2/classification.py | 6 +- python/pyspark/mlv2/feature.py | 6 +- python/pyspark/mlv2/io_utils.py | 187 ++++++++++++++-------- python/pyspark/mlv2/pipeline.py | 241 +++++++++++++++++++++++++++++ python/pyspark/mlv2/tests/test_pipeline.py | 184 ++++++++++++++++++++++ 6 files changed, 561 insertions(+), 67 deletions(-) diff --git a/python/pyspark/mlv2/__init__.py b/python/pyspark/mlv2/__init__.py index 990b4fa9da8..352d24baabe 100644 --- a/python/pyspark/mlv2/__init__.py +++ b/python/pyspark/mlv2/__init__.py @@ -26,6 +26,8 @@ from pyspark.mlv2 import ( evaluation, ) +from pyspark.mlv2.pipeline import Pipeline, PipelineModel + __all__ = [ "Estimator", "Transformer", @@ -33,4 +35,6 @@ __all__ = [ "Model", "feature", "evaluation", + "Pipeline", + "PipelineModel", ] diff --git a/python/pyspark/mlv2/classification.py b/python/pyspark/mlv2/classification.py index fe0d76837f9..522c54b5289 100644 --- a/python/pyspark/mlv2/classification.py +++ b/python/pyspark/mlv2/classification.py @@ -40,7 +40,7 @@ from pyspark.ml.param.shared import ( HasMomentum, ) from pyspark.mlv2.base import Predictor, PredictionModel -from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite +from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite from pyspark.sql.functions import lit, count, countDistinct import torch @@ -253,7 +253,9 @@ class LogisticRegression( @inherit_doc -class LogisticRegressionModel(PredictionModel, _LogisticRegressionParams, ModelReadWrite): +class LogisticRegressionModel( + PredictionModel, _LogisticRegressionParams, ParamsReadWrite, CoreModelReadWrite +): """ Model fitted by LogisticRegression. 
diff --git a/python/pyspark/mlv2/feature.py b/python/pyspark/mlv2/feature.py index 57c6213d2bb..a58f214711c 100644 --- a/python/pyspark/mlv2/feature.py +++ b/python/pyspark/mlv2/feature.py @@ -24,7 +24,7 @@ from pyspark import keyword_only from pyspark.sql import DataFrame from pyspark.ml.param.shared import HasInputCol, HasOutputCol from pyspark.mlv2.base import Estimator, Model -from pyspark.mlv2.io_utils import ParamsReadWrite, ModelReadWrite +from pyspark.mlv2.io_utils import ParamsReadWrite, CoreModelReadWrite from pyspark.mlv2.summarizer import summarize_dataframe @@ -61,7 +61,7 @@ class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite): return self._copyValues(model) -class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite): +class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite): def __init__( self, max_abs_values: Optional["np.ndarray"] = None, n_samples_seen: Optional[int] = None ) -> None: @@ -143,7 +143,7 @@ class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite): return self._copyValues(model) -class StandardScalerModel(Model, HasInputCol, HasOutputCol, ModelReadWrite): +class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite): def __init__( self, mean_values: Optional["np.ndarray"] = None, diff --git a/python/pyspark/mlv2/io_utils.py b/python/pyspark/mlv2/io_utils.py index 8f7263206a7..c701736712f 100644 --- a/python/pyspark/mlv2/io_utils.py +++ b/python/pyspark/mlv2/io_utils.py @@ -21,7 +21,8 @@ import os import tempfile import time from urllib.parse import urlparse -from typing import Any, Dict, Optional +from typing import Any, Dict, List +from pyspark.ml.base import Params from pyspark.ml.util import _get_active_session from pyspark.sql.utils import is_remote @@ -56,43 +57,6 @@ def _copy_dir_from_local_to_fs(local_path: str, dest_path: str) -> None: _copy_file_from_local_to_fs(file_path, dest_file_path) -def _get_metadata_to_save( - instance: Any, - extra_metadata: Optional[Dict[str, Any]] = None, -) -> Dict[str, Any]: - """ - Extract metadata of Estimator / Transformer / Model / Evaluator instance. - """ - uid = instance.uid - cls = instance.__module__ + "." + instance.__class__.__name__ - - # User-supplied param values - params = instance._paramMap - json_params = {} - for p in params: - json_params[p.name] = params[p] - - # Default param values - json_default_params = {} - for p in instance._defaultParamMap: - json_default_params[p.name] = instance._defaultParamMap[p] - - metadata = { - "class": cls, - "timestamp": int(round(time.time() * 1000)), - "sparkVersion": pyspark_version, - "uid": uid, - "paramMap": json_params, - "defaultParamMap": json_default_params, - "type": "spark_connect", - } - if extra_metadata is not None: - assert isinstance(extra_metadata, dict) - metadata["extra"] = extra_metadata - - return metadata - - def _get_class(clazz: str) -> Any: """ Loads Python class from its name. @@ -103,7 +67,7 @@ def _get_class(clazz: str) -> Any: return getattr(m, parts[-1]) -class ParamsReadWrite: +class ParamsReadWrite(Params): """ The base interface Estimator / Transformer / Model / Evaluator needs to inherit for supporting saving and loading. @@ -115,18 +79,72 @@ class ParamsReadWrite: """ return None + def _get_skip_saving_params(self) -> List[str]: + """ + Returns params to be skipped when saving metadata. 
+ """ + return [] + + def _get_metadata_to_save(self) -> Dict[str, Any]: + """ + Extract metadata of Estimator / Transformer / Model / Evaluator instance. + """ + extra_metadata = self._get_extra_metadata() + skipped_params = self._get_skip_saving_params() + + uid = self.uid + cls = self.__module__ + "." + self.__class__.__name__ + + # User-supplied param values + params = self._paramMap + json_params = {} + skipped_params = skipped_params or [] + for p in params: + if p.name not in skipped_params: + json_params[p.name] = params[p] + + # Default param values + json_default_params = {} + for p in self._defaultParamMap: + json_default_params[p.name] = self._defaultParamMap[p] + + metadata = { + "class": cls, + "timestamp": int(round(time.time() * 1000)), + "sparkVersion": pyspark_version, + "uid": uid, + "paramMap": json_params, + "defaultParamMap": json_default_params, + "type": "spark_connect", + } + if extra_metadata is not None: + assert isinstance(extra_metadata, dict) + metadata["extra"] = extra_metadata + + return metadata + def _load_extra_metadata(self, metadata: Dict[str, Any]) -> None: """ Load extra metadata attribute from metadata json object. """ pass + def _save_to_local(self, path: str) -> None: + metadata = self._get_metadata_to_save() + if isinstance(self, CoreModelReadWrite): + core_model_path = self._get_core_model_filename() + self._save_core_model(os.path.join(path, core_model_path)) + metadata["core_model_path"] = core_model_path + + with open(os.path.join(path, _META_DATA_FILE_NAME), "w") as fp: + json.dump(metadata, fp) + def saveToLocal(self, path: str, *, overwrite: bool = False) -> None: """ - Save model to provided local path. - """ - metadata = _get_metadata_to_save(self, extra_metadata=self._get_extra_metadata()) + Save Estimator / Transformer / Model / Evaluator to provided local path. + .. versionadded:: 3.5.0 + """ if os.path.exists(path): if overwrite: if os.path.isdir(path): @@ -137,19 +155,15 @@ class ParamsReadWrite: raise ValueError(f"The path {path} already exists.") os.makedirs(path) - with open(os.path.join(path, _META_DATA_FILE_NAME), "w") as fp: - json.dump(metadata, fp) + self._save_to_local(path) @classmethod - def loadFromLocal(cls, path: str) -> Any: - """ - Load model from provided local path. - """ - with open(os.path.join(path, _META_DATA_FILE_NAME), "r") as fp: - metadata = json.load(fp) - + def _load_from_metadata(cls, metadata: Dict[str, Any]) -> "Params": if "type" not in metadata or metadata["type"] != "spark_connect": - raise RuntimeError("The model is not saved by spark ML under spark connect mode.") + raise RuntimeError( + "The saved data is not saved by ML algorithm implemented in 'pyspark.ml.connect' " + "module." + ) class_name = metadata["class"] instance = _get_class(class_name)() @@ -169,7 +183,34 @@ class ParamsReadWrite: instance._load_extra_metadata(metadata["extra"]) return instance + @classmethod + def _load_from_local(cls, path: str) -> "Params": + with open(os.path.join(path, _META_DATA_FILE_NAME), "r") as fp: + metadata = json.load(fp) + + instance = cls._load_from_metadata(metadata) + + if isinstance(instance, CoreModelReadWrite): + core_model_path = metadata["core_model_path"] + instance._load_core_model(os.path.join(path, core_model_path)) + + return instance + + @classmethod + def loadFromLocal(cls, path: str) -> "Params": + """ + Load Estimator / Transformer / Model / Evaluator from provided local path. + + .. 
versionadded:: 3.5.0 + """ + return cls._load_from_local(path) + def save(self, path: str, *, overwrite: bool = False) -> None: + """ + Save Estimator / Transformer / Model / Evaluator to provided cloud storage path. + + .. versionadded:: 3.5.0 + """ session = _get_active_session(is_remote()) path_exist = True try: @@ -186,13 +227,18 @@ class ParamsReadWrite: tmp_local_dir = tempfile.mkdtemp(prefix="pyspark_ml_model_") try: - self.saveToLocal(tmp_local_dir, overwrite=True) + self._save_to_local(tmp_local_dir) _copy_dir_from_local_to_fs(tmp_local_dir, path) finally: shutil.rmtree(tmp_local_dir, ignore_errors=True) @classmethod - def load(cls, path: str) -> Any: + def load(cls, path: str) -> "Params": + """ + Load Estimator / Transformer / Model / Evaluator from provided cloud storage path. + + .. versionadded:: 3.5.0 + """ session = _get_active_session(is_remote()) tmp_local_dir = tempfile.mkdtemp(prefix="pyspark_ml_model_") @@ -205,12 +251,12 @@ class ParamsReadWrite: with open(os.path.join(tmp_local_dir, file_name), "wb") as f: f.write(file_content) - return cls.loadFromLocal(tmp_local_dir) + return cls._load_from_local(tmp_local_dir) finally: shutil.rmtree(tmp_local_dir, ignore_errors=True) -class ModelReadWrite(ParamsReadWrite): +class CoreModelReadWrite: def _get_core_model_filename(self) -> str: """ Returns the name of the file for saving the core model. @@ -231,12 +277,29 @@ class ModelReadWrite(ParamsReadWrite): """ raise NotImplementedError() - def saveToLocal(self, path: str, *, overwrite: bool = False) -> None: - super(ModelReadWrite, self).saveToLocal(path, overwrite=overwrite) - self._save_core_model(os.path.join(path, self._get_core_model_filename())) + +class MetaAlgorithmReadWrite(ParamsReadWrite): + """ + Meta-algorithm such as pipeline and cross validator must implement this interface. + """ + + def _save_meta_algorithm(self, root_path: str, node_path: List[str]) -> Dict[str, Any]: + raise NotImplementedError() + + def _load_meta_algorithm(self, root_path: str, node_metadata: Dict[str, Any]) -> None: + raise NotImplementedError() + + def _save_to_local(self, path: str) -> None: + metadata = self._save_meta_algorithm(path, []) + with open(os.path.join(path, _META_DATA_FILE_NAME), "w") as fp: + json.dump(metadata, fp) @classmethod - def loadFromLocal(cls, path: str) -> Any: - instance = super(ModelReadWrite, cls).loadFromLocal(path) - instance._load_core_model(os.path.join(path, instance._get_core_model_filename())) + def _load_from_local(cls, path: str) -> Any: + with open(os.path.join(path, _META_DATA_FILE_NAME), "r") as fp: + metadata = json.load(fp) + + instance = cls._load_from_metadata(metadata) + instance._load_meta_algorithm(path, metadata) # type: ignore[attr-defined] + return instance diff --git a/python/pyspark/mlv2/pipeline.py b/python/pyspark/mlv2/pipeline.py new file mode 100644 index 00000000000..81e0651f178 --- /dev/null +++ b/python/pyspark/mlv2/pipeline.py @@ -0,0 +1,241 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os + +import pandas as pd +from typing import Any, Dict, List, Optional, Union, cast, TYPE_CHECKING + +from pyspark import keyword_only, since +from pyspark.mlv2.base import Estimator, Model, Transformer +from pyspark.mlv2.io_utils import ParamsReadWrite, MetaAlgorithmReadWrite, CoreModelReadWrite +from pyspark.ml.param import Param, Params +from pyspark.ml.common import inherit_doc +from pyspark.sql.dataframe import DataFrame + + +if TYPE_CHECKING: + from pyspark.ml._typing import ParamMap + + +class _PipelineReadWrite(MetaAlgorithmReadWrite): + def _get_skip_saving_params(self) -> List[str]: + """ + Returns params to be skipped when saving metadata. + """ + return ["stages"] + + def _save_meta_algorithm(self, root_path: str, node_path: List[str]) -> Dict[str, Any]: + metadata = self._get_metadata_to_save() + metadata["stages"] = [] + + if isinstance(self, Pipeline): + stages = self.getStages() + elif isinstance(self, PipelineModel): + stages = self.stages + else: + raise ValueError() + + for stage_index, stage in enumerate(stages): + stage_name = f"pipeline_stage_{stage_index}" + node_path.append(stage_name) + if isinstance(stage, MetaAlgorithmReadWrite): + stage_metadata = stage._save_meta_algorithm(root_path, node_path) + else: + stage_metadata = stage._get_metadata_to_save() # type: ignore[attr-defined] + if isinstance(stage, CoreModelReadWrite): + core_model_path = ".".join(node_path + [stage._get_core_model_filename()]) + stage._save_core_model(os.path.join(root_path, core_model_path)) + stage_metadata["core_model_path"] = core_model_path + + metadata["stages"].append(stage_metadata) + node_path.pop() + return metadata + + def _load_meta_algorithm(self, root_path: str, node_metadata: Dict[str, Any]) -> None: + stages = [] + for stage_meta in node_metadata["stages"]: + stage = ParamsReadWrite._load_from_metadata(stage_meta) + + if isinstance(stage, MetaAlgorithmReadWrite): + stage._load_meta_algorithm(root_path, stage_meta) + + if isinstance(stage, CoreModelReadWrite): + core_model_path = stage_meta["core_model_path"] + stage._load_core_model(os.path.join(root_path, core_model_path)) + + stages.append(stage) + + if isinstance(self, Pipeline): + self.setStages(stages) + elif isinstance(self, PipelineModel): + self.stages = stages + else: + raise ValueError() + + +@inherit_doc +class Pipeline(Estimator["PipelineModel"], _PipelineReadWrite): + """ + A simple pipeline, which acts as an estimator. A Pipeline consists + of a sequence of stages, each of which is either an + :py:class:`Estimator` or a :py:class:`Transformer`. When + :py:meth:`Pipeline.fit` is called, the stages are executed in + order. If a stage is an :py:class:`Estimator`, its + :py:meth:`Estimator.fit` method will be called on the input + dataset to fit a model. Then the model, which is a transformer, + will be used to transform the dataset as the input to the next + stage. If a stage is a :py:class:`Transformer`, its + :py:meth:`Transformer.transform` method will be called to produce + the dataset for the next stage. 
The fitted model from a + :py:class:`Pipeline` is a :py:class:`PipelineModel`, which + consists of fitted models and transformers, corresponding to the + pipeline stages. If stages is an empty list, the pipeline acts as an + identity transformer. + + .. versionadded:: 3.5.0 + """ + + stages: Param[List[Params]] = Param( + Params._dummy(), "stages", "a list of pipeline stages" + ) # type: ignore[assignment] + + _input_kwargs: Dict[str, Any] + + @keyword_only + def __init__(self, *, stages: Optional[List[Params]] = None): + """ + __init__(self, \\*, stages=None) + """ + super(Pipeline, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + def setStages(self, value: List[Params]) -> "Pipeline": + """ + Set pipeline stages. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + value : list + of :py:class:`pyspark.mlv2.Transformer` + or :py:class:`pyspark.mlv2.Estimator` + + Returns + ------- + :py:class:`Pipeline` + the pipeline instance + """ + return self._set(stages=value) + + @since("3.5.0") + def getStages(self) -> List[Params]: + """ + Get pipeline stages. + """ + return self.getOrDefault(self.stages) + + @keyword_only + @since("3.5.0") + def setParams(self, *, stages: Optional[List[Params]] = None) -> "Pipeline": + """ + setParams(self, \\*, stages=None) + Sets params for Pipeline. + """ + kwargs = self._input_kwargs + return self._set(**kwargs) + + def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "PipelineModel": + stages = self.getStages() + for stage in stages: + if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): + raise TypeError("Cannot recognize a pipeline stage of type %s." % type(stage)) + indexOfLastEstimator = -1 + for i, stage in enumerate(stages): + if isinstance(stage, Estimator): + indexOfLastEstimator = i + transformers: List[Transformer] = [] + for i, stage in enumerate(stages): + if i <= indexOfLastEstimator: + if isinstance(stage, Transformer): + transformers.append(stage) + dataset = stage.transform(dataset) + else: # must be an Estimator + model = stage.fit(dataset) # type: ignore[attr-defined] + transformers.append(model) + if i < indexOfLastEstimator: + dataset = model.transform(dataset) + else: + transformers.append(cast(Transformer, stage)) + pipeline_model = PipelineModel(transformers) # type: ignore[arg-type] + pipeline_model._resetUid(self.uid) + return pipeline_model + + def copy(self, extra: Optional["ParamMap"] = None) -> "Pipeline": + """ + Creates a copy of this instance. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + extra : dict, optional + extra parameters + + Returns + ------- + :py:class:`Pipeline` + new instance + """ + if extra is None: + extra = dict() + that = Params.copy(self, extra) + stages = [stage.copy(extra) for stage in that.getStages()] + return that.setStages(stages) + + +@inherit_doc +class PipelineModel(Model, _PipelineReadWrite): + """ + Represents a compiled pipeline with transformers and fitted models. + + .. versionadded:: 3.5.0 + """ + + def __init__(self, stages: Optional[List[Params]] = None): + super(PipelineModel, self).__init__() + self.stages = stages # type: ignore[assignment] + + def transform(self, dataset: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, pd.DataFrame]: + for t in self.stages: + dataset = t.transform(dataset) # type: ignore[attr-defined] + return dataset + + def copy(self, extra: Optional["ParamMap"] = None) -> "PipelineModel": + """ + Creates a copy of this instance. + + .. 
versionadded:: 3.5.0 + + :param extra: extra parameters + :returns: new instance + """ + if extra is None: + extra = dict() + stages = [stage.copy(extra) for stage in self.stages] + return PipelineModel(stages) diff --git a/python/pyspark/mlv2/tests/test_pipeline.py b/python/pyspark/mlv2/tests/test_pipeline.py new file mode 100644 index 00000000000..cec421b7ee0 --- /dev/null +++ b/python/pyspark/mlv2/tests/test_pipeline.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import tempfile +import unittest +import numpy as np +from pyspark.mlv2.feature import StandardScaler +from pyspark.mlv2.classification import LogisticRegression as LORV2 +from pyspark.mlv2.pipeline import Pipeline +from pyspark.sql import SparkSession + + +have_torch = True +try: + import torch # noqa: F401 +except ImportError: + have_torch = False + + +class PipelineTestsMixin: + @staticmethod + def _check_result(result_dataframe, expected_predictions, expected_probabilities=None): + np.testing.assert_array_equal(list(result_dataframe.prediction), expected_predictions) + if "probability" in result_dataframe.columns: + np.testing.assert_allclose( + list(result_dataframe.probability), + expected_probabilities, + rtol=1e-2, + ) + + def test_pipeline(self): + train_dataset = self.spark.createDataFrame( + [ + (1.0, [0.0, 5.0]), + (0.0, [1.0, 2.0]), + (1.0, [2.0, 1.0]), + (0.0, [3.0, 3.0]), + ] + * 100, + ["label", "features"], + ) + eval_dataset = self.spark.createDataFrame( + [ + ([0.0, 2.0],), + ([3.5, 3.0],), + ], + ["features"], + ) + scaler = StandardScaler(inputCol="features", outputCol="scaled_features") + lorv2 = LORV2( + maxIter=200, numTrainWorkers=2, learningRate=0.001, featuresCol="scaled_features" + ) + + pipeline = Pipeline(stages=[scaler, lorv2]) + model = pipeline.fit(train_dataset) + assert model.uid == pipeline.uid + + expected_predictions = [1, 0] + expected_probabilities = [ + [0.117658, 0.882342], + [0.878738, 0.121262], + ] + + result = model.transform(eval_dataset).toPandas() + self._check_result(result, expected_predictions, expected_probabilities) + local_transform_result = model.transform(eval_dataset.toPandas()) + self._check_result(local_transform_result, expected_predictions, expected_probabilities) + + pipeline2 = Pipeline(stages=[pipeline]) + model2 = pipeline2.fit(train_dataset) + result2 = model2.transform(eval_dataset).toPandas() + self._check_result(result2, expected_predictions, expected_probabilities) + local_transform_result2 = model2.transform(eval_dataset.toPandas()) + self._check_result(local_transform_result2, expected_predictions, expected_probabilities) + + with tempfile.TemporaryDirectory() as tmp_dir: + pipeline_local_path = os.path.join(tmp_dir, "pipeline") + 
pipeline.saveToLocal(pipeline_local_path) + loaded_pipeline = Pipeline.loadFromLocal(pipeline_local_path) + + assert pipeline.uid == loaded_pipeline.uid + assert loaded_pipeline.getStages()[1].getMaxIter() == 200 + + pipeline_model_local_path = os.path.join(tmp_dir, "pipeline_model") + model.saveToLocal(pipeline_model_local_path) + loaded_model = Pipeline.loadFromLocal(pipeline_model_local_path) + + assert model.uid == loaded_model.uid + assert loaded_model.stages[1].getMaxIter() == 200 + + loaded_model_transform_result = loaded_model.transform(eval_dataset).toPandas() + self._check_result( + loaded_model_transform_result, expected_predictions, expected_probabilities + ) + + pipeline2_local_path = os.path.join(tmp_dir, "pipeline2") + pipeline2.saveToLocal(pipeline2_local_path) + loaded_pipeline2 = Pipeline.loadFromLocal(pipeline2_local_path) + + assert pipeline2.uid == loaded_pipeline2.uid + assert loaded_pipeline2.getStages()[0].getStages()[1].getMaxIter() == 200 + + pipeline2_model_local_path = os.path.join(tmp_dir, "pipeline2_model") + model2.saveToLocal(pipeline2_model_local_path) + loaded_model2 = Pipeline.loadFromLocal(pipeline2_model_local_path) + + assert model2.uid == loaded_model2.uid + assert loaded_model2.stages[0].stages[1].getMaxIter() == 200 + + loaded_model2_transform_result = loaded_model2.transform(eval_dataset).toPandas() + self._check_result( + loaded_model2_transform_result, expected_predictions, expected_probabilities + ) + + @staticmethod + def test_pipeline_copy(): + scaler = StandardScaler(inputCol="features", outputCol="scaled_features") + lorv2 = LORV2( + maxIter=200, numTrainWorkers=2, learningRate=0.001, featuresCol="scaled_features" + ) + + pipeline = Pipeline(stages=[scaler, lorv2]) + + copied_pipeline = pipeline.copy( + {scaler.inputCol: "f1", lorv2.maxIter: 10, lorv2.numTrainWorkers: 1} + ) + + stages = copied_pipeline.getStages() + + assert stages[0].getInputCol() == "f1" + assert stages[1].getOrDefault(stages[1].maxIter) == 10 + assert stages[1].getOrDefault(stages[1].numTrainWorkers) == 1 + assert stages[1].getOrDefault(stages[1].featuresCol) == "scaled_features" + + pipeline2 = Pipeline(stages=[pipeline]) + copied_pipeline2 = pipeline2.copy( + {scaler.inputCol: "f2", lorv2.maxIter: 20, lorv2.numTrainWorkers: 20} + ) + + stages = copied_pipeline2.getStages()[0].getStages() + + assert stages[0].getInputCol() == "f2" + assert stages[1].getOrDefault(stages[1].maxIter) == 20 + assert stages[1].getOrDefault(stages[1].numTrainWorkers) == 20 + assert stages[1].getOrDefault(stages[1].featuresCol) == "scaled_features" + + # test original stage instance params are not modified after pipeline copying. + assert scaler.getInputCol() == "features" + assert lorv2.getOrDefault(lorv2.maxIter) == 200 + + +class PipelineTests(PipelineTestsMixin, unittest.TestCase): + def setUp(self) -> None: + self.spark = SparkSession.builder.master("local[2]").getOrCreate() + + def tearDown(self) -> None: + self.spark.stop() + + +if __name__ == "__main__": + from pyspark.mlv2.tests.test_pipeline import * # noqa: F401,F403 + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
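For reference, a minimal usage sketch of the new `pyspark.mlv2.pipeline.Pipeline` estimator, adapted from `test_pipeline.py` in this commit. The data, column names, and hyperparameter values are illustrative only, and it assumes a PySpark build that ships `pyspark.mlv2` plus a local `torch` installation (required by `LogisticRegression`):

```python
from pyspark.sql import SparkSession
from pyspark.mlv2.feature import StandardScaler
from pyspark.mlv2.classification import LogisticRegression
from pyspark.mlv2.pipeline import Pipeline

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Toy training data: a label column and an array-valued features column.
train_df = spark.createDataFrame(
    [(1.0, [0.0, 5.0]), (0.0, [1.0, 2.0]), (1.0, [2.0, 1.0]), (0.0, [3.0, 3.0])] * 100,
    ["label", "features"],
)
eval_df = spark.createDataFrame([([0.0, 2.0],), ([3.5, 3.0],)], ["features"])

# Stages run in order: the scaler's output column feeds the classifier.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
lr = LogisticRegression(
    maxIter=200, numTrainWorkers=2, learningRate=0.001, featuresCol="scaled_features"
)

pipeline = Pipeline(stages=[scaler, lr])
model = pipeline.fit(train_df)  # returns a PipelineModel

# PipelineModel.transform accepts either a Spark DataFrame or a pandas DataFrame.
predictions = model.transform(eval_df).toPandas()
local_predictions = model.transform(eval_df.toPandas())
```

A pipeline can itself be used as a stage of another pipeline, e.g. `Pipeline(stages=[pipeline])`, as the nested-pipeline case in the test above exercises.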
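The same test also covers persistence through the new `MetaAlgorithmReadWrite` path: the pipeline's metadata, including the metadata of every stage, is written to a single metadata JSON file, and each stage's core model file is stored alongside it in the same directory. A short sketch of the local save/load round trip, continuing from the example above with illustrative paths:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    # Save the unfitted estimator and load it back.
    pipeline_path = os.path.join(tmp_dir, "pipeline")
    pipeline.saveToLocal(pipeline_path)
    loaded_pipeline = Pipeline.loadFromLocal(pipeline_path)
    assert loaded_pipeline.uid == pipeline.uid

    # The fitted PipelineModel is saved the same way. loadFromLocal reads the
    # concrete class name from the saved metadata, so it reconstructs a
    # PipelineModel even when invoked via the Pipeline class, mirroring the test.
    model_path = os.path.join(tmp_dir, "pipeline_model")
    model.saveToLocal(model_path)
    loaded_model = Pipeline.loadFromLocal(model_path)
    assert loaded_model.uid == model.uid

    predictions = loaded_model.transform(eval_df).toPandas()
```

For cloud storage paths, `save(path)` and `load(path)` follow the same on-disk layout, staging through a local temporary directory and copying files via the active Spark session.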