This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new dc5fffc4b58 [SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`
dc5fffc4b58 is described below

commit dc5fffc4b5833dbc0414d66994348ed56d155bbd
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Tue Jul 25 09:43:52 2023 +0900

    [SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`
    
    ### What changes were proposed in this pull request?
    Implement the PyArrow `self_destruct` feature for `toPandas` to save memory.
    
    Now the Spark configuration `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` can be used to enable PyArrow's `self_destruct` feature in Spark Connect, which can save memory when creating a Pandas DataFrame via `toPandas` by freeing Arrow-allocated memory while building the Pandas DataFrame.
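
    A minimal usage sketch of the new configuration (the remote address below is a placeholder, not part of this change): once it is set on a Spark Connect session, `toPandas` picks it up transparently.

    ```python
    from pyspark.sql import SparkSession

    # Connect to a Spark Connect server; the address is illustrative.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # Enable PyArrow's self_destruct path for Arrow-to-pandas conversion.
    spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

    # Arrow buffers are released column by column while this pandas DataFrame is built.
    pdf = spark.range(1_000_000).toPandas()
    ```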
    
    ### Why are the changes needed?
    Reach parity with vanilla PySpark. The PR is a mirror of https://github.com/apache/spark/pull/29818 for Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test.
    
    Closes #42079 from xinrong-meng/self_destruct.
    
    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 78b3345fa17cf54ecaaab7da27125a08233c9e94)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/core.py          | 47 ++++++++++++++++++++--
 .../pyspark/sql/tests/connect/test_parity_arrow.py | 15 +++++--
 2 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 00f2a85d602..56236892122 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -757,14 +757,33 @@ class SparkConnectClient(object):
         logger.info(f"Executing plan {self._proto_to_string(plan)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
-        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
+        (self_destruct_conf,) = self.get_config_with_defaults(
+            ("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", 
"false"),
+        )
+        self_destruct = cast(str, self_destruct_conf).lower() == "true"
+        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
+            req, self_destruct=self_destruct
+        )
         assert table is not None
 
         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)
 
         # Rename columns to avoid duplicated column names.
-        pdf = table.rename_columns([f"col_{i}" for i in range(table.num_columns)]).to_pandas()
+        renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+        if self_destruct:
+            # Configure PyArrow to use as little memory as possible:
+            # self_destruct - free columns as they are converted
+            # split_blocks - create a separate Pandas block for each column
+            # use_threads - convert one column at a time
+            pandas_options = {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+            pdf = renamed_table.to_pandas(**pandas_options)
+        else:
+            pdf = renamed_table.to_pandas()
         pdf.columns = schema.names
 
         if len(pdf.columns) > 0:
@@ -1108,7 +1127,7 @@ class SparkConnectClient(object):
             self._handle_error(error)
 
     def _execute_and_fetch(
-        self, req: pb2.ExecutePlanRequest
+        self, req: pb2.ExecutePlanRequest, self_destruct: bool = False
     ) -> Tuple[
         Optional["pa.Table"],
         Optional[StructType],
@@ -1144,7 +1163,27 @@ class SparkConnectClient(object):
                 )
 
         if len(batches) > 0:
-            table = pa.Table.from_batches(batches=batches)
+            if self_destruct:
+                results = []
+                for batch in batches:
+                    # self_destruct frees memory column-wise, but Arrow record batches are
+                    # oriented row-wise, so each column is copied into its own allocation
+                    batch = pa.RecordBatch.from_arrays(
+                        [
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch
+                        ],
+                        schema=batch.schema,
+                    )
+                    results.append(batch)
+                table = pa.Table.from_batches(batches=results)
+                # Ensure only the table has a reference to the batches, so that
+                # self_destruct (if enabled) is effective
+                del results
+                del batches
+            else:
+                table = pa.Table.from_batches(batches=batches)
             return table, schema, metrics, observed_metrics, properties
         else:
             return None, schema, metrics, observed_metrics, properties
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index e491305e867..5f76cafb192 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -19,11 +19,13 @@ import unittest
 from distutils.version import LooseVersion
 
 import pandas as pd
+
 from pyspark.sql.tests.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
-class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTestUtils):
     @unittest.skip("Spark Connect does not support Spark Context but the test 
depends on that.")
     def test_createDataFrame_empty_partition(self):
         super().test_createDataFrame_empty_partition()
@@ -56,9 +58,16 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_no_partition_toPandas(self):
         super().test_no_partition_toPandas()
 
-    @unittest.skip("The test uses internal APIs.")
     def test_pandas_self_destruct(self):
-        super().test_pandas_self_destruct()
+        df = self.spark.range(100).select("id", "id", "id")
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+
+        self.assert_eq(self_destruct_pdf, no_self_destruct_pdf)
 
     def test_propagates_spark_exception(self):
         self.check_propagates_spark_exception()

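As a side note, the PyArrow behavior this patch relies on can be reproduced standalone. The sketch below is illustrative only and not part of the patch; the sample data and column names are made up, and it assumes a PyArrow version in which `Table.to_pandas` accepts `self_destruct`, `split_blocks`, and `use_threads`.

```python
import pyarrow as pa

# Build a small record batch; in the patch, batches arrive from the Connect server.
batch = pa.RecordBatch.from_arrays(
    [pa.array(list(range(100))), pa.array([float(i) for i in range(100)])],
    names=["id", "value"],
)

# Reallocate each column into its own buffer so self_destruct can free it
# independently, mirroring the pa.concat_arrays([array]) call in the patch.
reallocated = pa.RecordBatch.from_arrays(
    [pa.concat_arrays([array]) for array in batch],
    schema=batch.schema,
)

table = pa.Table.from_batches(batches=[reallocated])
del batch, reallocated  # leave the table as the sole owner of the buffers

# self_destruct - free each Arrow column as soon as it has been converted
# split_blocks - one pandas block per column, so columns can be freed one by one
# use_threads  - convert a single column at a time
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)

# After a self-destructing conversion the Arrow table must not be used again.
del table
```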

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
