kylebarron commented on code in PR #1222: URL: https://github.com/apache/datafusion-python/pull/1222#discussion_r2337355501

##########
Cargo.toml:
##########

Review Comment:
   Are most of the changes here just formatting? Did you just add `cstr`?

##########
python/tests/test_io.py:
##########
@@ -92,3 +96,38 @@ def test_read_avro():
     path = Path.cwd() / "testing/data/avro/alltypes_plain.avro"
     avro_df = read_avro(path=path)
     assert avro_df is not None
+
+
+def test_arrow_c_stream_large_dataset(ctx):
+    """DataFrame.__arrow_c_stream__ yields batches incrementally.
+
+    This test constructs a DataFrame that would be far larger than available
+    memory if materialized. The ``__arrow_c_stream__`` method should expose a
+    stream of record batches without collecting the full dataset, so reading a
+    handful of batches should not exhaust process memory.
+    """
+    # Create a very large DataFrame using range; this would be terabytes if collected
+    df = range_table(ctx, 0, 1 << 40)
+
+    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
+
+    # Track RSS before consuming batches
+    psutil = pytest.importorskip("psutil")
+    process = psutil.Process()
+    start_rss = process.memory_info().rss
+
+    for _ in range(5):
+        batch = reader.read_next_batch()
+        assert batch is not None
+        assert len(batch) > 0
+        current_rss = process.memory_info().rss
+        # Ensure memory usage hasn't grown substantially (>50MB)
+        assert current_rss - start_rss < 50 * 1024 * 1024
+
+
+def test_table_from_batches_stream(ctx, fail_collect):
+    df = range_table(ctx, 0, 10)
+
+    table = pa.Table.from_batches(batch.to_pyarrow() for batch in df)

Review Comment:
   Are you intentionally testing a Python iterator here instead of using `__arrow_c_stream__`? If you call
   ```py
   pa.table(df)
   ```
   that will use the C Stream and materialize the data on the pyarrow side, and it'll be more efficient because it keeps everything as a C pointer instead of having to go through Python.

##########
python/tests/test_io.py:
##########
@@ -92,3 +96,38 @@ def test_read_avro():
     path = Path.cwd() / "testing/data/avro/alltypes_plain.avro"
     avro_df = read_avro(path=path)
     assert avro_df is not None
+
+
+def test_arrow_c_stream_large_dataset(ctx):
+    """DataFrame.__arrow_c_stream__ yields batches incrementally.
+
+    This test constructs a DataFrame that would be far larger than available
+    memory if materialized. The ``__arrow_c_stream__`` method should expose a
+    stream of record batches without collecting the full dataset, so reading a
+    handful of batches should not exhaust process memory.
+    """
+    # Create a very large DataFrame using range; this would be terabytes if collected
+    df = range_table(ctx, 0, 1 << 40)
+
+    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())

Review Comment:
   ```suggestion
       reader = pa.RecordBatchReader.from_stream(df)
   ```

##########
python/datafusion/record_batch.py:
##########
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()
 
+    def __arrow_c_array__(

Review Comment:
   ๐ ๐

##########
python/datafusion/dataframe.py:
##########
@@ -1105,21 +1122,33 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
 
-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via

Review Comment:
   Might be good to have a link somewhere in the docstring to https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

##########
python/datafusion/dataframe.py:
##########
@@ -1105,21 +1122,33 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
 
-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.
 
         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.
 
         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)
 
+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Return an iterator over this DataFrame's record batches."""
+        return iter(self.execute_stream())
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Return an async iterator over this DataFrame's record batches."""
+        return self.execute_stream().__aiter__()

Review Comment:
   Nit, use the same approach for both of these; either use `iter` and `aiter` or call the dunder in each case
   ```suggestion
           return aiter(self.execute_stream())
   ```

##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,90 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-    
+
     # Count rows
     count = df.count()
 
+PyArrow Streaming

Review Comment:
   The heading here is specific to PyArrow, but I think it would be good to provide a distinction here. Maybe "Zero-copy Streaming to other Arrow implementations"? Or something like that? Then we can have a sub-section dedicated to pyarrow, but also explain that it works with any arrow-based python impl.

##########
src/record_batch.rs:
##########
@@ -84,15 +84,21 @@ impl PyRecordBatchStream {
     }
 }
 
+/// Polls the next batch from a `SendableRecordBatchStream`, converting the `Option<Result<_>>` form.
+pub(crate) async fn poll_next_batch(
+    stream: &mut SendableRecordBatchStream,
+) -> datafusion::error::Result<Option<RecordBatch>> {
+    stream.next().await.transpose()
+}
+
 async fn next_stream(
     stream: Arc<Mutex<SendableRecordBatchStream>>,
     sync: bool,
 ) -> PyResult<PyRecordBatch> {
     let mut stream = stream.lock().await;
-    match stream.next().await {
-        Some(Ok(batch)) => Ok(batch.into()),
-        Some(Err(e)) => Err(PyDataFusionError::from(e))?,
-        None => {
+    match poll_next_batch(&mut stream).await {

Review Comment:
   I'm not a tokio expert; how does this materially change? Did you have to make a new function here? Could you have just used
   ```rs
   match stream.next().await.transpose()
   ```

##########
python/datafusion/dataframe.py:
##########
@@ -1042,6 +1047,18 @@ def execute_stream_partitioned(self) -> list[RecordBatchStream]:
         streams = self.df.execute_stream_partitioned()
         return [RecordBatchStream(rbs) for rbs in streams]
 
+    @deprecated("Use execute_stream() instead")
+    def to_record_batch_stream(self) -> RecordBatchStream:

Review Comment:
   Is this a new method? Why are we creating a new method that's immediately deprecated?

##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,90 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-    
+
     # Count rows
     count = df.count()
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table without collecting everything eagerly:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+To work with the stream directly, use
+``execute_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...
+
+Execute as Stream
+^^^^^^^^^^^^^^^^^
+
+For finer control over streaming execution, use
+:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
+:py:class:`pyarrow.RecordBatchReader`:

Review Comment:
   `execute_stream` returns a _`pyarrow`_ `RecordBatchReader`? I thought it returned our own DataFusion-specific stream wrapper? I.e. `datafusion.RecordBatchStream`
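
   For the docs it might help to contrast the two paths explicitly. Rough sketch, based only on the APIs already shown elsewhere in this PR, so treat the exact return types as my assumption:
   ```py
   import pyarrow as pa

   # DataFusion-native path: execute_stream() returns a datafusion.RecordBatchStream,
   # which yields datafusion.RecordBatch objects; convert per batch only when needed.
   for batch in df.execute_stream():
       arrow_batch = batch.to_pyarrow()

   # PyArrow path: consume the __arrow_c_stream__ export as a pyarrow.RecordBatchReader.
   reader = pa.RecordBatchReader.from_stream(df)
   for arrow_batch in reader:
       ...
   ```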

##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,90 @@ To materialize the results of your DataFrame operations:
 
     # Display results
    df.show()  # Print tabular format to console
-    
+
     # Count rows
     count = df.count()
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table without collecting everything eagerly:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.Table.from_batches(b.to_pyarrow() for b in df)

Review Comment:
   I think if we provide an example of collecting as a table, we should suggest `pa.table(df)` instead, which will be more efficient because it doesn't go through Python.
   
   And
   
   > without collecting everything eagerly
   
   This isn't a good example for that, because `pa.Table` _necessarily_ materializes everything

##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,90 @@ To materialize the results of your DataFrame operations:
 
     # Display results
    df.show()  # Print tabular format to console
-    
+
     # Count rows
     count = df.count()
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table without collecting everything eagerly:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+To work with the stream directly, use
+``execute_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...
+
+Execute as Stream
+^^^^^^^^^^^^^^^^^
+
+For finer control over streaming execution, use
+:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
+:py:class:`pyarrow.RecordBatchReader`:
+
+.. code-block:: python
+
+    reader = df.execute_stream()
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+When partition boundaries are important,
+:py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
+returns an iterable of :py:class:`pyarrow.RecordBatchReader` objects, one per
+partition:
+
+.. code-block:: python
+
+    for stream in df.execute_stream_partitioned():
+        for batch in stream:

Review Comment:
   Interesting. Can these streams be polled concurrently? Can you do
   ```py
   streams = list(df.execute_stream_partitioned())
   ```
   and then concurrently iterate over all the streams, yielding whatever batch comes in first?
   
   I suppose that would just do in Python what `execute_stream` is doing in Rust?
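   
   Roughly what I have in mind, as an untested sketch; it assumes each partition's stream from `execute_stream_partitioned()` supports `async for`, which I'm inferring from the async iteration this PR adds elsewhere:
   ```py
   import asyncio

   async def merge_partitions(df):
       """Yield batches from all partitions as soon as any partition has one ready."""
       streams = list(df.execute_stream_partitioned())
       queue: asyncio.Queue = asyncio.Queue()
       sentinel = object()  # marks one exhausted partition stream

       async def drain(stream):
           # Drain one partition stream, pushing its batches onto the shared queue.
           async for batch in stream:
               await queue.put(batch)
           await queue.put(sentinel)

       tasks = [asyncio.create_task(drain(s)) for s in streams]
       remaining = len(streams)
       while remaining:
           item = await queue.get()
           if item is sentinel:
               remaining -= 1
           else:
               yield item
       await asyncio.gather(*tasks)

   # usage: async for batch in merge_partitions(df): ...
   ```
   That effectively re-implements in Python the partition interleaving that `execute_stream` already does in Rust, so it's probably only worth documenting if someone needs per-partition control plus concurrency.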