This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b7127004358a [SPARK-54317][PYTHON][CONNECT] Unify Arrow conversion 
logic for Classic and Connect toPandas
b7127004358a is described below

commit b7127004358abd70080fb2cffd28f2478dea53a9
Author: Yicong-Huang <[email protected]>
AuthorDate: Tue Nov 18 12:53:53 2025 +0800

    [SPARK-54317][PYTHON][CONNECT] Unify Arrow conversion logic for Classic and 
Connect toPandas
    
    ## What changes were proposed in this pull request?
    
    This PR merges the Arrow conversion code paths between Spark Connect and 
Classic Spark by extracting shared logic into a reusable helper function 
`_convert_arrow_table_to_pandas`.
    
    ## Why are the changes needed?
    
    This unifies optimizations from two separate PRs:
    - **[SPARK-53967]** (Classic): Avoid intermediate pandas DataFrame creation 
by converting Arrow columns directly to Series
    - **[SPARK-54183]** (Connect): Same optimization implemented for Spark 
Connect
    
    ## Does this PR introduce any user-facing change?
    
    No. This is a pure refactoring with no API or behavior changes.
    
    ## How was this patch tested?
    Ran existing Arrow test suite: 
`python/pyspark/sql/tests/arrow/test_arrow.py`
    
    ## Was this patch authored or co-authored using generative AI tooling?
    
    Co-Generated-by: Cursor with Claude 4.5 Sonnet
    
    Closes #53045 from Yicong-Huang/SPARK-54317/feat/merge-arrow-code-path.
    
    Lead-authored-by: Yicong-Huang 
<[email protected]>
    Co-authored-by: Yicong Huang 
<[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/client/core.py |  84 +++-----------
 python/pyspark/sql/pandas/conversion.py   | 181 +++++++++++++++++++++---------
 2 files changed, 143 insertions(+), 122 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 48e07642e157..0c9a6b45bd2f 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -99,8 +99,9 @@ from pyspark.sql.connect.plan import (
 )
 from pyspark.sql.connect.observation import Observation
 from pyspark.sql.connect.utils import get_python_ver
-from pyspark.sql.pandas.types import _create_converter_to_pandas, 
from_arrow_schema
-from pyspark.sql.types import DataType, StructType, _has_type
+from pyspark.sql.pandas.types import from_arrow_schema
+from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
+from pyspark.sql.types import DataType, StructType
 from pyspark.util import PythonEvalType
 from pyspark.storagelevel import StorageLevel
 from pyspark.errors import PySparkValueError, PySparkAssertionError, 
PySparkNotImplementedError
@@ -987,8 +988,8 @@ class SparkConnectClient(object):
         # Get all related configs in a batch
         (
             timezone,
-            struct_in_pandas,
-            self_destruct,
+            structHandlingMode,
+            selfDestruct,
         ) = self.get_configs(
             "spark.sql.session.timeZone",
             "spark.sql.execution.pandas.structHandlingMode",
@@ -996,7 +997,7 @@ class SparkConnectClient(object):
         )
 
         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
-            req, observations, self_destruct == "true"
+            req, observations, selfDestruct == "true"
         )
         assert table is not None
         ei = ExecutionInfo(metrics, observed_metrics)
@@ -1004,71 +1005,14 @@ class SparkConnectClient(object):
         schema = schema or from_arrow_schema(table.schema, 
prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)
 
