This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 1df6db27 fix: Inconsistent schemas when converting to pyarrow (#1315)
1df6db27 is described below
commit 1df6db27d95d99ddb51136a60abd05a33ce375ad
Author: Nuno Faria <[email protected]>
AuthorDate: Mon Jan 5 15:02:38 2026 +0000
fix: Inconsistent schemas when converting to pyarrow (#1315)
* Fix inconsistent schemas when converting to pyarrow
* Add extra tests
* Change deprecated type
---------
Co-authored-by: Tim Saucer <[email protected]>
---
python/tests/test_dataframe.py | 47 ++++++++++++++++++++++++++++++++++++++++++
src/dataframe.rs | 11 ++++++++--
2 files changed, 56 insertions(+), 2 deletions(-)
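
For context, the fix addresses cases like the following (a minimal sketch,
not part of the commit; it assumes a plain SessionContext in place of the
test fixture's ctx and a hypothetical /tmp path, mirroring the table setup
in the tests below):

    from datafusion import SessionContext

    ctx = SessionContext()
    ctx.sql("create table t_(a int not null)").collect()
    ctx.sql("insert into t_ values (1), (2), (3)").collect()
    ctx.sql("copy (select * from t_) to '/tmp/t.parquet'").collect()

    ctx.register_parquet("t", "/tmp/t.parquet")

    # Previously, to_arrow_table() always passed the DataFrame's logical
    # schema to pyarrow.Table.from_batches, which could disagree with the
    # schema of the collected batches (e.g. on nullability) and fail.
    # With this fix, the schema is taken from the batches when any exist.
    table = ctx.sql("select max(a) as m from t").to_arrow_table()
    assert table.to_pydict() == {"m": [3]}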
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index b68493b6..f481f31f 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -1898,6 +1898,53 @@ def test_to_arrow_table(df):
assert set(pyarrow_table.column_names) == {"a", "b", "c"}
+def test_parquet_non_null_column_to_pyarrow(ctx, tmp_path):
+ path = tmp_path.joinpath("t.parquet")
+
+ ctx.sql("create table t_(a int not null)").collect()
+ ctx.sql("insert into t_ values (1), (2), (3)").collect()
+ ctx.sql(f"copy (select * from t_) to '{path}'").collect()
+
+ ctx.register_parquet("t", path)
+ pyarrow_table = ctx.sql("select max(a) as m from t").to_arrow_table()
+ assert pyarrow_table.to_pydict() == {"m": [3]}
+
+
+def test_parquet_empty_batch_to_pyarrow(ctx, tmp_path):
+ path = tmp_path.joinpath("t.parquet")
+
+ ctx.sql("create table t_(a int not null)").collect()
+ ctx.sql("insert into t_ values (1), (2), (3)").collect()
+ ctx.sql(f"copy (select * from t_) to '{path}'").collect()
+
+ ctx.register_parquet("t", path)
+ pyarrow_table = ctx.sql("select * from t limit 0").to_arrow_table()
+ assert pyarrow_table.schema == pa.schema(
+ [
+ pa.field("a", pa.int32(), nullable=False),
+ ]
+ )
+
+
+def test_parquet_null_aggregation_to_pyarrow(ctx, tmp_path):
+ path = tmp_path.joinpath("t.parquet")
+
+ ctx.sql("create table t_(a int not null)").collect()
+ ctx.sql("insert into t_ values (1), (2), (3)").collect()
+ ctx.sql(f"copy (select * from t_) to '{path}'").collect()
+
+ ctx.register_parquet("t", path)
+ pyarrow_table = ctx.sql(
+ "select max(a) as m from (select * from t where a < 0)"
+ ).to_arrow_table()
+ assert pyarrow_table.to_pydict() == {"m": [None]}
+ assert pyarrow_table.schema == pa.schema(
+ [
+ pa.field("m", pa.int32(), nullable=True),
+ ]
+ )
+
+
def test_execute_stream(df):
stream = df.execute_stream()
assert all(batch is not None for batch in stream)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 7bd601de..d920df71 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -1044,11 +1044,18 @@ impl PyDataFrame {
/// Collect the batches and pass to Arrow Table
fn to_arrow_table(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let batches = self.collect(py)?.into_pyobject(py)?;
- let schema = self.schema().into_pyobject(py)?;
+
+ // only use the DataFrame's schema if there are no batches, otherwise let the schema be
+ // determined from the batches (avoids some inconsistencies with nullable columns)
+ let args = if batches.len()? == 0 {
+ let schema = self.schema().into_pyobject(py)?;
+ PyTuple::new(py, &[batches, schema])?
+ } else {
+ PyTuple::new(py, &[batches])?
+ };
// Instantiate pyarrow Table object and use its from_batches method
let table_class = py.import("pyarrow")?.getattr("Table")?;
- let args = PyTuple::new(py, &[batches, schema])?;
let table: Py<PyAny> = table_class.call_method1("from_batches", args)?.into();
Ok(table)
}
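
A note on why the schema is only passed when the batch list is empty:
pyarrow.Table.from_batches infers the schema from the first batch when no
schema argument is given, but it cannot do so for an empty list, where an
explicit schema is required. A small pyarrow-side illustration (a sketch,
not part of this commit):

    import pyarrow as pa

    batch = pa.record_batch(
        [pa.array([1, 2, 3], type=pa.int32())], names=["a"]
    )

    # With batches present, the schema comes from the batches themselves,
    # so it cannot disagree with them.
    t = pa.Table.from_batches([batch])

    # With no batches, from_batches needs an explicit schema; here the
    # DataFrame's logical schema is the only source available.
    empty = pa.Table.from_batches([], schema=batch.schema)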
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]