This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 55774c6cf20 [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one 55774c6cf20 is described below commit 55774c6cf209e1b4b32af4d33b44a4f54fbcb6be Author: Lee Yang <leewy...@gmail.com> AuthorDate: Fri Apr 28 08:37:37 2023 +0800 [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one ### What changes were proposed in this pull request? This is a followup to #39817 to handle another error condition when the input batch is a single scalar value (where the previous fix focused on a single scalar value output). ### Why are the changes needed? Using `predict_batch_udf` fails when the input batch size is one. ``` import numpy as np from pyspark.ml.functions import predict_batch_udf from pyspark.sql.types import DoubleType df = spark.createDataFrame([[1.0],[2.0]], schema=["a"]) def make_predict_fn(): def predict(inputs): return inputs return predict identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), batch_size=1) preds = df.withColumn("preds", identity("a")).show() ``` fails with: ``` File "/.../spark/python/pyspark/worker.py", line 869, in main process() File "/.../spark/python/pyspark/worker.py", line 861, in process serializer.dump_stream(out_iter, outfile) File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in dump_stream return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream) File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in dump_stream for batch in iterator: File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in init_stream_yield_batches for series in iterator: File "/.../spark/python/pyspark/worker.py", line 555, in func for result_batch, result_type in result_iter: File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict yield _validate_and_transform_prediction_result( File "/.../spark/python/pyspark/ml/functions.py", line 339, in _validate_and_transform_prediction_result if len(preds_array) != num_input_rows: TypeError: len() of unsized object ``` After the fix: ``` +---+-----+ | a|preds| +---+-----+ |1.0| 1.0| |2.0| 2.0| +---+-----+ ``` ### Does this PR introduce _any_ user-facing change? This fixes a bug in the feature that was released in Spark 3.4.0. ### How was this patch tested? Unit test was added. Closes #40967 from leewyang/SPARK-43298. Authored-by: Lee Yang <leewy...@gmail.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/ml/functions.py | 5 +++-- python/pyspark/ml/tests/test_functions.py | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py index 4ad239cb5f0..bce4101df1e 100644 --- a/python/pyspark/ml/functions.py +++ b/python/pyspark/ml/functions.py @@ -236,7 +236,8 @@ def _validate_and_transform_single_input( # scalar columns if len(batch.columns) == 1: # single scalar column, remove extra dim - single_input = np.squeeze(batch.to_numpy()) + np_batch = batch.to_numpy() + single_input = np.squeeze(np_batch, -1) if len(np_batch.shape) > 1 else np_batch if input_shapes and input_shapes[0] not in [None, [], [1]]: raise ValueError("Invalid input_tensor_shape for scalar column.") elif not has_tuple: @@ -344,7 +345,7 @@ def _validate_and_transform_prediction_result( ): raise ValueError("Invalid shape for scalar prediction result.") - output = np.squeeze(preds) # type: ignore[arg-type] + output = np.squeeze(preds_array, -1) if len(preds_array.shape) > 1 else preds_array return pd.Series(output).astype(output.dtype) else: raise ValueError("Unsupported return type") diff --git a/python/pyspark/ml/tests/test_functions.py b/python/pyspark/ml/tests/test_functions.py index 6c2268b0968..894db2f8a7d 100644 --- a/python/pyspark/ml/tests/test_functions.py +++ b/python/pyspark/ml/tests/test_functions.py @@ -72,6 +72,11 @@ class PredictBatchUDFTests(SparkSessionTestCase): with self.assertRaisesRegex(Exception, "Multiple input columns found, but model expected"): preds = self.df.withColumn("preds", identity("a", "b")).toPandas() + # batch_size 1 + identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), batch_size=1) + preds = self.df.withColumn("preds", identity("a")).toPandas() + self.assertTrue(preds["a"].equals(preds["preds"])) + def test_identity_multi(self): # single input model def make_predict_fn(): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org