-        # Rename columns to avoid duplicated column names during processing
-        temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
-        table = table.rename_columns(temp_col_names)
-
-        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date 
type
-        # values, but we should use datetime.date to match the behavior with 
when
-        # Arrow optimization is disabled.
-        pandas_options = {"coerce_temporal_nanoseconds": True}
-        if self_destruct == "true" and table.num_rows > 0:
-            # Configure PyArrow to use as little memory as possible:
-            # self_destruct - free columns as they are converted
-            # split_blocks - create a separate Pandas block for each column
-            # use_threads - convert one column at a time
-            pandas_options.update(
-                {
-                    "self_destruct": True,
-                    "split_blocks": True,
-                    "use_threads": False,
-                }
-            )
-
-        if len(schema.names) > 0:
-            error_on_duplicated_field_names: bool = False
-            if struct_in_pandas == "legacy" and any(
-                _has_type(f.dataType, StructType) for f in schema.fields
-            ):
-                error_on_duplicated_field_names = True
-                struct_in_pandas = "dict"
-
-            # SPARK-51112: If the table is empty, we avoid using pyarrow 
to_pandas to create the
-            # DataFrame, as it may fail with a segmentation fault.
-            if table.num_rows == 0:
-                # For empty tables, create empty Series with converters to 
preserve dtypes
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            
error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(pd.Series([], name=temp_col_names[i], 
dtype="object"))
-                        for i, field in enumerate(schema.fields)
-                    ],
-                    axis="columns",
-                )
-            else:
-                pdf = pd.concat(
-                    [
-                        _create_converter_to_pandas(
-                            field.dataType,
-                            field.nullable,
-                            timezone=timezone,
-                            struct_in_pandas=struct_in_pandas,
-                            
error_on_duplicated_field_names=error_on_duplicated_field_names,
-                        )(arrow_col.to_pandas(**pandas_options))
-                        for arrow_col, field in zip(table.columns, 
schema.fields)
-                    ],
-                    axis="columns",
-                )
-            # Restore original column names (including duplicates)
-            pdf.columns = schema.names
-        else:
-            # empty columns
-            pdf = table.to_pandas(**pandas_options)
+        pdf = _convert_arrow_table_to_pandas(
+            arrow_table=table,
+            schema=schema,
+            timezone=timezone,
+            struct_handling_mode=structHandlingMode,
+            date_as_object=False,
+            self_destruct=selfDestruct == "true",
+        )
 
         if len(metrics) > 0:
             pdf.attrs["metrics"] = metrics
diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index 351a23299d9c..43f5d56d9bcb 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -21,7 +21,9 @@ from typing import (
     Iterator,
     List,
     Optional,
+    Sequence,
     Union,
+    cast,
     no_type_check,
     overload,
     TYPE_CHECKING,
@@ -37,6 +39,7 @@ from pyspark.sql.types import (
     MapType,
     TimestampType,
     StructType,
+    _has_type,
     DataType,
     _create_row,
     StringType,
@@ -53,6 +56,119 @@ if TYPE_CHECKING:
     from pyspark.sql import DataFrame
 
 
+def _convert_arrow_table_to_pandas(
+    arrow_table: "pa.Table",
+    schema: "StructType",
+    *,
+    timezone: Optional[str] = None,
+    struct_handling_mode: Optional[str] = None,
+    date_as_object: bool = False,
+    self_destruct: bool = False,
+) -> "PandasDataFrameLike":
+    """
+    Helper function to convert Arrow table columns to a pandas DataFrame.
+
+    This function applies Spark-specific type converters to Arrow columns and 
concatenates
+    them into a pandas DataFrame.
+
+    Parameters
+    ----------
+    arrow_table : pyarrow.Table
+        The Arrow table to convert
+    schema : StructType
+        The schema of the DataFrame
+    timezone : str or None
+        The timezone to use for timestamp conversions (can be None if not 
configured)
+    struct_handling_mode : str or None
+        How to handle struct types in pandas ("dict", "row", or "legacy", can 
be None
+         if not configured). If "legacy", it will be converted to "dict" and 
error checking
+         for duplicated field names will be enabled when StructType fields are 
present.
+    date_as_object : bool
+        Whether to convert date values to Python datetime.date objects 
(default: False)
+    self_destruct : bool
+        Whether to enable memory-efficient self-destruct mode for large tables 
(default: False)
+
+    Returns
+    -------
+    pandas.DataFrame
+        The converted pandas DataFrame
+    """
+    import pandas as pd
+    from pyspark.sql.pandas.types import _create_converter_to_pandas
+
+    # Build pandas options
+    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+    # values, but we should use datetime.date to match the behavior with when
+    # Arrow optimization is disabled.
+    pandas_options = {"coerce_temporal_nanoseconds": True}
+    if date_as_object:
+        pandas_options["date_as_object"] = True
+
+    # Handle empty columns case
+    if len(schema.fields) == 0:
+        return arrow_table.to_pandas(**pandas_options)
+
+    # Rename columns to avoid duplicated column names during processing
+    temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
+    arrow_table = arrow_table.rename_columns(temp_col_names)
+
+    # Configure self-destruct mode for memory efficiency
+    if self_destruct and arrow_table.num_rows > 0:
+        # Configure PyArrow to use as little memory as possible:
+        # self_destruct - free columns as they are converted
+        # split_blocks - create a separate Pandas block for each column
+        # use_threads - convert one column at a time
+        pandas_options.update(
+            {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+        )
+
+    # Handle legacy struct handling mode
+    error_on_duplicated_field_names = False
+    if struct_handling_mode == "legacy" and any(
+        _has_type(f.dataType, StructType) for f in schema.fields
+    ):
+        error_on_duplicated_field_names = True
+        struct_handling_mode = "dict"
+
+    # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to 
create the
+    # DataFrame, as it may fail with a segmentation fault.
+    if arrow_table.num_rows == 0:
+        # For empty tables, create empty Series to preserve dtypes
+        column_data = (
+            pd.Series([], name=temp_col_names[i], dtype="object") for i in 
range(len(schema.fields))
+        )
+    else:
+        # For non-empty tables, convert arrow columns directly
+        column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in 
arrow_table.columns)
+
+    # Apply Spark-specific type converters to each column
+    pdf = pd.concat(
+        objs=cast(
+            Sequence[pd.Series],
+            (
+                _create_converter_to_pandas(
+                    field.dataType,
+                    field.nullable,
+                    timezone=timezone,
+                    struct_in_pandas=struct_handling_mode,
+                    
error_on_duplicated_field_names=error_on_duplicated_field_names,
+                )(series)
+                for series, field in zip(column_data, schema.fields)
+            ),
+        ),
+        axis="columns",
+    )
+
+    # Restore original column names (including duplicates)
+    pdf.columns = schema.names
+
+    return pdf
+
+
 class PandasConversionMixin:
     """
     Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only
@@ -128,68 +244,29 @@ class PandasConversionMixin:
                 try:
                     import pyarrow as pa
 
-                    self_destruct = arrowPySparkSelfDestructEnabled == "true"
-                    batches = 
self._collect_as_arrow(split_batches=self_destruct)
+                    batches = self._collect_as_arrow(
+                        split_batches=arrowPySparkSelfDestructEnabled == "true"
+                    )
 
-                    # Rename columns to avoid duplicated column names.
-                    temp_col_names = [f"col_{i}" for i in 
range(len(self.columns))]
                     if len(batches) > 0:
-                        table = 
pa.Table.from_batches(batches).rename_columns(temp_col_names)
+                        table = pa.Table.from_batches(batches)
                     else:
                         # empty dataset
-                        table = 
arrow_schema.empty_table().rename_columns(temp_col_names)
+                        table = arrow_schema.empty_table()
 
                     # Ensure only the table has a reference to the batches, so 
that
                     # self_destruct (if enabled) is effective
                     del batches
 
-                    # Pandas DataFrame created from PyArrow uses 
datetime64[ns] for date type
-                    # values, but we should use datetime.date to match the 
behavior with when
-                    # Arrow optimization is disabled.
-                    pandas_options = {
-                        "date_as_object": True,
-                        "coerce_temporal_nanoseconds": True,
-                    }
-                    if self_destruct:
-                        # Configure PyArrow to use as little memory as 
possible:
-                        # self_destruct - free columns as they are converted
-                        # split_blocks - create a separate Pandas block for 
each column
-                        # use_threads - convert one column at a time
-                        pandas_options.update(
-                            {
-                                "self_destruct": True,
-                                "split_blocks": True,
-                                "use_threads": False,
-                            }
-                        )
-
-                    if len(self.columns) > 0:
-                        timezone = sessionLocalTimeZone
-                        struct_in_pandas = pandasStructHandlingMode
-
-                        error_on_duplicated_field_names = False
-                        if struct_in_pandas == "legacy":
-                            error_on_duplicated_field_names = True
-                            struct_in_pandas = "dict"
-
-                        pdf = pd.concat(
-                            [
-                                _create_converter_to_pandas(
-                                    field.dataType,
-                                    field.nullable,
-                                    timezone=timezone,
-                                    struct_in_pandas=struct_in_pandas,
-                                    
error_on_duplicated_field_names=error_on_duplicated_field_names,
-                                )(arrow_col.to_pandas(**pandas_options))
-                                for arrow_col, field in zip(table.columns, 
self.schema.fields)
-                            ],
-                            axis="columns",
-                        )
-                    else:
-                        # empty columns
-                        pdf = table.to_pandas(**pandas_options)
+                    pdf = _convert_arrow_table_to_pandas(
+                        arrow_table=table,
+                        schema=self.schema,
+                        timezone=sessionLocalTimeZone,
+                        struct_handling_mode=pandasStructHandlingMode,
+                        date_as_object=True,
+                        self_destruct=arrowPySparkSelfDestructEnabled == 
"true",
+                    )
 
-                    pdf.columns = self.columns
                     return pdf
 
                 except Exception as e:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to