Marcello Leida created SPARK-32048:
--------------------------------------

             Summary: PySpark: error in serializing ML pipelines with training strategy and pipeline as estimator
                 Key: SPARK-32048
                 URL: https://issues.apache.org/jira/browse/SPARK-32048
             Project: Spark
          Issue Type: Bug
          Components: ML, PySpark
    Affects Versions: 2.4.5
            Reporter: Marcello Leida
Hi all,

I get the following error when serializing, through PySpark, a CrossValidator and/or TrainValidationSplit with an estimator of type Pipeline:

{code:java}
AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
{code}

In Scala the serialization works without problems, so I assume the issue is in PySpark.

When using a plain estimator such as LinearRegression (or the LogisticRegression in the example below), serialization works properly.

I see that the tests for CrossValidator and TrainValidationSplit do not include a test with a Pipeline as the estimator.

I do not know if there is a workaround for this or another way to serialize the pipeline.

Code for replicating the issue:

{code:java}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

# Prepare training documents from a list of (id, text, label) tuples.
df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 3.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
lr = LogisticRegression(maxIter=10, regParam=0.001)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=1000)
#treeClassifier = DecisionTreeClassifier()

sub_pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
sub_pipeline2 = Pipeline(stages=[tokenizer, hashingTF])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# CrossValidator with a plain LogisticRegression estimator: serialization works.
pipeline_cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2)
cvPath = "/tmp/cv"
pipeline_cv.write().overwrite().save(cvPath)
model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
model.write().overwrite().save(cvPath)

# CrossValidator with a Pipeline as estimator: serialization fails with the AttributeError above.
pipeline_cv2 = CrossValidator(estimator=sub_pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2)
cvPath = "/tmp/cv2"
model2 = pipeline_cv2.fit(df).bestModel
model2.write().overwrite().save(cvPath)
pipeline_cv2.write().overwrite().save(cvPath)
{code}
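For reference, a possible workaround sketch (not verified on 2.4.5): keep the Pipeline out of the tuned estimator, i.e. fit the feature-only stages once, tune only the plain LogisticRegression on the transformed data, and persist the resulting PipelineModel and CrossValidatorModel separately. It reuses df, tokenizer, hashingTF, lr and paramGrid from the reproduction code above; the save paths are only illustrative.

{code:java}
# Workaround sketch (unverified): avoid a Pipeline estimator inside CrossValidator.
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel

# Fit the feature-only stages once, outside the tuning loop.
feature_pipeline = Pipeline(stages=[tokenizer, hashingTF])
feature_model = feature_pipeline.fit(df)
features_df = feature_model.transform(df)

# Tune only the plain estimator, not a Pipeline.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=2)
cv_model = cv.fit(features_df)

# Persist the two pieces separately (illustrative paths).
feature_model.write().overwrite().save("/tmp/feature_model")
cv_model.write().overwrite().save("/tmp/cv_model")

# Re-assemble at load time.
loaded_features = PipelineModel.load("/tmp/feature_model")
loaded_cv = CrossValidatorModel.load("/tmp/cv_model")
scored = loaded_cv.transform(loaded_features.transform(df))
{code}

This obviously loses the convenience of persisting a single tuned Pipeline, so a proper fix in pyspark.ml.tuning would still be needed.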