This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55774c6cf20 [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar 
input fails with batch size of one
55774c6cf20 is described below

commit 55774c6cf209e1b4b32af4d33b44a4f54fbcb6be
Author: Lee Yang <leewy...@gmail.com>
AuthorDate: Fri Apr 28 08:37:37 2023 +0800

    [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with 
batch size of one
    
    ### What changes were proposed in this pull request?
    
    This is a followup to #39817 to handle another error condition when the 
input batch is a single scalar value (where the previous fix focused on a 
single scalar value output).
    
    ### Why are the changes needed?
    Using `predict_batch_udf` fails when the input batch size is one.
    ```
    import numpy as np
    from pyspark.ml.functions import predict_batch_udf
    from pyspark.sql.types import DoubleType
    
    df = spark.createDataFrame([[1.0],[2.0]], schema=["a"])
    
    def make_predict_fn():
        def predict(inputs):
            return inputs
    
        return predict
    
    identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), 
batch_size=1)
    preds = df.withColumn("preds", identity("a")).show()
    ```
    fails with:
    ```
      File "/.../spark/python/pyspark/worker.py", line 869, in main
        process()
      File "/.../spark/python/pyspark/worker.py", line 861, in process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in 
dump_stream
        return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
      File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in 
dump_stream
        for batch in iterator:
      File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in 
init_stream_yield_batches
        for series in iterator:
      File "/.../spark/python/pyspark/worker.py", line 555, in func
        for result_batch, result_type in result_iter:
      File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict
        yield _validate_and_transform_prediction_result(
      File "/.../spark/python/pyspark/ml/functions.py", line 339, in 
_validate_and_transform_prediction_result
        if len(preds_array) != num_input_rows:
    TypeError: len() of unsized object
    ```
    
    After the fix:
    ```
    +---+-----+
    |  a|preds|
    +---+-----+
    |1.0|  1.0|
    |2.0|  2.0|
    +---+-----+
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    This fixes a bug in the feature that was released in Spark 3.4.0.
    
    ### How was this patch tested?
    Unit test was added.
    
    Closes #40967 from leewyang/SPARK-43298.
    
    Authored-by: Lee Yang <leewy...@gmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/ml/functions.py            | 5 +++--
 python/pyspark/ml/tests/test_functions.py | 5 +++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index 4ad239cb5f0..bce4101df1e 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -236,7 +236,8 @@ def _validate_and_transform_single_input(
         # scalar columns
         if len(batch.columns) == 1:
             # single scalar column, remove extra dim
-            single_input = np.squeeze(batch.to_numpy())
+            np_batch = batch.to_numpy()
+            single_input = np.squeeze(np_batch, -1) if len(np_batch.shape) > 1 
else np_batch
             if input_shapes and input_shapes[0] not in [None, [], [1]]:
                 raise ValueError("Invalid input_tensor_shape for scalar 
column.")
         elif not has_tuple:
@@ -344,7 +345,7 @@ def _validate_and_transform_prediction_result(
         ):
             raise ValueError("Invalid shape for scalar prediction result.")
 
-        output = np.squeeze(preds)  # type: ignore[arg-type]
+        output = np.squeeze(preds_array, -1) if len(preds_array.shape) > 1 
else preds_array
         return pd.Series(output).astype(output.dtype)
     else:
         raise ValueError("Unsupported return type")
diff --git a/python/pyspark/ml/tests/test_functions.py 
b/python/pyspark/ml/tests/test_functions.py
index 6c2268b0968..894db2f8a7d 100644
--- a/python/pyspark/ml/tests/test_functions.py
+++ b/python/pyspark/ml/tests/test_functions.py
@@ -72,6 +72,11 @@ class PredictBatchUDFTests(SparkSessionTestCase):
         with self.assertRaisesRegex(Exception, "Multiple input columns found, 
but model expected"):
             preds = self.df.withColumn("preds", identity("a", "b")).toPandas()
 
+        # batch_size 1
+        identity = predict_batch_udf(make_predict_fn, 
return_type=DoubleType(), batch_size=1)
+        preds = self.df.withColumn("preds", identity("a")).toPandas()
+        self.assertTrue(preds["a"].equals(preds["preds"]))
+
     def test_identity_multi(self):
         # single input model
         def make_predict_fn():


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to