HyukjinKwon opened a new pull request, #39467:
URL: https://github.com/apache/spark/pull/39467

   ### What changes were proposed in this pull request?
   
   After the MLflow 2.1 release (https://github.com/mlflow/mlflow/releases), the MLflow doctest in pandas API on Spark is broken. The failure can be reproduced as follows:
   
   ```python
   import numpy as np
   import pandas as pd
   import pyspark.pandas as ps
   from mlflow.tracking import MlflowClient, set_tracking_uri
   from sklearn.linear_model import LinearRegression
   import mlflow.sklearn
   from tempfile import mkdtemp
   d = mkdtemp("pandas_on_spark_mlflow")
   set_tracking_uri("file:%s"%d)
   client = MlflowClient()
   exp_id = mlflow.create_experiment("my_experiment")
   exp = mlflow.set_experiment("my_experiment")
   train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8)**2,
                         "y": np.log(2 + np.arange(8))})
   train_x = train[["x1", "x2"]]
   train_y = train[["y"]]
   with mlflow.start_run():
       lr = LinearRegression()
       lr.fit(train_x, train_y)
       mlflow.sklearn.log_model(lr, "model")
   from pyspark.pandas.mlflow import load_model
   run_info = client.search_runs(exp_id)[-1].info
   model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_id))
   prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
   prediction_df["prediction"] = model.predict(prediction_df)
   print(prediction_df)
   ```
   
   
https://github.com/apache/spark/blob/06ec98b0d6a51e0c3ffec70e78d86d577b0e7a72/python/pyspark/pandas/mlflow.py#L134-L202
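
   For context, `load_model` here exposes the logged model through `mlflow.pyfunc.spark_udf`, and `predict` on a pandas-on-Spark frame applies that UDF to the Spark columns backing the frame, so each batch is rebuilt as a pandas DataFrame inside the Python worker before it reaches the scikit-learn model. The snippet below is only an illustrative sketch of that path, not the actual implementation in `mlflow.py`; the `model_udf`/`sdf`/`psdf` names and the `runs:/<run_id>/model` URI are placeholders. With MLflow 2.1 the rebuilt batch apparently arrives with positional column names instead of `x1`/`x2`, which trips scikit-learn's feature-name check shown in the traceback below.
   
   ```python
   # Illustrative sketch only; the real wiring lives in python/pyspark/pandas/mlflow.py
   # (linked above). Names other than mlflow.pyfunc.spark_udf are placeholders.
   import mlflow.pyfunc
   import pyspark.pandas as ps
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder.getOrCreate()
   
   # load_model() exposes the logged pyfunc model as a Spark UDF.
   model_udf = mlflow.pyfunc.spark_udf(
       spark, model_uri="runs:/<run_id>/model", result_type="double"
   )
   
   # predict() applies that UDF to the Spark columns backing the pandas-on-Spark
   # frame; each batch is reconstructed as a pandas DataFrame in the Python worker.
   psdf = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
   sdf = psdf.to_spark()
   sdf = sdf.withColumn("prediction", model_udf(sdf["x1"], sdf["x2"]))
   ```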
   
   ```
   File "/__w/spark/spark/python/pyspark/pandas/mlflow.py", line 172, in 
pyspark.pandas.mlflow.load_model
   Failed example:
       prediction_df
   Exception raised:
       Traceback (most recent call last):
         File "/usr/lib/python3.9/doctest.py", line 1336, in __run
           exec(compile(example.source, filename, "single",
         File "<doctest pyspark.pandas.mlflow.load_model[18]>", line 1, in 
<module>
           prediction_df
         File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13322, in 
__repr__
           pdf = cast("DataFrame", 
self._get_or_create_repr_pandas_cache(max_display_count))
         File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13313, in 
_get_or_create_repr_pandas_cache
           self, "_repr_pandas_cache", {n: self.head(n + 
1)._to_internal_pandas()}
         File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13308, in 
_to_internal_pandas
           return self._internal.to_pandas_frame
         File "/__w/spark/spark/python/pyspark/pandas/utils.py", line 588, in 
wrapped_lazy_property
           setattr(self, attr_name, fn(self))
         File "/__w/spark/spark/python/pyspark/pandas/internal.py", line 1056, 
in to_pandas_frame
           pdf = sdf.toPandas()
         File "/__w/spark/spark/python/pyspark/sql/pandas/conversion.py", line 
208, in toPandas
           pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
         File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 1197, in 
collect
           sock_info = self._jdf.collectToPython()
         File 
"/__w/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 
1322, in __call__
           return_value = get_return_value(
         File "/__w/spark/spark/python/pyspark/sql/utils.py", line 209, in deco
           raise converted from None
       pyspark.sql.utils.PythonException: 
         An exception was thrown from the Python worker. Please see the stack 
trace below.
       Traceback (most recent call last):
         File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
829, in main
           process()
         File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
821, in process
           serializer.dump_stream(out_iter, outfile)
         File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 345, in dump_stream
           return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
         File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 86, in dump_stream
           for batch in iterator:
         File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 338, in init_stream_yield_batches
           for series in iterator:
         File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
519, in func
           for result_batch, result_type in result_iter:
         File 
"/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1253, 
in udf
           yield _predict_row_batch(batch_predict_fn, row_batch_args)
         File 
"/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1057, 
in _predict_row_batch
           result = predict_fn(pdf)
         File 
"/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 1237, 
in batch_predict_fn
           return loaded_model.predict(pdf)
         File 
"/usr/local/lib/python3.9/dist-packages/mlflow/pyfunc/__init__.py", line 413, 
in predict
           return self._predict_fn(data)
         File 
"/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 
355, in predict
           return self._decision_function(X)
         File 
"/usr/local/lib/python3.9/dist-packages/sklearn/linear_model/_base.py", line 
338, in _decision_function
           X = self._validate_data(X, accept_sparse=["csr", "csc", "coo"], 
reset=False)
         File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 
518, in _validate_data
           self._check_feature_names(X, reset=reset)
         File "/usr/local/lib/python3.9/dist-packages/sklearn/base.py", line 
451, in _check_feature_names
           raise ValueError(message)
       ValueError: The feature names should match those that were passed during 
fit.
       Feature names unseen at fit time:
       - 0
       - 1
       Feature names seen at fit time, yet now missing:
       - x1
       - x2
   ```
   
   https://github.com/apache/spark/actions/runs/3871715040/jobs/6600578830
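
   For reference, the same `ValueError` can be reproduced without Spark or MLflow at all. This is only a minimal sketch assuming scikit-learn >= 1.2 (where mismatched DataFrame column names at predict time raise an error rather than a warning), mirroring what the model sees when the batch arrives with positional column names:
   
   ```python
   # Minimal, Spark-free sketch of the same failure mode (assumes scikit-learn >= 1.2,
   # which raises ValueError on mismatched feature names at predict time).
   import numpy as np
   import pandas as pd
   from sklearn.linear_model import LinearRegression
   
   train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8) ** 2,
                         "y": np.log(2 + np.arange(8))})
   lr = LinearRegression().fit(train[["x1", "x2"]], train[["y"]])
   
   # Predicting on a batch whose columns are named "0"/"1" instead of "x1"/"x2"
   # triggers the same "feature names should match" ValueError as in the log above.
   batch = pd.DataFrame(train[["x1", "x2"]].to_numpy(), columns=["0", "1"])
   lr.predict(batch)  # ValueError: The feature names should match ...
   ```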
   
   ### Why are the changes needed?
   
   To recover the broken test.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, this change is test-only (for now).
   The underlying behavior change may be a regression, but it comes from MLflow, not Spark.
   
   ### How was this patch tested?
   
   CI in this PR should test it out.

