This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c35130947c [SPARK-55350][PYTHON][CONNECT] Fix row count loss when creating DataFrame from pandas with 0 columns
73c35130947c is described below

commit 73c35130947cdbbf54482e8815335e5e0482c538
Author: Yicong-Huang <[email protected]>
AuthorDate: Wed Feb 4 17:50:57 2026 -0800

    [SPARK-55350][PYTHON][CONNECT] Fix row count loss when creating DataFrame from pandas with 0 columns
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the loss of row count when creating a Spark DataFrame from a pandas DataFrame with 0 columns in **Spark Connect**.
    
    The issue occurs due to two PyArrow limitations, both demonstrated in the sketch after this list:
    1. `pa.RecordBatch.from_arrays([], [])` loses row count information
    2. `pa.Table.cast()` on a 0-column table resets the row count to 0
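    
    A minimal sketch of both limitations (as observed with the PyArrow versions this fix targets; exact behavior may differ across versions):
    
    ```python
    import pyarrow as pa
    
    # Limitation 1: with no arrays there is no length to infer from, so the
    # batch reports 0 rows no matter how many rows the source had.
    batch = pa.RecordBatch.from_arrays([], [])
    batch.num_rows  # 0
    
    # Limitation 2: cast() on a 0-column table drops the row count.
    table = pa.Table.from_struct_array(pa.array([{}] * 10, type=pa.struct([])))
    table.num_rows                      # 10
    table.cast(pa.schema([])).num_rows  # 0
    ```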
    
    **Changes:**
    1. Handle 0-column pandas DataFrames separately, using `pa.Table.from_struct_array()` to preserve the row count (sketched below)
    2. Skip the `cast()` operation for 0-column tables, as it loses the row count
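    
    A rough sketch of how (1) preserves the count, mirroring the change in `session.py` in the diff below (illustrative only; the actual code path also handles the non-empty case):
    
    ```python
    import pandas as pd
    import pyarrow as pa
    
    pdf = pd.DataFrame(index=range(10))  # 10 rows, 0 columns
    
    # An empty-struct array of length len(pdf) carries the row count even
    # though it has no child fields.
    table = pa.Table.from_struct_array(pa.array([{}] * len(pdf), type=pa.struct([])))
    assert table.num_rows == 10
    ```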
    
    ### Why are the changes needed?
    
    Before this fix:
    ```python
    import pandas as pd
    from pyspark.sql.types import StructType
    
    pdf = pd.DataFrame(index=range(10))  # 10 rows, 0 columns
    df = spark.createDataFrame(pdf, schema=StructType([]))
    df.count()  # Returns 0 (wrong!)
    ```
    
    After this fix:
    ```python
    df.count()  # Returns 10 (correct!)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Creating a DataFrame from a pandas DataFrame with 0 columns now correctly preserves the row count in Spark Connect.
    
    ### How was this patch tested?
    
    Added unit test `test_from_pandas_dataframe_with_zero_columns` in `test_connect_creation.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54144 from Yicong-Huang/SPARK-55350/fix/arrow-zero-columns-row-count.
    
    Lead-authored-by: Yicong-Huang <[email protected]>
    Co-authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 python/pyspark/sql/connect/session.py              | 32 +++++++++++++---------
 .../sql/tests/connect/test_connect_creation.py     | 13 +++++++++
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index e8393479ff26..572be49dd307 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -621,22 +621,28 @@ class SparkSession:
 
             safecheck = configs["spark.sql.execution.pandas.convertToArrowArraySafely"]
 
-            _table = pa.Table.from_batches(
-                [
-                    create_arrow_batch_from_pandas(
-                        [(c, st) for (_, c), st in zip(data.items(), spark_types)],
-                        timezone=cast(str, timezone),
-                        safecheck=safecheck == "true",
-                        prefers_large_types=prefers_large_types,
-                    )
-                ]
-            )
+            # Handle the 0-column case separately to preserve row count.
+            if len(data.columns) == 0:
+                _table = pa.Table.from_struct_array(pa.array([{}] * len(data), type=pa.struct([])))
+            else:
+                _table = pa.Table.from_batches(
+                    [
+                        create_arrow_batch_from_pandas(
+                            [(c, st) for (_, c), st in zip(data.items(), spark_types)],
+                            timezone=cast(str, timezone),
+                            safecheck=safecheck == "true",
+                            prefers_large_types=prefers_large_types,
+                        )
+                    ]
+                )
 
             if isinstance(schema, StructType):
                 assert arrow_schema is not None
-                _table = _table.rename_columns(
-                    cast(StructType, _deduplicate_field_names(schema)).names
-                ).cast(arrow_schema)
+                # Skip cast for 0-column tables as it loses row count
+                if len(schema.fields) > 0:
+                    _table = _table.rename_columns(
+                        cast(StructType, _deduplicate_field_names(schema)).names
+                    ).cast(arrow_schema)
 
         elif isinstance(data, pa.Table):
             # If no schema supplied by user then get the names of columns only
diff --git a/python/pyspark/sql/tests/connect/test_connect_creation.py b/python/pyspark/sql/tests/connect/test_connect_creation.py
index e1f8323f1473..539d555ec63d 100644
--- a/python/pyspark/sql/tests/connect/test_connect_creation.py
+++ b/python/pyspark/sql/tests/connect/test_connect_creation.py
@@ -69,6 +69,19 @@ class SparkConnectCreationTests(ReusedMixedTestCase, PandasOnSparkTestUtils):
                 messageParameters={},
             )
 
+    def test_from_pandas_dataframe_with_zero_columns(self):
+        """SPARK-55350: Test that row count is preserved when creating DataFrame from
+        pandas with 0 columns but with explicit schema in Spark Connect."""
+        # Create a pandas DataFrame with 5 rows but 0 columns
+        pdf = pd.DataFrame(index=range(5))
+        schema = StructType([])
+
+        cdf = self.connect.createDataFrame(pdf, schema=schema)
+
+        self.assertEqual(cdf.schema, schema)
+        self.assertEqual(cdf.count(), 5)
+        self.assertEqual(len(cdf.collect()), 5)
+
     def test_with_local_ndarray(self):
         """SPARK-41446: Test creating a dataframe using local list"""
         data = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
