This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b4e28df675fa [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch
b4e28df675fa is described below

commit b4e28df675fa3c55487df6d3fb6f8a068f38748b
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Feb 16 12:41:19 2024 +0900

    [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch
    
    This PR fixes the regression introduced by https://github.com/apache/spark/pull/36683.
    
    ```python
    import pandas as pd
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0)
    spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False)
    spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()
    
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1)
    spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas()
    ```
    
    **Before**
    
    ```
    /.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and will not continue because automatic fallback with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false.
      range() arg 3 must not be zero
      warn(msg)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 1483, in 
createDataFrame
        return super(SparkSession, self).createDataFrame(  # type: 
ignore[call-overload]
      File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in 
createDataFrame
        return self._create_from_pandas_with_arrow(data, schema, timezone)
      File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in 
_create_from_pandas_with_arrow
        pdf_slices = (pdf.iloc[start : start + step] for start in range(0, 
len(pdf), step))
    ValueError: range() arg 3 must not be zero
    ```
    ```
    Empty DataFrame
    Columns: [a]
    Index: []
    ```
    
    **After**
    
    ```
         a
    0  123
    ```
    
    ```
         a
    0  123
    ```
    
    It fixes a regression; treating 0 and negative values of spark.sql.execution.arrow.maxRecordsPerBatch as an unlimited batch size is the documented behaviour. It should be backported to branch-3.4 and branch-3.5.
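
    As a minimal, standalone sketch of the patched slicing logic (assuming a plain pandas DataFrame; `slice_for_arrow` is an illustrative helper, not Spark API), a non-positive `spark.sql.execution.arrow.maxRecordsPerBatch` now yields a single slice covering the whole DataFrame instead of passing a zero or negative step to `range()`:

    ```python
    import pandas as pd

    def slice_for_arrow(pdf: pd.DataFrame, max_records_per_batch: int):
        # Mirrors the guard added in conversion.py: a non-positive value
        # means "unlimited", so the whole DataFrame becomes one slice.
        step = max_records_per_batch if max_records_per_batch > 0 else len(pdf)
        return [pdf.iloc[start : start + step] for start in range(0, len(pdf), step)]

    pdf = pd.DataFrame({"a": [1, 2, 3]})
    print(len(slice_for_arrow(pdf, 2)))   # 2 slices: rows [0, 1] and row [2]
    print(len(slice_for_arrow(pdf, 0)))   # 1 slice covering the whole DataFrame
    print(len(slice_for_arrow(pdf, -1)))  # 1 slice covering the whole DataFrame
    ```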
    
    Yes, it fixes a regression as described above.
    
    A unit test was added.
    
    No.
    
    Closes #45132 from HyukjinKwon/SPARK-47068.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 3bb762dc032866cfb304019cba6db01125556c2f)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/conversion.py |  1 +
 python/pyspark/sql/tests/test_arrow.py  | 10 ++++++++++
 2 files changed, 11 insertions(+)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index a5f0664ed75d..e4c4af709fa8 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -600,6 +600,7 @@ class SparkConversionMixin:
 
         # Slice the DataFrame to be batched
         step = self._jconf.arrowMaxRecordsPerBatch()
+        step = step if step > 0 else len(pdf)
         pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step))
 
         # Create list of Arrow (columns, type) for serializer dump_stream
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 7fded1cbefdc..de832ce9273e 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -831,6 +831,16 @@ class ArrowTestsMixin:
         self.assertEqual([Row(c1=1, c2="string")], df.collect())
         self.assertGreater(self.spark.sparkContext.defaultParallelism, len(pdf))
 
+    def test_negative_and_zero_batch_size(self):
+        # SPARK-47068: Negative and zero value should work as unlimited batch size.
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 0}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())
+
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": -1}):
+            pdf = pd.DataFrame({"a": [123]})
+            assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas())
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
