[ https://issues.apache.org/jira/browse/SPARK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252726#comment-17252726 ]

Nicholas Brett Marcott commented on SPARK-32048:
------------------------------------------------

I could not reproduce the stated AttributeError with the given code example using master code from [this commit|https://github.com/apache/spark/tree/de234eec8febce99ede5ef9ae2301e36739a0f85]. It appears this was fixed in [this commit|https://github.com/WeichenXu123/spark/commit/58b0c797da95b0516285948d58e981a893b648f4] and backported to 3.0 in [this commit|https://github.com/apache/spark/commit/8acbe5b822b7b0ad49079aa223ad52afe70b5afa].

When run with the commit I referenced, the example code hits a different error: the dataframe's label column implies four classes (its largest label is 3.0), yet the code uses BinaryClassificationEvaluator. Once I changed the label 3.0 to 0.0, the entire example code ran without error.

 
{code:java}
>>> model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
20/12/21 17:43:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
20/12/21 17:43:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/base.py", line 161, in fit
    return self._fit(dataset)
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 687, in _fit
    for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 687, in <lambda>
    for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 74, in singleTask
    metric = eva.evaluate(model.transform(validation, epm[index]))
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/evaluation.py", line 84, in evaluate
    return self._evaluate(dataset)
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/evaluation.py", line 120, in _evaluate
    return self._java_obj.evaluate(dataset._jdf)
  File "/Users/nmarcott/projects/apache/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 4
{code}
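For context on the length mismatch: Spark ML's LogisticRegression derives the class count from the label column (for double-encoded labels 0.0 .. k-1, effectively max(label) + 1), so the stray 3.0 yields a four-class model whose rawPrediction vectors have length 4, which BinaryClassificationEvaluator rejects. A plain-Python sketch of that arithmetic (a simplified stand-in, not Spark's actual code):

```python
def inferred_num_classes(labels):
    # Spark treats double labels 0.0 .. k-1 as class indices, so the class
    # count follows from the largest label seen (simplified stand-in).
    return int(max(labels)) + 1

# Labels from the reporter's example; one row carries the stray label 3.0.
labels = [1.0, 0.0, 1.0, 3.0]
assert inferred_num_classes(labels) == 4  # rawPrediction vectors get length 4

# BinaryClassificationEvaluator's precondition, paraphrased:
def check_binary(raw_prediction_len):
    if raw_prediction_len != 2:
        raise ValueError("requirement failed: rawPredictionCol vectors must "
                         "have length=2, but got %d" % raw_prediction_len)

# Remapping 3.0 to 0.0 restores a two-class problem the evaluator accepts:
fixed = [0.0 if label == 3.0 else label for label in labels]
assert inferred_num_classes(fixed) == 2
check_binary(inferred_num_classes(fixed))  # passes silently
```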
 

I did verify that the example code produces the error below when run on v2.4.7, the previous stable Spark release:

 
{code:java}
>>> pipeline_cv2.write().overwrite().save(cvPath)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 342, in write
    return JavaMLWriter(self)
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/util.py", line 176, in __init__
    _java_obj = instance._to_java()
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 376, in _to_java
    estimator, epms, evaluator = super(CrossValidator, self)._to_java_impl()
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 195, in _to_java_impl
    java_epms[idx] = self.getEstimator()._transfer_param_map_to_java(epm)
AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
{code}
Saving works with LinearRegression (and most other classifiers) since it inherits JavaParams._transfer_param_map_to_java; Pipeline is implemented purely in Python and lacks that method.
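
To illustrate the inheritance gap, here is a simplified pure-Python mimic (not the real pyspark classes): JVM-backed estimators get the conversion helper from JavaParams, while the pure-Python Pipeline has no such method, so the param-map conversion step fails with the AttributeError above.

```python
# Simplified mimic of the 2.4.x class layout (not the actual pyspark classes).
class JavaParams:
    def _transfer_param_map_to_java(self, param_map):
        # Stand-in for the real py4j conversion of a ParamMap.
        return dict(param_map)

class LinearRegression(JavaParams):  # JVM-backed estimator inherits the helper
    pass

class Pipeline:  # pure-Python composite estimator, no JavaParams in its MRO
    pass

def to_java_impl(estimator, param_maps):
    # Mirrors what CrossValidator._to_java_impl does for each param map.
    return [estimator._transfer_param_map_to_java(pm) for pm in param_maps]

grid = [{"regParam": 0.1}, {"regParam": 0.01}]
to_java_impl(LinearRegression(), grid)  # works

try:
    to_java_impl(Pipeline(), grid)
except AttributeError as e:
    print(e)  # 'Pipeline' object has no attribute '_transfer_param_map_to_java'
```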


[~weichenxu123] Are backports/fixes happening for older stable releases, or do 
we close this ticket?
 

> PySpark: error in serializing ML pipelines with training strategy and 
> pipeline as estimator
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32048
>                 URL: https://issues.apache.org/jira/browse/SPARK-32048
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 2.4.5
>            Reporter: Marcello Leida
>            Priority: Major
>
> Hi all,
> I get the following error when serializing a pipeline with a CrossValidator 
> and/or TrainValidationSplit training strategy and an estimator of type 
> Pipeline through PySpark:
> {code:java}
> AttributeError: 'Pipeline' object has no attribute 
> '_transfer_param_map_to_java'
> {code}
> In Scala the serialization works without problems, so I assume the issue 
> is in PySpark.
> When using LinearRegression as the estimator, serialization works 
> properly.
> I see that in the tests of CrossValidator and TrainValidationSplit, there is 
> no test with Pipeline as an estimator.
> I do not know if there is a workaround for this or another way to serialize 
> the pipeline, or if this is a known issue.
> Code for replicating the issue:
> {code:java}
> from pyspark.ml import Pipeline
> from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
> from pyspark.ml.evaluation import BinaryClassificationEvaluator
> from pyspark.ml.feature import HashingTF, Tokenizer
> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
> # Prepare training documents from a list of (id, text, label) tuples.
> df = spark.createDataFrame([
>     (0, "a b c d e spark", 1.0),
>     (1, "b d", 0.0),
>     (2, "spark f g h", 1.0),
>     (3, "hadoop mapreduce", 3.0)
> ], ["id", "text", "label"])
> # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
> lr = LogisticRegression(maxIter=10, regParam=0.001)
> tokenizer = Tokenizer(inputCol="text", outputCol="words")
> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=1000)
> #treeClassifier = DecisionTreeClassifier()
> sub_pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
> sub_pipeline2 = Pipeline(stages=[tokenizer, hashingTF])
> paramGrid = ParamGridBuilder() \
>     .addGrid(lr.regParam, [0.1, 0.01]) \
>     .build()
> pipeline_cv = CrossValidator(estimator=lr,
>                           estimatorParamMaps=paramGrid,
>                           evaluator=BinaryClassificationEvaluator(),
>                           numFolds=2)
> cvPath = "/tmp/cv"
> pipeline_cv.write().overwrite().save(cvPath)
> model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
> model.write().overwrite().save(cvPath)
> pipeline_cv2 = CrossValidator(estimator=sub_pipeline,
>                           estimatorParamMaps=paramGrid,
>                           evaluator=BinaryClassificationEvaluator(),
>                           numFolds=2)
> cvPath = "/tmp/cv2"
> model2 = pipeline_cv2.fit(df).bestModel
> model2.write().overwrite().save(cvPath)
> pipeline_cv2.write().overwrite().save(cvPath)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
