This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb762dc0328 [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for 
spark.sql.execution.arrow.maxRecordsPerBatch
3bb762dc0328 is described below

commit 3bb762dc032866cfb304019cba6db01125556c2f
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Feb 16 12:41:19 2024 +0900

    [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for 
spark.sql.execution.arrow.maxRecordsPerBatch
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the regression introduced by 
https://github.com/apache/spark/pull/36683.
    
    ```python
    import pandas as pd
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0)
    spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False)
    spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()
    
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1)
    spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()
    ```
    
    **Before**
    
    ```
    /.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: 
createDataFrame attempted Arrow optimization because 
'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the 
error below and will not continue because automatic fallback with 
'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false.
      range() arg 3 must not be zero
      warn(msg)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 1483, in 
createDataFrame
        return super(SparkSession, self).createDataFrame(  # type: 
ignore[call-overload]
      File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in 
createDataFrame
        return self._create_from_pandas_with_arrow(data, schema, timezone)
      File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in 
_create_from_pandas_with_arrow
        pdf_slices = (pdf.iloc[start : start + step] for start in range(0, 
len(pdf), step))
    ValueError: range() arg 3 must not be zero
    ```
    ```
    Empty DataFrame
    Columns: [a]
    Index: []
    ```
    
    **After**
    
    ```
         a
    0  123
    ```
    
    ```
         a
    0  123
    ```
    
    ### Why are the changes needed?
    
    It fixes a regerssion. This is a documented behaviour. It should be 
backported to branch-3.4 and branch-3.5.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a regression as described above.
    
    ### How was this patch tested?
    
    Unittest was added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45132 from HyukjinKwon/SPARK-47068.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/conversion.py |  1 +
 python/pyspark/sql/tests/test_arrow.py  | 10 ++++++++++
 2 files changed, 11 insertions(+)

diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index 5288f0e100bb..d958b95795b7 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -630,6 +630,7 @@ class SparkConversionMixin:
 
         # Slice the DataFrame to be batched
         step = self._jconf.arrowMaxRecordsPerBatch()
+        step = step if step > 0 else len(pdf)
         pdf_slices = (pdf.iloc[start : start + step] for start in range(0, 
len(pdf), step))
 
         # Create list of Arrow (columns, arrow_type, spark_type) for 
serializer dump_stream
diff --git a/python/pyspark/sql/tests/test_arrow.py 
b/python/pyspark/sql/tests/test_arrow.py
index fc979c9e8b78..c771e5db65e5 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -1144,6 +1144,16 @@ class ArrowTestsMixin:
             df = self.spark.createDataFrame([MyInheritedTuple(1, 2, 
MyInheritedTuple(1, 2, 3))])
             self.assertEqual(df.first(), Row(a=1, b=2, c=Row(a=1, b=2, c=3)))
 
+    def test_negative_and_zero_batch_size(self):
+        # SPARK-47068: Negative and zero value should work as unlimited batch 
size.
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 
0}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())
+
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 
-1}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to