This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b7127004358a [SPARK-54317][PYTHON][CONNECT] Unify Arrow conversion
logic for Classic and Connect toPandas
b7127004358a is described below
commit b7127004358abd70080fb2cffd28f2478dea53a9
Author: Yicong-Huang <[email protected]>
AuthorDate: Tue Nov 18 12:53:53 2025 +0800
[SPARK-54317][PYTHON][CONNECT] Unify Arrow conversion logic for Classic and
Connect toPandas
## What changes were proposed in this pull request?
This PR merges the Arrow-to-pandas conversion code paths of Spark Connect and
Classic Spark `toPandas` by extracting the shared logic into a reusable helper
function, `_convert_arrow_table_to_pandas`, in
`python/pyspark/sql/pandas/conversion.py`.
## Why are the changes needed?
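This unifies two separately written copies of the same optimization:
- **[SPARK-53967]** (Classic): Avoid intermediate pandas DataFrame creation
by converting Arrow columns directly to pandas Series
- **[SPARK-54183]** (Connect): The same optimization, implemented separately
for Spark Connect
The two copies had already started to drift apart (for example, only the
Connect path had the SPARK-51112 workaround for empty tables), so keeping a
single implementation prevents further divergence.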
## Does this PR introduce any user-facing change?
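No. There is no API change, and this is primarily a refactoring. Note,
however, that Classic `toPandas` now shares the edge-case handling that was
previously found only in the Connect path:
- empty tables are converted through empty object-dtype Series (SPARK-51112);
- the self-destruct pandas options are applied only to non-empty tables;
- the "legacy" struct handling mode enables the duplicated-field-name check
only when the schema contains struct fields.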
## How was this patch tested?
Ran the existing Arrow test suite:
`python/pyspark/sql/tests/arrow/test_arrow.py`
## Was this patch authored or co-authored using generative AI tooling?
Co-Generated-by: Cursor with Claude Sonnet 4.5
Closes #53045 from Yicong-Huang/SPARK-54317/feat/merge-arrow-code-path.
Lead-authored-by: Yicong-Huang
<[email protected]>
Co-authored-by: Yicong Huang
<[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 84 +++-----------
python/pyspark/sql/pandas/conversion.py | 181 +++++++++++++++++++++---------
2 files changed, 143 insertions(+), 122 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py
b/python/pyspark/sql/connect/client/core.py
index 48e07642e157..0c9a6b45bd2f 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -99,8 +99,9 @@ from pyspark.sql.connect.plan import (
)
from pyspark.sql.connect.observation import Observation
from pyspark.sql.connect.utils import get_python_ver
-from pyspark.sql.pandas.types import _create_converter_to_pandas,
from_arrow_schema
-from pyspark.sql.types import DataType, StructType, _has_type
+from pyspark.sql.pandas.types import from_arrow_schema
+from pyspark.sql.pandas.conversion import _convert_arrow_table_to_pandas
+from pyspark.sql.types import DataType, StructType
from pyspark.util import PythonEvalType
from pyspark.storagelevel import StorageLevel
from pyspark.errors import PySparkValueError, PySparkAssertionError,
PySparkNotImplementedError
@@ -987,8 +988,8 @@ class SparkConnectClient(object):
# Get all related configs in a batch
(
timezone,
- struct_in_pandas,
- self_destruct,
+ structHandlingMode,
+ selfDestruct,
) = self.get_configs(
"spark.sql.session.timeZone",
"spark.sql.execution.pandas.structHandlingMode",
@@ -996,7 +997,7 @@ class SparkConnectClient(object):
)
table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
- req, observations, self_destruct == "true"
+ req, observations, selfDestruct == "true"
)
assert table is not None
ei = ExecutionInfo(metrics, observed_metrics)
@@ -1004,71 +1005,14 @@ class SparkConnectClient(object):
schema = schema or from_arrow_schema(table.schema,
prefer_timestamp_ntz=True)
assert schema is not None and isinstance(schema, StructType)
- # Rename columns to avoid duplicated column names during processing
- temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
- table = table.rename_columns(temp_col_names)
-
- # Pandas DataFrame created from PyArrow uses datetime64[ns] for date
type
- # values, but we should use datetime.date to match the behavior with
when
- # Arrow optimization is disabled.
- pandas_options = {"coerce_temporal_nanoseconds": True}
- if self_destruct == "true" and table.num_rows > 0:
- # Configure PyArrow to use as little memory as possible:
- # self_destruct - free columns as they are converted
- # split_blocks - create a separate Pandas block for each column
- # use_threads - convert one column at a time
- pandas_options.update(
- {
- "self_destruct": True,
- "split_blocks": True,
- "use_threads": False,
- }
- )
-
- if len(schema.names) > 0:
- error_on_duplicated_field_names: bool = False
- if struct_in_pandas == "legacy" and any(
- _has_type(f.dataType, StructType) for f in schema.fields
- ):
- error_on_duplicated_field_names = True
- struct_in_pandas = "dict"
-
- # SPARK-51112: If the table is empty, we avoid using pyarrow
to_pandas to create the
- # DataFrame, as it may fail with a segmentation fault.
- if table.num_rows == 0:
- # For empty tables, create empty Series with converters to
preserve dtypes
- pdf = pd.concat(
- [
- _create_converter_to_pandas(
- field.dataType,
- field.nullable,
- timezone=timezone,
- struct_in_pandas=struct_in_pandas,
-
error_on_duplicated_field_names=error_on_duplicated_field_names,
- )(pd.Series([], name=temp_col_names[i],
dtype="object"))
- for i, field in enumerate(schema.fields)
- ],
- axis="columns",
- )
- else:
- pdf = pd.concat(
- [
- _create_converter_to_pandas(
- field.dataType,
- field.nullable,
- timezone=timezone,
- struct_in_pandas=struct_in_pandas,
-
error_on_duplicated_field_names=error_on_duplicated_field_names,
- )(arrow_col.to_pandas(**pandas_options))
- for arrow_col, field in zip(table.columns,
schema.fields)
- ],
- axis="columns",
- )
- # Restore original column names (including duplicates)
- pdf.columns = schema.names
- else:
- # empty columns
- pdf = table.to_pandas(**pandas_options)
+ pdf = _convert_arrow_table_to_pandas(
+ arrow_table=table,
+ schema=schema,
+ timezone=timezone,
+ struct_handling_mode=structHandlingMode,
+ date_as_object=False,
+ self_destruct=selfDestruct == "true",
+ )
if len(metrics) > 0:
pdf.attrs["metrics"] = metrics
diff --git a/python/pyspark/sql/pandas/conversion.py
b/python/pyspark/sql/pandas/conversion.py
index 351a23299d9c..43f5d56d9bcb 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -21,7 +21,9 @@ from typing import (
Iterator,
List,
Optional,
+ Sequence,
Union,
+ cast,
no_type_check,
overload,
TYPE_CHECKING,
@@ -37,6 +39,7 @@ from pyspark.sql.types import (
MapType,
TimestampType,
StructType,
+ _has_type,
DataType,
_create_row,
StringType,
@@ -53,6 +56,119 @@ if TYPE_CHECKING:
from pyspark.sql import DataFrame
+def _convert_arrow_table_to_pandas(
+ arrow_table: "pa.Table",
+ schema: "StructType",
+ *,
+ timezone: Optional[str] = None,
+ struct_handling_mode: Optional[str] = None,
+ date_as_object: bool = False,
+ self_destruct: bool = False,
+) -> "PandasDataFrameLike":
+ """
+ Helper function to convert Arrow table columns to a pandas DataFrame.
+
+ This function applies Spark-specific type converters to Arrow columns and
concatenates
+ them into a pandas DataFrame.
+
+ Parameters
+ ----------
+ arrow_table : pyarrow.Table
+ The Arrow table to convert
+ schema : StructType
+ The schema of the DataFrame
+ timezone : str or None
+ The timezone to use for timestamp conversions (can be None if not
configured)
+ struct_handling_mode : str or None
+ How to handle struct types in pandas ("dict", "row", or "legacy", can
be None
+ if not configured). If "legacy", it will be converted to "dict" and
error checking
+ for duplicated field names will be enabled when StructType fields are
present.
+ date_as_object : bool
+ Whether to convert date values to Python datetime.date objects
(default: False)
+ self_destruct : bool
+ Whether to enable memory-efficient self-destruct mode for large tables
(default: False)
+
+ Returns
+ -------
+ pandas.DataFrame
+ The converted pandas DataFrame
+ """
+ import pandas as pd
+ from pyspark.sql.pandas.types import _create_converter_to_pandas
+
+ # Build pandas options
+ # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+ # values, but we should use datetime.date to match the behavior with when
+ # Arrow optimization is disabled.
+ pandas_options = {"coerce_temporal_nanoseconds": True}
+ if date_as_object:
+ pandas_options["date_as_object"] = True
+
+ # Handle empty columns case
+ if len(schema.fields) == 0:
+ return arrow_table.to_pandas(**pandas_options)
+
+ # Rename columns to avoid duplicated column names during processing
+ temp_col_names = [f"col_{i}" for i in range(len(schema.names))]
+ arrow_table = arrow_table.rename_columns(temp_col_names)
+
+ # Configure self-destruct mode for memory efficiency
+ if self_destruct and arrow_table.num_rows > 0:
+ # Configure PyArrow to use as little memory as possible:
+ # self_destruct - free columns as they are converted
+ # split_blocks - create a separate Pandas block for each column
+ # use_threads - convert one column at a time
+ pandas_options.update(
+ {
+ "self_destruct": True,
+ "split_blocks": True,
+ "use_threads": False,
+ }
+ )
+
+ # Handle legacy struct handling mode
+ error_on_duplicated_field_names = False
+ if struct_handling_mode == "legacy" and any(
+ _has_type(f.dataType, StructType) for f in schema.fields
+ ):
+ error_on_duplicated_field_names = True
+ struct_handling_mode = "dict"
+
+ # SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to
create the
+ # DataFrame, as it may fail with a segmentation fault.
+ if arrow_table.num_rows == 0:
+ # For empty tables, create empty Series to preserve dtypes
+ column_data = (
+ pd.Series([], name=temp_col_names[i], dtype="object") for i in
range(len(schema.fields))
+ )
+ else:
+ # For non-empty tables, convert arrow columns directly
+ column_data = (arrow_col.to_pandas(**pandas_options) for arrow_col in
arrow_table.columns)
+
+ # Apply Spark-specific type converters to each column
+ pdf = pd.concat(
+ objs=cast(
+ Sequence[pd.Series],
+ (
+ _create_converter_to_pandas(
+ field.dataType,
+ field.nullable,
+ timezone=timezone,
+ struct_in_pandas=struct_handling_mode,
+
error_on_duplicated_field_names=error_on_duplicated_field_names,
+ )(series)
+ for series, field in zip(column_data, schema.fields)
+ ),
+ ),
+ axis="columns",
+ )
+
+ # Restore original column names (including duplicates)
+ pdf.columns = schema.names
+
+ return pdf
+
+
class PandasConversionMixin:
"""
Mix-in for the conversion from Spark to pandas and PyArrow. Currently, only
@@ -128,68 +244,29 @@ class PandasConversionMixin:
try:
import pyarrow as pa
- self_destruct = arrowPySparkSelfDestructEnabled == "true"
- batches =
self._collect_as_arrow(split_batches=self_destruct)
+ batches = self._collect_as_arrow(
+ split_batches=arrowPySparkSelfDestructEnabled == "true"
+ )
- # Rename columns to avoid duplicated column names.
- temp_col_names = [f"col_{i}" for i in
range(len(self.columns))]
if len(batches) > 0:
- table =
pa.Table.from_batches(batches).rename_columns(temp_col_names)
+ table = pa.Table.from_batches(batches)
else:
# empty dataset
- table =
arrow_schema.empty_table().rename_columns(temp_col_names)
+ table = arrow_schema.empty_table()
# Ensure only the table has a reference to the batches, so
that
# self_destruct (if enabled) is effective
del batches
- # Pandas DataFrame created from PyArrow uses
datetime64[ns] for date type
- # values, but we should use datetime.date to match the
behavior with when
- # Arrow optimization is disabled.
- pandas_options = {
- "date_as_object": True,
- "coerce_temporal_nanoseconds": True,
- }
- if self_destruct:
- # Configure PyArrow to use as little memory as
possible:
- # self_destruct - free columns as they are converted
- # split_blocks - create a separate Pandas block for
each column
- # use_threads - convert one column at a time
- pandas_options.update(
- {
- "self_destruct": True,
- "split_blocks": True,
- "use_threads": False,
- }
- )
-
- if len(self.columns) > 0:
- timezone = sessionLocalTimeZone
- struct_in_pandas = pandasStructHandlingMode
-
- error_on_duplicated_field_names = False
- if struct_in_pandas == "legacy":
- error_on_duplicated_field_names = True
- struct_in_pandas = "dict"
-
- pdf = pd.concat(
- [
- _create_converter_to_pandas(
- field.dataType,
- field.nullable,
- timezone=timezone,
- struct_in_pandas=struct_in_pandas,
-
error_on_duplicated_field_names=error_on_duplicated_field_names,
- )(arrow_col.to_pandas(**pandas_options))
- for arrow_col, field in zip(table.columns,
self.schema.fields)
- ],
- axis="columns",
- )
- else:
- # empty columns
- pdf = table.to_pandas(**pandas_options)
+ pdf = _convert_arrow_table_to_pandas(
+ arrow_table=table,
+ schema=self.schema,
+ timezone=sessionLocalTimeZone,
+ struct_handling_mode=pandasStructHandlingMode,
+ date_as_object=True,
+ self_destruct=arrowPySparkSelfDestructEnabled ==
"true",
+ )
- pdf.columns = self.columns
return pdf
except Exception as e:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]