This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3bb762dc0328 [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch 3bb762dc0328 is described below commit 3bb762dc032866cfb304019cba6db01125556c2f Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Feb 16 12:41:19 2024 +0900 [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch ### What changes were proposed in this pull request? This PR fixes the regression introduced by https://github.com/apache/spark/pull/36683. ```python import pandas as pd spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0) spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False) spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1) spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() ``` **Before** ``` /.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and will not continue because automatic fallback with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false. 
range() arg 3 must not be zero warn(msg) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 1483, in createDataFrame return super(SparkSession, self).createDataFrame( # type: ignore[call-overload] File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in createDataFrame return self._create_from_pandas_with_arrow(data, schema, timezone) File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in _create_from_pandas_with_arrow pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) ValueError: range() arg 3 must not be zero ``` ``` Empty DataFrame Columns: [a] Index: [] ``` **After** ``` a 0 123 ``` ``` a 0 123 ``` ### Why are the changes needed? It fixes a regression. This is a documented behaviour. It should be backported to branch-3.4 and branch-3.5. ### Does this PR introduce _any_ user-facing change? Yes, it fixes a regression as described above. ### How was this patch tested? Unittest was added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45132 from HyukjinKwon/SPARK-47068.
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/pandas/conversion.py | 1 + python/pyspark/sql/tests/test_arrow.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 5288f0e100bb..d958b95795b7 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -630,6 +630,7 @@ class SparkConversionMixin: # Slice the DataFrame to be batched step = self._jconf.arrowMaxRecordsPerBatch() + step = step if step > 0 else len(pdf) pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) # Create list of Arrow (columns, arrow_type, spark_type) for serializer dump_stream diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index fc979c9e8b78..c771e5db65e5 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -1144,6 +1144,16 @@ class ArrowTestsMixin: df = self.spark.createDataFrame([MyInheritedTuple(1, 2, MyInheritedTuple(1, 2, 3))]) self.assertEqual(df.first(), Row(a=1, b=2, c=Row(a=1, b=2, c=3))) + def test_negative_and_zero_batch_size(self): + # SPARK-47068: Negative and zero value should work as unlimited batch size. + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 0}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": -1}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + @unittest.skipIf( not have_pandas or not have_pyarrow, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org