kosiew commented on code in PR #1222: URL: https://github.com/apache/datafusion-python/pull/1222#discussion_r2346250200
##########
docs/source/user-guide/dataframe/index.rst:
##########
@@ -145,10 +145,90 @@ To materialize the results of your DataFrame operations:
 
     # Display results
    df.show()  # Print tabular format to console
-
+
     # Count rows
    count = df.count()
 
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily, so you can process arbitrarily large results without
+running out of memory.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily, so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, so you can build a PyArrow table
+without collecting everything eagerly:
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+Execute as Stream
+^^^^^^^^^^^^^^^^^
+
+For finer control over streaming execution, use
+:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
+:py:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...  # process each batch as it is produced
+
+When partition boundaries are important,
+:py:meth:`~datafusion.DataFrame.execute_stream_partitioned` returns one
+:py:class:`~datafusion.RecordBatchStream` per partition:
+
+.. code-block:: python
+
+    for stream in df.execute_stream_partitioned():
+        for batch in stream:
+            ...  # process each batch as it is produced

Review Comment:
   Good question! I added a concurrent iteration example in the same document to clarify this.

   ```
   To process partitions concurrently, first collect the streams into a list
   and then poll each one in a separate ``asyncio`` task:

   .. code-block:: python

       import asyncio

       async def consume(stream):
           async for batch in stream:
               ...

       streams = list(df.execute_stream_partitioned())
       await asyncio.gather(*(consume(s) for s in streams))
   ```
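
   For anyone who wants to try the new section end to end, here is a minimal,
   self-contained sketch. The ``SessionContext`` setup and the tiny in-memory
   table are illustrative assumptions, not part of this diff, and
   ``RecordBatchReader.from_stream`` requires a PyArrow version that supports
   the Arrow C stream protocol:

   ```python
   import pyarrow as pa
   from datafusion import SessionContext

   # Hypothetical setup: a small in-memory table, just for demonstration.
   ctx = SessionContext()
   batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
   df = ctx.create_dataframe([[batch]])

   # Zero-copy export: batches are pulled from DataFusion lazily,
   # never materializing the whole result at once.
   reader = pa.RecordBatchReader.from_stream(df)
   for b in reader:
       print(b.num_rows)
   ```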