This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new dc5fffc4b58 [SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`
dc5fffc4b58 is described below

commit dc5fffc4b5833dbc0414d66994348ed56d155bbd
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Tue Jul 25 09:43:52 2023 +0900

    [SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`

    ### What changes were proposed in this pull request?
    Implement the Arrow `self_destruct` option for `toPandas` to save memory.

    The Spark configuration `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` can now be used to enable PyArrow's `self_destruct` feature in Spark Connect. It saves memory when creating a Pandas DataFrame via `toPandas` by freeing Arrow-allocated memory while the Pandas DataFrame is being built.

    ### Why are the changes needed?
    To reach parity with vanilla PySpark. This PR mirrors https://github.com/apache/spark/pull/29818 for Spark Connect.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Unit test.

    Closes #42079 from xinrong-meng/self_destruct.

    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 78b3345fa17cf54ecaaab7da27125a08233c9e94)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
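For context, a minimal usage sketch of the new behavior follows (illustrative only, not part of
the patch; the `sc://localhost` address and the toy DataFrame are assumptions for the example):

    from pyspark.sql import SparkSession

    # Spark Connect session; "sc://localhost" is an assumed server address for this sketch.
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    # Opt in to PyArrow's self_destruct path for toPandas.
    spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

    # Arrow-allocated memory is now freed column by column while the Pandas DataFrame
    # is built, lowering the peak memory of the conversion.
    pdf = spark.range(1_000_000).selectExpr("id", "id * 2 AS doubled").toPandas()
    print(pdf.head())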
---
 python/pyspark/sql/connect/client/core.py          | 47 ++++++++++++++++++++--
 .../pyspark/sql/tests/connect/test_parity_arrow.py | 15 +++++--
 2 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 00f2a85d602..56236892122 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -757,14 +757,33 @@ class SparkConnectClient(object):
         logger.info(f"Executing plan {self._proto_to_string(plan)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
-        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
+        (self_destruct_conf,) = self.get_config_with_defaults(
+            ("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "false"),
+        )
+        self_destruct = cast(str, self_destruct_conf).lower() == "true"
+        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
+            req, self_destruct=self_destruct
+        )
         assert table is not None

         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)

         # Rename columns to avoid duplicated column names.
-        pdf = table.rename_columns([f"col_{i}" for i in range(table.num_columns)]).to_pandas()
+        renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+        if self_destruct:
+            # Configure PyArrow to use as little memory as possible:
+            # self_destruct - free columns as they are converted
+            # split_blocks - create a separate Pandas block for each column
+            # use_threads - convert one column at a time
+            pandas_options = {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+            pdf = renamed_table.to_pandas(**pandas_options)
+        else:
+            pdf = renamed_table.to_pandas()
         pdf.columns = schema.names

         if len(pdf.columns) > 0:
@@ -1108,7 +1127,7 @@ class SparkConnectClient(object):
             self._handle_error(error)

     def _execute_and_fetch(
-        self, req: pb2.ExecutePlanRequest
+        self, req: pb2.ExecutePlanRequest, self_destruct: bool = False
     ) -> Tuple[
         Optional["pa.Table"],
         Optional[StructType],
@@ -1144,7 +1163,27 @@ class SparkConnectClient(object):
             )

         if len(batches) > 0:
-            table = pa.Table.from_batches(batches=batches)
+            if self_destruct:
+                results = []
+                for batch in batches:
+                    # self_destruct frees memory column-wise, but Arrow record batches are
+                    # oriented row-wise, so copies each column into its own allocation
+                    batch = pa.RecordBatch.from_arrays(
+                        [
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch
+                        ],
+                        schema=batch.schema,
+                    )
+                    results.append(batch)
+                table = pa.Table.from_batches(batches=results)
+                # Ensure only the table has a reference to the batches, so that
+                # self_destruct (if enabled) is effective
+                del results
+                del batches
+            else:
+                table = pa.Table.from_batches(batches=batches)
             return table, schema, metrics, observed_metrics, properties
         else:
             return None, schema, metrics, observed_metrics, properties
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index e491305e867..5f76cafb192 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -19,11 +19,13 @@ import unittest
 from distutils.version import LooseVersion

 import pandas as pd
+
 from pyspark.sql.tests.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils


-class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTestUtils):
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_createDataFrame_empty_partition(self):
         super().test_createDataFrame_empty_partition()
@@ -56,9 +58,16 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_no_partition_toPandas(self):
         super().test_no_partition_toPandas()

-    @unittest.skip("The test uses internal APIs.")
     def test_pandas_self_destruct(self):
-        super().test_pandas_self_destruct()
+        df = self.spark.range(100).select("id", "id", "id")
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+
+        self.assert_eq(self_destruct_pdf, no_self_destruct_pdf)

     def test_propagates_spark_exception(self):
         self.check_propagates_spark_exception()
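As background for the `_execute_and_fetch` change above, a small standalone PyArrow sketch of
the same idea (toy data only; nothing below is Spark code): result batches are row-oriented,
so each column is first copied into its own allocation with `pa.concat_arrays`, which lets
`to_pandas(self_destruct=True)` release columns one at a time.

    import pyarrow as pa

    # A toy record batch standing in for one result chunk from the server.
    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
        names=["id", "name"],
    )

    # Copy every column into its own buffer; pa.concat_arrays over a one-element list
    # reallocates the array instead of sharing the batch-wide allocation.
    rebuilt = pa.RecordBatch.from_arrays(
        [pa.concat_arrays([column]) for column in batch],
        schema=batch.schema,
    )

    table = pa.Table.from_batches([rebuilt])
    del rebuilt, batch  # the table must hold the only reference for self_destruct to help

    # Each Arrow column is freed as soon as it has been converted to a Pandas block.
    pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
    print(pdf)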