Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11683#discussion_r56217493
  
    --- Diff: python/pyspark/ml/pipeline.py ---
    @@ -15,116 +15,87 @@
     # limitations under the License.
     #
     
    -from abc import ABCMeta, abstractmethod
    +import sys
     
    +if sys.version > '3':
    +    basestring = str
    +
    +from pyspark import SparkContext
     from pyspark import since
    +from pyspark.ml import Estimator, Model, Transformer
     from pyspark.ml.param import Param, Params
    -from pyspark.ml.util import keyword_only
    +from pyspark.ml.util import keyword_only, JavaMLWriter, JavaMLReader
    +from pyspark.ml.wrapper import JavaWrapper
     from pyspark.mllib.common import inherit_doc
     
     
    -@inherit_doc
    -class Estimator(Params):
    +def _stages_java2py(java_stages):
         """
    -    Abstract class for estimators that fit models to data.
    -
    -    .. versionadded:: 1.3.0
    +    Transforms a list of Java stages to a list of Python stages.
    +    :param java_stages: An array of Java stages.
    +    :return: An array of Python stages.
         """
     
    -    __metaclass__ = ABCMeta
    +    for stage in java_stages:
    +        assert stage.getClass().getName() not in \
    +            ("org.apache.spark.ml.Pipeline", "org.apache.spark.ml.PipelineModel"), \
    +            "Nested Pipeline and PipelineModel are not supported for save/load now."
    +    return list(map(JavaWrapper._transfer_stage_from_java, java_stages))
     
    -    @abstractmethod
    -    def _fit(self, dataset):
    -        """
    -        Fits a model to the input dataset. This is called by the
    -        default implementation of fit.
     
    -        :param dataset: input dataset, which is an instance of
    -                        :py:class:`pyspark.sql.DataFrame`
    -        :returns: fitted model
    -        """
    -        raise NotImplementedError()
    +def _stages_py2java(py_stages):
    +    """
    +    Transforms a list of Python stages to a Java array of Java stages.
    +    :param py_stages: An array of Python stages.
    +    :return: A Java array of Java stages.
    +    """
     
    -    @since("1.3.0")
    -    def fit(self, dataset, params=None):
    -        """
    -        Fits a model to the input dataset with optional parameters.
    -
    -        :param dataset: input dataset, which is an instance of
    -                        :py:class:`pyspark.sql.DataFrame`
    -        :param params: an optional param map that overrides embedded
    -                       params. If a list/tuple of param maps is given,
    -                       this calls fit on each param map and returns a
    -                       list of models.
    -        :returns: fitted model(s)
    -        """
    -        if params is None:
    -            params = dict()
    -        if isinstance(params, (list, tuple)):
    -            return [self.fit(dataset, paramMap) for paramMap in params]
    -        elif isinstance(params, dict):
    -            if params:
    -                return self.copy(params)._fit(dataset)
    -            else:
    -                return self._fit(dataset)
    -        else:
    -            raise ValueError("Params must be either a param map or a 
list/tuple of param maps, "
    -                             "but got %s." % type(params))
    +    for stage in py_stages:
    +        assert isinstance(stage, JavaWrapper), \
    +            "Nested Pipeline and PipelineModel are not supported for save/load now."
    +    gateway = SparkContext._gateway
    +    jvm = SparkContext._jvm
    +    java_stages = gateway.new_array(jvm.org.apache.spark.ml.PipelineStage, len(py_stages))
    +    for idx, stage in enumerate(py_stages):
    +        java_stages[idx] = stage._transfer_stage_to_java()
    +    return java_stages
     
     
     @inherit_doc
    -class Transformer(Params):
    +class _PipelineMLWriter(JavaMLWriter, JavaWrapper):
         """
    -    Abstract class for transformers that transform one dataset into
    -    another.
    -
    -    .. versionadded:: 1.3.0
    +    Pipeline utility class that can save ML instances through their Scala implementation.
         """
     
    -    __metaclass__ = ABCMeta
    -
    -    @abstractmethod
    -    def _transform(self, dataset):
    -        """
    -        Transforms the input dataset.
    -
    -        :param dataset: input dataset, which is an instance of
    -                        :py:class:`pyspark.sql.DataFrame`
    -        :returns: transformed dataset
    -        """
    -        raise NotImplementedError()
    -
    -    @since("1.3.0")
    -    def transform(self, dataset, params=None):
    -        """
    -        Transforms the input dataset with optional parameters.
    -
    -        :param dataset: input dataset, which is an instance of
    -                        :py:class:`pyspark.sql.DataFrame`
    -        :param params: an optional param map that overrides embedded
    -                       params.
    -        :returns: transformed dataset
    -        """
    -        if params is None:
    -            params = dict()
    -        if isinstance(params, dict):
    -            if params:
    -                return self.copy(params,)._transform(dataset)
    -            else:
    -                return self._transform(dataset)
    -        else:
    -            raise ValueError("Params must be either a param map but got 
%s." % type(params))
    +    def __init__(self, instance):
    +        self._java_obj = self._new_java_obj("org.apache.spark.ml.Pipeline", instance.uid)
    +        jparam = self._java_obj.getParam(instance.stages.name)
    +        jstages = _stages_py2java(instance.getStages())
    +        self._java_obj.set(jparam.w(jstages))
    +        self._jwrite = self._java_obj.write()
     
     
     @inherit_doc
    -class Model(Transformer):
    +class _PipelineMLReader(JavaMLReader):
         """
    -    Abstract class for models that are fitted by estimators.
    -
    -    .. versionadded:: 1.4.0
    +    Utility class that can load Pipeline instances through their Scala implementation.
         """
     
    -    __metaclass__ = ABCMeta
    +    def load(self, path):
    +        """Load the Pipeline instance from the input path."""
    +        if not isinstance(path, basestring):
    +            raise TypeError("path should be a basestring, got type %s" % 
type(path))
    +
    +        sc = SparkContext._active_spark_context
    +        java_obj = self._jread.load(path)
    +        instance = self._clazz()
    +        instance._resetUid(java_obj.uid())
    +        jparam = java_obj.getParam(instance.stages.name)
    +        if java_obj.isDefined(jparam):
    +            java_stages = java_obj.getOrDefault(jparam)
    +            instance._paramMap[instance.stages] = _stages_java2py(java_stages)
    --- End diff --
    
    Call `set`; do not access `_paramMap` directly.
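    
    For illustration, a minimal sketch of that suggestion, interpreting "call
    set" as routing the assignment through the Pipeline's stage setter
    (`setStages` exists on the Python `Pipeline`; whether the reviewer intended
    that or the protected `Params._set` helper is an assumption here):
    
        if java_obj.isDefined(jparam):
            java_stages = java_obj.getOrDefault(jparam)
            # Go through the setter so Params validation and bookkeeping are
            # not bypassed by writing into the internal map directly.
            instance.setStages(_stages_java2py(java_stages))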

