Github user MrBago commented on a diff in the pull request: https://github.com/apache/spark/pull/18888#discussion_r132561786 --- Diff: python/pyspark/ml/pipeline.py --- @@ -242,3 +327,65 @@ def _to_java(self): JavaParams._new_java_obj("org.apache.spark.ml.PipelineModel", self.uid, java_stages) return _java_obj + + +@inherit_doc +class SharedReadWrite(): + """ + Functions for :py:class:`MLReader` and :py:class:`MLWriter` shared between + :py:class:'Pipeline' and :py:class'PipelineModel' + + .. versionadded:: 2.3.0 + """ + + @staticmethod + def validateStages(stages): + """ + Check that all stages are Writable + """ + for stage in stages: + if not isinstance(stage, MLWritable): + raise ValueError("Pipeline write will fail on this pipline " + + "because stage %s of type %s is not MLWritable", + stage.uid, type(stage)) + + @staticmethod + def saveImpl(instance, stages, sc, path): + """ + Save metadata and stages for a :py:class:`Pipeline` or :py:class:`PipelineModel` + - save metadata to path/metadata + - save stages to stages/IDX_UID + """ + stageUids = [stage.uid for stage in stages] + jsonParams = {'stageUids': stageUids, 'savedAsPython': True} + DefaultParamsWriter.saveMetadata(instance, path, sc, paramMap=jsonParams) + stagesDir = os.path.join(path, "stages") + for index, stage in enumerate(stages): + stage.write().save(SharedReadWrite + .getStagePath(stage.uid, index, len(stages), stagesDir)) + + @staticmethod + def load(metadata, sc, path): + """ + Load metadata and stages for a :py:class:`Pipeline` or :py:class:`PipelineModel` + + :return: (UID, list of stages) + """ + stagesDir = os.path.join(path, "stages") + stageUids = metadata['paramMap']['stageUids'] + stages = [] + for index, stageUid in enumerate(stageUids): + stagePath = SharedReadWrite.getStagePath(stageUid, index, len(stageUids), stagesDir) + stage = DefaultParamsReader.loadParamsInstance(stagePath, sc) + stages.append(stage) + return (metadata['uid'], stages) + + @staticmethod + def 
getStagePath(stageUid, stageIdx, numStages, stagesDir): --- End diff -- `stageIdx` isn't used by this method; is that intentional?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org