HyukjinKwon opened a new pull request, #39467:
URL: https://github.com/apache/spark/pull/39467
### What changes were proposed in this pull request?

After the MLflow 2.1 release (https://github.com/mlflow/mlflow/releases), the MLflow unit test in the pandas API on Spark is broken. It can be reproduced as follows (imports for `pandas`, `numpy` and `pyspark.pandas` added so the snippet is self-contained):

```python
import numpy as np
import pandas as pd
import pyspark.pandas as ps

from mlflow.tracking import MlflowClient, set_tracking_uri
from sklearn.linear_model import LinearRegression
import mlflow.sklearn
from tempfile import mkdtemp

d = mkdtemp("pandas_on_spark_mlflow")
set_tracking_uri("file:%s" % d)
client = MlflowClient()
exp_id = mlflow.create_experiment("my_experiment")
exp = mlflow.set_experiment("my_experiment")
train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8)**2, "y": np.log(2 + np.arange(8))})
train_x = train[["x1", "x2"]]
train_y = train[["y"]]
with mlflow.start_run():
    lr = LinearRegression()
    lr.fit(train_x, train_y)
    mlflow.sklearn.log_model(lr, "model")

from pyspark.pandas.mlflow import load_model
run_info = client.search_runs(exp_id)[-1].info
model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_id))
prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
prediction_df["prediction"] = model.predict(prediction_df)
print(prediction_df)
```

The failing example is the `load_model` doctest here:
https://github.com/apache/spark/blob/06ec98b0d6a51e0c3ffec70e78d86d577b0e7a72/python/pyspark/pandas/mlflow.py#L134-L202

It now fails as below:

```
File "/__w/spark/spark/python/pyspark/pandas/mlflow.py", line 172, in pyspark.pandas.mlflow.load_model
Failed example:
    prediction_df
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.9/doctest.py", line 1336, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.pandas.mlflow.load_model[18]>", line 1, in <module>
        prediction_df
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13322, in __repr__
        pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13313, in _get_or_create_repr_pandas_cache
        self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
      File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13308, in _to_internal_pandas
        return self._internal.to_pandas_frame
      File "/__w/spark/spark/python/pyspark/pandas/utils.py", line 588, in wrapped_lazy_property
        setattr(self, attr_name, fn(self))
      File "/__w/spark/spark/python/pyspark/pandas/internal.py", line 1056, in to_pandas_frame
        pdf = sdf.toPandas()
      File "/__w/spark/spark/python/pyspark/sql/pandas/conversion.py", line 208, in toPandas
        pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
      File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 1197, in collect
        sock_info = self._jdf.collectToPython()
      File "/__w/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
        return_value = get_return_value(
      File "/__w/spark/spark/python/pyspark/sql/utils.py", line 209, in deco
        raise converted from None
    pyspark.sql.utils.PythonException: An exception was thrown from the Python worker.
    Please see the stack trace below.

Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 829, in main
    process()
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 821, in process
    serializer.dump_stream(out_iter, outfile)
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 345, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 86, in dump_stream
    for batch in iterator:
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 338, in init_stream_yield_batches
    for series in iterator:
  File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 519, in func
    for result_batch, result_type in result_iter:
  File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1253, in udf
    yield _predict_row_batch(batch_predict_fn, row_batch_args)
  File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1057, in _predict_row_batch
    result = predict_fn(pdf)
  File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1237, in batch_predict_fn
    return loaded_model.predict(pdf)
  File "/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 413, in predict
    return self._predict_fn(data)
  File "/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 355, in predict
    return self._decision_function(X)
  File "/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 338, in _decision_function
    X = self._validate_data(X, accept_sparse=["csr", "csc", "coo"], reset=False)
  File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 518, in _validate_data
    self._check_feature_names(X, reset=reset)
  File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 451, in _check_feature_names
    raise ValueError(message)
ValueError: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- 0
- 1
Feature names seen at fit time, yet now missing:
- x1
- x2
```

https://github.com/apache/spark/actions/runs/3871715040/jobs/6600578830
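For context, the failure at the bottom of the stack is purely on the scikit-learn side and can be reproduced without Spark or MLflow at all. Below is a minimal sketch (the column names and values are illustrative, and it assumes a scikit-learn version that enforces feature-name consistency as an error rather than a warning):

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Fit with named feature columns, as the doctest above does.
train_x = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8) ** 2})
train_y = np.log(2 + np.arange(8))
lr = LinearRegression().fit(train_x, train_y)

# Predicting with matching column names works fine.
ok = pd.DataFrame({"x1": [2.0], "x2": [4.0]})
print(lr.predict(ok))

# Predicting with different (e.g. positional) column names raises the same
# ValueError as in the worker-side traceback above on recent scikit-learn.
bad = ok.rename(columns={"x1": "0", "x2": "1"})
lr.predict(bad)  # ValueError: The feature names should match those that were passed during fit.
```

In other words, the model is fitted on columns `x1`/`x2`, but by the time the batch reaches scikit-learn inside the MLflow pyfunc UDF, the pandas DataFrame only carries positional column names, which newer scikit-learn rejects.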
### Why are the changes needed?

To recover the broken test.

### Does this PR introduce _any_ user-facing change?

No, this is test-only (for now). It may look like a regression, but it comes from MLflow rather than Spark.

### How was this patch tested?

CI in this PR should test it out.
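As a side note (this is a hypothetical illustration, not taken from this PR), one generic way to keep a dependency-sensitive doctest green is to gate it on the installed version of that dependency and drop its examples when the version is newer than the one the doctest was written against:

```python
# Hypothetical sketch only: skip the version-sensitive doctest when the
# installed MLflow is too new (or missing) for the written examples.
import doctest


def _run_mlflow_doctests() -> None:
    try:
        import mlflow
        from packaging.version import Version

        too_new = Version(mlflow.__version__) >= Version("2.1.0")
    except ImportError:
        too_new = True  # no MLflow installed: nothing to test

    import pyspark.pandas.mlflow as mod

    if too_new:
        # With the docstring removed, doctest.testmod() finds no examples
        # to run for this function.
        mod.load_model.__doc__ = None

    doctest.testmod(mod, verbose=True)
```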