This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new b4e28df675fa [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch b4e28df675fa is described below commit b4e28df675fa3c55487df6d3fb6f8a068f38748b Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Feb 16 12:41:19 2024 +0900 [SPARK-47068][PYTHON][TESTS] Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch This PR fixes the regression introduced by https://github.com/apache/spark/pull/36683. ```python import pandas as pd spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0) spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False) spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1) spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() ``` **Before** ``` /.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and will not continue because automatic fallback with 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false. 
range() arg 3 must not be zero warn(msg) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 1483, in createDataFrame return super(SparkSession, self).createDataFrame( # type: ignore[call-overload] File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in createDataFrame return self._create_from_pandas_with_arrow(data, schema, timezone) File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in _create_from_pandas_with_arrow pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) ValueError: range() arg 3 must not be zero ``` ``` Empty DataFrame Columns: [a] Index: [] ``` **After** ``` a 0 123 ``` ``` a 0 123 ``` It fixes a regression. This is a documented behaviour. It should be backported to branch-3.4 and branch-3.5. Yes, it fixes a regression as described above. Unittest was added. No. Closes #45132 from HyukjinKwon/SPARK-47068. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 3bb762dc032866cfb304019cba6db01125556c2f) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/pandas/conversion.py | 1 + python/pyspark/sql/tests/test_arrow.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index a5f0664ed75d..e4c4af709fa8 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -600,6 +600,7 @@ class SparkConversionMixin: # Slice the DataFrame to be batched step = self._jconf.arrowMaxRecordsPerBatch() + step = step if step > 0 else len(pdf) pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) # Create list of Arrow (columns, type) for serializer dump_stream diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 
7fded1cbefdc..de832ce9273e 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -831,6 +831,16 @@ class ArrowTestsMixin: self.assertEqual([Row(c1=1, c2="string")], df.collect()) self.assertGreater(self.spark.sparkContext.defaultParallelism, len(pdf)) + def test_negative_and_zero_batch_size(self): + # SPARK-47068: Negative and zero value should work as unlimited batch size. + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 0}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": -1}): + pdf = pd.DataFrame({"a": [123]}) + assert_frame_equal(pdf, self.spark.createDataFrame(pdf).toPandas()) + @unittest.skipIf( not have_pandas or not have_pyarrow, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org