Github user ajaysaini725 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18888#discussion_r132597367
  
    --- Diff: python/pyspark/ml/pipeline.py ---
    @@ -242,3 +327,65 @@ def _to_java(self):
                 JavaParams._new_java_obj("org.apache.spark.ml.PipelineModel", 
self.uid, java_stages)
     
             return _java_obj
    +
    +
    +@inherit_doc
    +class SharedReadWrite():
    +    """
    +    Functions for :py:class:`MLReader` and :py:class:`MLWriter` shared 
between
    +    :py:class:'Pipeline' and :py:class'PipelineModel'
    +
    +    .. versionadded:: 2.3.0
    +    """
    +
    +    @staticmethod
    +    def validateStages(stages):
    +        """
    +        Check that all stages are Writable
    +        """
    +        for stage in stages:
    +            if not isinstance(stage, MLWritable):
    +                raise ValueError("Pipeline write will fail on this pipline 
" +
    +                                 "because stage %s of type %s is not 
MLWritable",
    +                                 stage.uid, type(stage))
    +
    +    @staticmethod
    +    def saveImpl(instance, stages, sc, path):
    +        """
    +        Save metadata and stages for a :py:class:`Pipeline` or 
:py:class:`PipelineModel`
    +        - save metadata to path/metadata
    +        - save stages to stages/IDX_UID
    +        """
    +        stageUids = [stage.uid for stage in stages]
    +        jsonParams = {'stageUids': stageUids, 'savedAsPython': True}
    +        DefaultParamsWriter.saveMetadata(instance, path, sc, 
paramMap=jsonParams)
    +        stagesDir = os.path.join(path, "stages")
    +        for index, stage in enumerate(stages):
    +            stage.write().save(SharedReadWrite
    +                               .getStagePath(stage.uid, index, 
len(stages), stagesDir))
    +
    +    @staticmethod
    +    def load(metadata, sc, path):
    +        """
    +        Load metadata and stages for a :py:class:`Pipeline` or 
:py:class:`PipelineModel`
    +
    +        :return:  (UID, list of stages)
    +        """
    +        stagesDir = os.path.join(path, "stages")
    +        stageUids = metadata['paramMap']['stageUids']
    +        stages = []
    +        for index, stageUid in enumerate(stageUids):
    +            stagePath = SharedReadWrite.getStagePath(stageUid, index, 
len(stageUids), stagesDir)
    +            stage = DefaultParamsReader.loadParamsInstance(stagePath, sc)
    +            stages.append(stage)
    +        return (metadata['uid'], stages)
    +
    +    @staticmethod
    +    def getStagePath(stageUid, stageIdx, numStages, stagesDir):
    --- End diff --
    
    It should be used. Fixed this. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to