kylebarron commented on code in PR #1222:
URL: https://github.com/apache/datafusion-python/pull/1222#discussion_r2319535613
##########
python/datafusion/dataframe.py:
##########
@@ -1098,21 +1102,42 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
        return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

    def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.

        Args:
            requested_schema: Attempt to provide the DataFrame using this schema.

        Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
        """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
        return self.df.__arrow_c_stream__(requested_schema)

+    def __iter__(self) -> Iterator[pa.RecordBatch]:
Review Comment:
I don't really think there's a good rationale for having this method,
especially as it reuses the exact same mechanism as the PyCapsule Interface. If
anything, we might want to have an `__aiter__` method that has a custom async
connection to the DataFusion context.
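For illustration only, such an `__aiter__` could look roughly like the sketch below; `execute_stream_async` is a made-up placeholder for whatever async entry point the Rust extension would expose, not an existing API:

```python
from typing import Any, AsyncIterator


class DataFrame:
    # ... existing methods ...

    def __aiter__(self) -> AsyncIterator[Any]:
        return self._stream_batches()

    async def _stream_batches(self) -> AsyncIterator[Any]:
        # Hypothetical async hook into the DataFusion context; the
        # synchronous ``__arrow_c_stream__`` export stays untouched.
        stream = await self.df.execute_stream_async()
        async for batch in stream:
            yield batch
```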
##########
src/dataframe.rs:
##########
@@ -39,25 +39,55 @@ use datafusion::prelude::*;
use datafusion_ffi::table_provider::FFI_TableProvider;
use futures::{StreamExt, TryStreamExt};
use pyo3::exceptions::PyValueError;
+use pyo3::ffi;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
-use tokio::task::JoinHandle;
use crate::catalog::PyTable;
-use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
+use crate::errors::{py_datafusion_err, PyDataFusionError};
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
-use crate::record_batch::PyRecordBatchStream;
+use crate::record_batch::{poll_next_batch, PyRecordBatchStream};
use crate::sql::logical::PyLogicalPlan;
use crate::utils::{
- get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
+ get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_future, validate_pycapsule,
+ wait_for_future,
};
use crate::{
errors::PyDataFusionResult,
expr::{sort_expr::PySortExpr, PyExpr},
};
+#[allow(clippy::manual_c_str_literals)]
+static ARROW_STREAM_NAME: &CStr =
+ unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_array_stream\0") };
Review Comment:
As suggested by the linter, can we just use `c"arrow_array_stream"`?
##########
src/dataframe.rs:
##########
@@ -39,25 +39,55 @@ use datafusion::prelude::*;
use datafusion_ffi::table_provider::FFI_TableProvider;
use futures::{StreamExt, TryStreamExt};
use pyo3::exceptions::PyValueError;
+use pyo3::ffi;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
-use tokio::task::JoinHandle;
use crate::catalog::PyTable;
-use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
+use crate::errors::{py_datafusion_err, PyDataFusionError};
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
-use crate::record_batch::PyRecordBatchStream;
+use crate::record_batch::{poll_next_batch, PyRecordBatchStream};
use crate::sql::logical::PyLogicalPlan;
use crate::utils::{
- get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
+ get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_future, validate_pycapsule,
+ wait_for_future,
};
use crate::{
errors::PyDataFusionResult,
expr::{sort_expr::PySortExpr, PyExpr},
};
+#[allow(clippy::manual_c_str_literals)]
+static ARROW_STREAM_NAME: &CStr =
+ unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_array_stream\0") };
+
+unsafe extern "C" fn drop_stream(capsule: *mut ffi::PyObject) {
+ if capsule.is_null() {
+ return;
+ }
+
+ // When PyArrow imports this capsule it steals the raw stream pointer and
+ // sets the capsule's internal pointer to NULL. In that case
+ // `PyCapsule_IsValid` returns 0 and this destructor must not drop the
+ // stream as ownership has been transferred to PyArrow. If the capsule was
+ // never imported, the pointer remains valid and we are responsible for
+ // freeing the stream here.
+ if ffi::PyCapsule_IsValid(capsule, ARROW_STREAM_NAME.as_ptr()) == 1 {
+ let stream_ptr = ffi::PyCapsule_GetPointer(capsule, ARROW_STREAM_NAME.as_ptr())
+ as *mut FFI_ArrowArrayStream;
+ if !stream_ptr.is_null() {
+ drop(Box::from_raw(stream_ptr));
+ }
+ }
+
+ // `PyCapsule_GetPointer` sets a Python error on failure. Clear it only
+ // after the stream has been released (or determined to be owned
+ // elsewhere).
+ ffi::PyErr_Clear();
+}
Review Comment:
We shouldn't need to do any of this, according to upstream discussion
https://github.com/apache/arrow-rs/pull/5070#discussion_r1391489292
##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,39 @@ To materialize the results of your DataFrame operations:
# Display results
df.show() # Print tabular format to console
-
+
# Count rows
count = df.count()
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+ import pyarrow as pa
+
+ # Create a PyArrow RecordBatchReader without materializing all batches
+ reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
+ for batch in reader:
+ ... # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
+lazily so you can loop over results directly:
+
+.. code-block:: python
+
+ for batch in df:
+ ... # process each batch as it is produced
Review Comment:
Because the user can iterate over the stream accessed by the target library,
I don't think we should define our own custom integration here, and if we do,
then the yielded object should not be a pyarrow RecordBatch, but rather an
opaque, minimal Python class that just exposes `__arrow_c_array__` so that the
user can choose what Arrow library they want to use to work with the batch.
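As a rough sketch of that idea (the class name, constructor, and capsule plumbing here are hypothetical; only the `__arrow_c_array__` method is the point):

```python
class ArrowBatch:
    """Opaque handle to a single record batch, exposed only through the
    Arrow PyCapsule interface so callers can pick their own Arrow library."""

    def __init__(self, schema_capsule, array_capsule):
        # Capsules produced by the Rust side (hypothetical plumbing).
        self._schema_capsule = schema_capsule
        self._array_capsule = array_capsule

    def __arrow_c_array__(self, requested_schema=None):
        # The PyCapsule interface expects a (schema, array) capsule pair.
        return self._schema_capsule, self._array_capsule
```

Recent PyArrow versions can then consume it via `pa.record_batch(batch)`, and arro3 or nanoarrow users can import it just as easily, without datafusion depending on any of them.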
##########
src/dataframe.rs:
##########
@@ -354,6 +384,63 @@ impl PyDataFrame {
}
}
+/// Synchronous wrapper around partitioned [`SendableRecordBatchStream`]s used
+/// for the `__arrow_c_stream__` implementation.
+///
+/// It drains each partition's stream sequentially, yielding record batches in
+/// their original partition order. When a `projection` is set, each batch is
+/// converted via `record_batch_into_schema` to apply schema changes per batch.
+struct PartitionedDataFrameStreamReader {
+ streams: Vec<SendableRecordBatchStream>,
+ schema: SchemaRef,
+ projection: Option<SchemaRef>,
+ current: usize,
+}
+
+impl Iterator for PartitionedDataFrameStreamReader {
+ type Item = Result<RecordBatch, ArrowError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ while self.current < self.streams.len() {
+ let stream = &mut self.streams[self.current];
+ let fut = poll_next_batch(stream);
+ let result = Python::with_gil(|py| wait_for_future(py, fut));
+
+ match result {
+ Ok(Ok(Some(batch))) => {
+ let batch = if let Some(ref schema) = self.projection {
+ match record_batch_into_schema(batch, schema.as_ref()) {
+ Ok(b) => b,
+ Err(e) => return Some(Err(e)),
+ }
+ } else {
+ batch
+ };
+ return Some(Ok(batch));
+ }
+ Ok(Ok(None)) => {
+ self.current += 1;
+ continue;
+ }
+ Ok(Err(e)) => {
+ return Some(Err(ArrowError::ExternalError(Box::new(e))));
+ }
+ Err(e) => {
+ return Some(Err(ArrowError::ExternalError(Box::new(e))));
+ }
+ }
+ }
+
+ None
+ }
+}
+
+impl RecordBatchReader for PartitionedDataFrameStreamReader {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
#[pymethods]
impl PyDataFrame {
Review Comment:
Essentially this changes the `DataFrame` construct to always be "lazy"?
Previously a `DataFrame` was always materialized in memory, whereas now it's
just a representation of future batches?
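If that reading is right (the docs change in this PR describes batches as "produced lazily"), the user-visible contrast would be roughly the following, assuming `to_arrow_table()` keeps its current eager behavior:

```python
import pyarrow as pa

# Eager: the whole result set is collected into memory before returning.
table = df.to_arrow_table()

# Streaming export: batches are pulled one at a time as the consumer asks
# for them, so the first batch arrives without materializing the rest.
reader = pa.RecordBatchReader.from_stream(df)
first_batch = reader.read_next_batch()
```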
##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,39 @@ To materialize the results of your DataFrame operations:
# Display results
df.show() # Print tabular format to console
-
+
# Count rows
count = df.count()
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+ import pyarrow as pa
+
+ # Create a PyArrow RecordBatchReader without materializing all batches
+ reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
Review Comment:
```suggestion
reader = pa.RecordBatchReader.from_stream(df)
```
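`RecordBatchReader.from_stream` (available in recent PyArrow releases) accepts any object exposing `__arrow_c_stream__`, so the example stays lazy and avoids the private `_import_from_c_capsule` helper:

```python
import pyarrow as pa

# The DataFrame itself acts as the stream source; no capsule handling needed.
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    ...  # process each batch as it is produced
```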
##########
src/dataframe.rs:
##########
@@ -889,44 +979,55 @@ impl PyDataFrame {
let desired_schema = Schema::try_from(schema_ptr)?;
schema = project_schema(schema, desired_schema)?;
-
- batches = batches
- .into_iter()
- .map(|record_batch| record_batch_into_schema(record_batch, &schema))
- .collect::<Result<Vec<RecordBatch>, ArrowError>>()?;
+ projection = Some(Arc::new(schema.clone()));
}
- let batches_wrapped = batches.into_iter().map(Ok);
+ let schema_ref = Arc::new(schema.clone());
- let reader = RecordBatchIterator::new(batches_wrapped, Arc::new(schema));
+ let reader = PartitionedDataFrameStreamReader {
+ streams,
+ schema: schema_ref,
+ projection,
+ current: 0,
+ };
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
- let ffi_stream = FFI_ArrowArrayStream::new(reader);
- let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
- PyCapsule::new(py, ffi_stream, Some(stream_capsule_name)).map_err(PyDataFusionError::from)
+ let stream = Box::new(FFI_ArrowArrayStream::new(reader));
Review Comment:
If you have an `FFI_ArrowArrayStream` you should be able to just pass that
to `PyCapsule::new` without touching any unsafe:
https://github.com/kylebarron/arro3/blob/cb2453bf022d0d8704e56e81a324ab5a772e0247/pyo3-arrow/src/ffi/to_python/utils.rs#L93-L94
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]