[ https://issues.apache.org/jira/browse/SPARK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252726#comment-17252726 ]
Nicholas Brett Marcott commented on SPARK-32048:
------------------------------------------------

I could not reproduce the reported AttributeError with the given code example using master at [this commit|https://github.com/apache/spark/tree/de234eec8febce99ede5ef9ae2301e36739a0f85]. It appears this was fixed in [this commit|https://github.com/WeichenXu123/spark/commit/58b0c797da95b0516285948d58e981a893b648f4] and backported to 3.0 in [this commit|https://github.com/apache/spark/commit/8acbe5b822b7b0ad49079aa223ad52afe70b5afa].

When run against the commit I referenced, the example code fails with a different error, because the DataFrame contains a label value of 3.0 (implying four label classes) while using BinaryClassificationEvaluator. Once I changed the label 3.0 to 0.0, the entire example code ran without error.

{code:java}
>>> model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
20/12/21 17:43:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
20/12/21 17:43:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/base.py", line 161, in fit
    return self._fit(dataset)
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 687, in _fit
    for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 687, in <lambda>
    for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/tuning.py", line 74, in singleTask
    metric = eva.evaluate(model.transform(validation, epm[index]))
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/evaluation.py", line 84, in evaluate
    return self._evaluate(dataset)
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/ml/evaluation.py", line 120, in _evaluate
    return self._java_obj.evaluate(dataset._jdf)
  File "/Users/nmarcott/projects/apache/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/Users/nmarcott/projects/apache/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 4
{code}

I did verify that the example code produces the error below when run on v2.4.7, the previous stable Spark release:

{code:java}
>>> pipeline_cv2.write().overwrite().save(cvPath)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 342, in write
    return JavaMLWriter(self)
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/util.py", line 176, in __init__
    _java_obj = instance._to_java()
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 376, in _to_java
    estimator, epms, evaluator = super(CrossValidator, self)._to_java_impl()
  File "/Users/nmarcott/Downloads/spark-2.4.7-bin-hadoop2.7/python/pyspark/ml/tuning.py", line 195, in _to_java_impl
    java_epms[idx] = self.getEstimator()._transfer_param_map_to_java(epm)
AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
{code}

Saving works with LinearRegression (and most other JavaParams-based estimators) since it inherits JavaParams._transfer_param_map_to_java. [~weichenxu123] Are backports/fixes happening for older stable releases, or do we close this ticket?
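The inheritance point above can be illustrated without a Spark installation. The sketch below uses stand-in classes that only mirror the shape of pyspark.ml's hierarchy (the class bodies and the helper's return value are placeholders, not the real py4j implementation): estimators backed by a JVM object inherit the `_transfer_param_map_to_java` helper from `JavaParams`, while `Pipeline` is a pure-Python composite that does not, so the unconditional call in `_to_java_impl` raises AttributeError.

{code:java}
# Minimal sketch, not the real pyspark.ml code: stand-in classes that
# mirror why saving works for LinearRegression but not for Pipeline.

class Params:
    """Analog of the pure-Python param base class Pipeline derives from."""

class JavaParams(Params):
    """Analog of the JVM-backed base class; provides the transfer helper."""
    def _transfer_param_map_to_java(self, epm):
        # Placeholder for the real py4j param-map transfer.
        return {"java_param_map": dict(epm)}

class LinearRegression(JavaParams):   # JVM-backed: inherits the helper
    pass

class Pipeline(Params):               # pure Python: no JavaParams in its MRO
    pass

def to_java_impl(estimator, epm):
    # Mirrors the failing line in tuning.py's _to_java_impl: the helper is
    # called unconditionally on whatever estimator was configured.
    return estimator._transfer_param_map_to_java(epm)

to_java_impl(LinearRegression(), {"regParam": 0.1})  # succeeds
try:
    to_java_impl(Pipeline(), {"regParam": 0.1})      # raises AttributeError
except AttributeError as e:
    print(e)
{code}

The fix linked above addresses this in pyspark rather than requiring every estimator to be JVM-backed.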
> PySpark: error in serializing ML pipelines with training strategy and
> pipeline as estimator
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32048
>                 URL: https://issues.apache.org/jira/browse/SPARK-32048
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 2.4.5
>            Reporter: Marcello Leida
>            Priority: Major
>
> Hi all,
> I get the following error when serializing a pipeline with a CrossValidator
> and/or TrainValidationSplit training strategy and an estimator of type
> Pipeline through PySpark:
> {code:java}
> AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
> {code}
> In Scala the serialization works without problems, so I assume the issue
> is in PySpark.
> When using LinearRegression as the estimator, serialization works properly.
> I see that the tests for CrossValidator and TrainValidationSplit do not
> include a test with Pipeline as the estimator.
> I do not know if there is a workaround for this or another way to serialize
> the pipeline, or if this is a known issue.
> Code for reproducing the issue:
> {code:java}
> from pyspark.ml import Pipeline
> from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
> from pyspark.ml.evaluation import BinaryClassificationEvaluator
> from pyspark.ml.feature import HashingTF, Tokenizer
> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
>
> # Prepare training documents from a list of (id, text, label) tuples.
> df = spark.createDataFrame([
>     (0, "a b c d e spark", 1.0),
>     (1, "b d", 0.0),
>     (2, "spark f g h", 1.0),
>     (3, "hadoop mapreduce", 3.0)
> ], ["id", "text", "label"])
>
> # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
> lr = LogisticRegression(maxIter=10, regParam=0.001)
> tokenizer = Tokenizer(inputCol="text", outputCol="words")
> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=1000)
> #treeClassifier = DecisionTreeClassifier()
> sub_pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
> sub_pipeline2 = Pipeline(stages=[tokenizer, hashingTF])
> paramGrid = ParamGridBuilder() \
>     .addGrid(lr.regParam, [0.1, 0.01]) \
>     .build()
> pipeline_cv = CrossValidator(estimator=lr,
>                              estimatorParamMaps=paramGrid,
>                              evaluator=BinaryClassificationEvaluator(),
>                              numFolds=2)
> cvPath = "/tmp/cv"
> pipeline_cv.write().overwrite().save(cvPath)
> model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
> model.write().overwrite().save(cvPath)
> pipeline_cv2 = CrossValidator(estimator=sub_pipeline,
>                               estimatorParamMaps=paramGrid,
>                               evaluator=BinaryClassificationEvaluator(),
>                               numFolds=2)
> cvPath = "/tmp/cv2"
> model2 = pipeline_cv2.fit(df).bestModel
> model2.write().overwrite().save(cvPath)
> pipeline_cv2.write().overwrite().save(cvPath)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org