kosiew opened a new pull request, #1222:
URL: https://github.com/apache/datafusion-python/pull/1222
## Which issue does this PR close?
* Closes #1206
> does ArrowStreamExportable load the full data into memory, or is it a record batch reader? I am getting OOM when using it in a smaller VM
## Rationale for this change
Exporting a DataFrame to PyArrow previously required materializing
(collecting) the entire result set. For large datasets this caused high memory
usage and could lead to Out Of Memory (OOM) failures. The goal of this PR is to
provide a zero‑copy, incremental export path into PyArrow and to make
`DataFrame` objects iterable so consumers can process record batches lazily. In
addition, the PR improves Python/Rust integration for running async DataFusion
operations while respecting Python signal handling (e.g. `KeyboardInterrupt`).
These changes make streaming the default, memory‑efficient path for Python
users exporting to Arrow and provide robust behavior when interrupted.
## What changes are included in this PR?
**High level**
* Add first-class Arrow C stream support for `DataFrame.__arrow_c_stream__`,
returning an ArrowArrayStream PyCapsule that PyArrow can import without copying
or full materialization.
* Make `DataFrame` iterable in Python so `for batch in df:` yields
`pyarrow.RecordBatch` lazily (see the usage sketch after this list).
* Implement a partition-aware synchronous `RecordBatchReader` wrapper in
Rust that drains partitioned `SendableRecordBatchStream`s while preserving
original partition order and applying an optional projection per-batch.
* Add robust PyCapsule lifecycle handling that transfers ownership to
PyArrow when it imports the capsule (avoiding a double free) and safely clears
errors set by `PyCapsule_GetPointer`.
* Introduce a `spawn_future` utility that runs futures on the global Tokio
runtime and waits on them while correctly translating join errors and preserving
Python signal handling.
* Replace previous `collect`/join-handle patterns in a few Rust methods with
`spawn_future` to unify behavior.
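To make the new surface concrete, here is a small usage sketch (the Parquet path is a placeholder, and `pa.RecordBatchReader.from_stream` assumes a PyArrow version that supports the Arrow PyCapsule stream protocol):

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("data.parquet")  # placeholder path

# Lazy iteration: each item is a pyarrow.RecordBatch produced incrementally,
# without collecting the full result set first.
for batch in df:
    print(batch.num_rows)

# Zero-copy export: PyArrow imports the ArrowArrayStream capsule exposed by
# DataFrame.__arrow_c_stream__ and drains it batch by batch.
reader = pa.RecordBatchReader.from_stream(df)
table = reader.read_all()
```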
**Files changed (non-exhaustive, representative):**
* Documentation
  * `docs/source/user-guide/dataframe/index.rst` — add a `PyArrow Streaming` section that documents `__arrow_c_stream__` and iterator behavior.
* Python package
  * `python/datafusion/dataframe.py` — update docstrings for `__arrow_c_stream__` and add an `__iter__` implementation that imports the Arrow capsule and yields batches lazily (a sketch follows after this list).
  * `python/tests/*` — many tests added/updated to validate streaming behavior, ordering, schema projection, schema mismatch errors, interruption, and memory usage.
  * `python/tests/utils.py` — test helper `range_table` for generating very large ranges in tests.
  * `python/tests/test_dataframe_iter_stream.py` — new tests to validate reader close/release semantics.
  * `python/tests/test_io.py` — add `test_arrow_c_stream_large_dataset`, which verifies memory does not grow substantially while reading a handful of batches.
  * `python/tests/conftest.py` — add a `fail_collect` fixture that asserts `collect` is not called in streaming paths.
* Rust extension
  * `src/dataframe.rs` — implement `PartitionedDataFrameStreamReader`, return an `ArrowArrayStream` PyCapsule with destructor `drop_stream`, preserve partition order, and support an optional projection per batch.
  * `src/record_batch.rs` — add a `poll_next_batch` helper that converts the streaming `Option<Result<_>>` into `Result<Option<_>>` for ergonomic handling.
  * `src/utils.rs` — add `spawn_future`, which runs a given future on the Tokio runtime and waits while mapping `JoinError`s and preserving Python signal semantics.
  * `src/context.rs` — use `spawn_future` for streaming/execution entry points.
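As referenced above for `python/datafusion/dataframe.py`, a sketch of how the Python-side iterator can sit on top of the capsule export (hypothetical excerpt; the actual implementation may differ):

```python
from contextlib import closing

import pyarrow as pa


class DataFrame:
    """Excerpt-style sketch of the Python wrapper, not the real class."""

    def __init__(self, inner):
        self._inner = inner  # Rust-backed DataFrame

    def __arrow_c_stream__(self, requested_schema=None):
        # Delegate to the Rust extension, which returns the ArrowArrayStream PyCapsule.
        return self._inner.__arrow_c_stream__(requested_schema)

    def __iter__(self):
        # Import our own capsule into a PyArrow reader and yield batches lazily;
        # closing() releases the reader if the caller stops iterating early.
        with closing(pa.RecordBatchReader.from_stream(self)) as reader:
            yield from reader
```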
**Behavioral notes:**
* The exported capsule is zero‑copy and PyArrow takes ownership when it
imports the capsule; the destructor checks `PyCapsule_IsValid` to determine
whether ownership was transferred and avoids freeing the stream in that case.
* Schema projection is applied per record batch (not via an in-memory collect),
so requested schema changes are cheap and incremental (see the sketch after this list).
* Partition order is preserved: batches are drained partition-by-partition
in original order.
* Long-running streaming operations respond correctly to `KeyboardInterrupt`
(tested by injecting `PyThreadState_SetAsyncExc`).
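For the projection note above, a sketch of requesting a narrower schema through PyArrow (columns and types are illustrative; it assumes `RecordBatchReader.from_stream` forwards `schema` as the capsule protocol's `requested_schema`, which recent PyArrow versions do):

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"], "c": [1.0, 2.0, 3.0]})

# Request only "a" and "b". Types must match the DataFrame's actual types;
# a mismatch raises the informative error exercised by the PR's tests.
requested = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.string())])

reader = pa.RecordBatchReader.from_stream(df, schema=requested)
for batch in reader:
    assert batch.schema.names == ["a", "b"]  # projection applied per batch
```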
## Are these changes tested?
Yes — the PR adds a comprehensive set of tests focused on streaming
semantics and correctness. Notable tests:
* `test_iter_batches_dataframe` — verifies iteration yields record batches
in order.
* `test_arrow_c_stream_to_table_and_reader` — validates creating a
`pyarrow.Table` from the stream and comparing batches.
* `test_arrow_c_stream_order` — ensures partition and batch ordering is
preserved.
* `test_arrow_c_stream_schema_selection` — verifies schema projection is
respected and returned reader schema matches requested schema.
* `test_arrow_c_stream_schema_mismatch` — ensures an informative error is
raised on schema mismatch during requested projection.
* `test_arrow_c_stream_interrupted` — ensures stream iteration stops
promptly on `KeyboardInterrupt`.
* `test_arrow_c_stream_large_dataset` — constructs a (virtually) huge range
and reads a handful of batches while asserting RSS doesn't increase
substantially (uses `psutil`, skipped if unavailable).
* `test_iter_releases_reader` — monkeypatch-based test that ensures the
underlying reader is closed when iteration stops.
Tests are added to `python/tests/*` and use `pytest.importorskip` where
optional dependencies (`pyarrow.cffi`, `psutil`) are required. The
`fail_collect` fixture checks that `collect` is not called accidentally in
streaming code paths.
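For orientation, a hedged sketch of what a memory-bounded streaming check in the spirit of `test_arrow_c_stream_large_dataset` could look like (illustrative only; the real test, the `ctx` fixture, and the `range_table` signature may differ):

```python
import pyarrow as pa
import pytest

psutil = pytest.importorskip("psutil")


def test_streaming_memory_stays_bounded(ctx):
    # Hypothetical use of the range_table helper from python/tests/utils.py.
    from utils import range_table

    df = range_table(ctx, num_rows=10_000_000_000)

    rss_before = psutil.Process().memory_info().rss
    reader = pa.RecordBatchReader.from_stream(df)

    # Consume only a handful of batches; the remaining range is never produced.
    for _, batch in zip(range(5), reader):
        assert batch.num_rows > 0

    # Allow generous slack; the point is RSS does not scale with the full range.
    assert psutil.Process().memory_info().rss - rss_before < 256 * 1024 * 1024
```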
## Are there any user-facing changes?
Yes.
* **New:** `DataFrame` objects are iterable in Python: `for batch in df:`
yields `pyarrow.RecordBatch` lazily.
* **New:** `DataFrame.__arrow_c_stream__(requested_schema=None)` now exposes
an ArrowArrayStream PyCapsule that PyArrow can consume zero‑copy.
* **Docs:** Added documentation and examples in the DataFrame user guide
(`PyArrow Streaming`) showing how to import the capsule into a
`pa.RecordBatchReader` and iterate without materializing all batches.
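The documented pattern looks roughly like this (a sketch, not the user guide verbatim; `contextlib.closing` releases the reader even if iteration stops early):

```python
from contextlib import closing

import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("large_dataset.parquet")  # placeholder source

with closing(pa.RecordBatchReader.from_stream(df)) as reader:
    for batch in reader:
        ...  # process each pyarrow.RecordBatch incrementally
```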
**Compatibility / migration notes**
* This is an additive change and should be backward compatible for existing
users who call `collect()` or `to_pylist()`.
* Code that previously relied on `__arrow_c_stream__` materializing results
(for example implicitly via earlier implementations) should continue to work —
but the streaming path will be used, and tests or mocks that expected
`collect()` to be invoked may need adjustment. To help guard against
regressions, tests in this PR include a `fail_collect` fixture to ensure
streaming paths do not call `collect()`.
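A sketch of what such a guard can look like (hypothetical; the actual `fail_collect` fixture in `python/tests/conftest.py` may be implemented differently):

```python
import datafusion
import pytest


@pytest.fixture
def fail_collect(monkeypatch):
    """Fail the test if a streaming path falls back to DataFrame.collect()."""

    def _no_collect(self, *args, **kwargs):
        raise AssertionError("collect() must not be called on the streaming path")

    monkeypatch.setattr(datafusion.DataFrame, "collect", _no_collect)
```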
## Review notes
Focus review on these high‑leverage areas:
1. **Safety of capsule lifecycle handling** (`src/dataframe.rs`): ensure
`drop_stream` correctly handles both stolen and owned pointers, and that Python
error state is handled safely.
2. **`spawn_future`** (`src/utils.rs`) and its usage: validate signal
handling, `JoinError` mapping, and that error types are converted to Python
exceptions cleanly.
3. **Partitioned streaming reader semantics**
(`PartitionedDataFrameStreamReader` in `src/dataframe.rs`): confirm the
ordering and projection behavior is correct and performant.
4. **Python iterator implementation** (`python/datafusion/dataframe.py`):
ensure the reader is imported and closed properly (context manager usage via
`closing`), and that `for batch in df:` behaves as expected in IPython and
scripts.
5. **Tests**: run the new Python test suite locally (including those that
require optional deps) and inspect `fail_collect` behavior.
Files worth opening first in the review UI:
* `src/dataframe.rs`
* `src/record_batch.rs`
* `src/utils.rs`
* `src/context.rs`
* `python/datafusion/dataframe.py`
* `python/tests/test_dataframe.py`
* `python/tests/test_io.py`
* `python/tests/test_dataframe_iter_stream.py`
* `python/tests/utils.py`
## Release notes / changelog suggestion
* Add a note under "Python bindings" saying: "DataFrame now supports
zero‑copy Arrow streaming and iteration: `DataFrame.__arrow_c_stream__` returns
an ArrowArrayStream PyCapsule and DataFrame objects are iterable (yielding
`pyarrow.RecordBatch`). This enables exporting arbitrarily large results to
PyArrow without materializing them in memory."