This is an automated email from the ASF dual-hosted git repository.

Fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 706390fc1 Support `pa.RecordBatchReader` in Table.`{append,overwrite}` 
(#3335)
706390fc1 is described below

commit 706390fc1b75c462bc50da5b11c0bc768e7b213c
Author: Paul Mathew <[email protected]>
AuthorDate: Tue May 19 06:16:22 2026 -0400

    Support `pa.RecordBatchReader` in Table.`{append,overwrite}` (#3335)
    
    # Rationale for this change
    
    Closes #2152, addresses the long-standing memory problem reported in
    #1004 and re-discovered by dlt-hub#3753.
    
    `Table.append(df)` and `Table.overwrite(df)` currently require a fully
    materialised `pa.Table`. For large or unbounded inputs this means
    loading the entire dataset into memory before writing — fatal at any
    non-trivial scale and a recurring complaint going back to #1004 (Aug
    2024). The reference Java implementation has streaming append;
    iceberg-go shipped it in iceberg-go#369 (Apr 2025). Python is the last
    major SDK without it.
    
    This PR adds `pa.RecordBatchReader` as a valid input to
    `Table.append/overwrite` (and `Transaction.append/overwrite`). The
    reader is consumed lazily, microbatched into Parquet files via the new
    `bin_pack_record_batches` helper, and committed in a single snapshot via
    the existing `fast_append` pipeline.
    
    ```python
    reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
    tbl.append(reader)        # ← streams, doesn't materialise
    tbl.overwrite(reader)     # ← also supported
    ```
    
    ## Scope (unpartitioned only)
    
    Streaming into a partitioned table raises `NotImplementedError` pointing
    back to #2152. Partitioned support is genuinely the harder case — it
    needs design discussion around partition cardinality bounds,
    per-partition rolling writers, and idempotency on retry — so I'm
    proposing to land in three reviewable PRs:
    
    1. **This PR** — API + unpartitioned + buffered byte-budget bin-packing.
    2. **PR2 (next)** — switch internals to a rolling `pq.ParquetWriter` +
    `OutputStream.tell()` for constant-memory streaming. No API change.
    Detailed plan below.
    3. **PR3 (later)** — partitioned streaming, after design discussion on
    #2152.
    
    This mirrors iceberg-go#369's staging: ship the unpartitioned API first,
    iterate from there.
    
    ## Implementation
    
    The streaming path reuses the existing `WriteTask` → `write_file` →
    `fast_append` pipeline. The only new primitive is
    `bin_pack_record_batches` (sibling of the existing
    `bin_pack_arrow_table`):
    
    - Accumulates incoming `RecordBatch`es into an in-memory buffer.
    - Flushes when `sum(batch.nbytes) >= write.target-file-size-bytes`.
    - Each flushed buffer becomes one parquet file via the existing
    `write_parquet` task.
    - Schema check (`_check_pyarrow_schema_compatible`) runs against
    `reader.schema` before the snapshot producer opens — schema mismatches
    fail before any data file is written, so no orphans.
    
    ## Acknowledged trade-offs
    
    **Memory**: peak memory is bounded by `N_workers ×
    write.target-file-size-bytes` (default 8 × 512 MiB ≈ 4 GiB), not
    constant. This is materially better than today's "materialise
    everything" but isn't yet "constant memory streaming". PR2 fixes this.
    
    **Byte semantics**: `write.target-file-size-bytes` is currently
    interpreted as **uncompressed in-memory Arrow bytes**
    (`RecordBatch.nbytes` — the bin-packing weight), not compressed on-disk
    Parquet bytes. The resulting files are typically 3-10× smaller than the
    property suggests after zstd / dictionary / RLE encoding. This matches
    the existing `pa.Table` write path (`bin_pack_arrow_table` uses the same
    accounting) — this PR doesn't change pyiceberg's existing semantics, it
    only documents them in the docstrings of both helpers and the
    `Transaction.append/overwrite` `Note:` blocks. PR2 fixes this too.
    
    **Retry**: `pa.RecordBatchReader` is single-pass, so a failed catalog
    commit leaves the reader drained and a naive retry writes zero rows.
    Documented in the `Note:` block — callers needing at-least-once
    semantics should reconstruct the reader on each attempt via a factory
    callable, or use the two-stage `add_files` pattern (whose input is a
    replayable list of paths).
    
    ## PR2 — proposed scope (FYI, not in this PR)
    
    Drop the buffer-and-flush approach and use a rolling `pq.ParquetWriter`
    driven by `OutputStream.tell()` (added in #2998 specifically for this
    kind of use case):
    
    ```python
    # sketch
    writer = pq.ParquetWriter(fos, schema, **kwargs)
    for batch in reader:
        writer.write_batch(batch)
        if fos.tell() >= target_file_size:   # compressed on-disk bytes
            writer.close()
            finalize_data_file(...)
            # open next file
            fos = io.new_output(next_path).create(overwrite=True)
            writer = pq.ParquetWriter(fos, schema, **kwargs)
    writer.close()
    ```
    
    What this delivers:
    
    - **Constant memory**: `O(1 batch)` per worker (~10s of MB) regardless
    of `target_file_size`. The 4 GiB peak in this PR drops to ~50-100 MB.
    - **Spec-correct byte semantics**: `write.target-file-size-bytes`
    becomes actual on-disk compressed bytes, matching the Java/Spark/Flink
    writers and the spec.
    - **No public API change**: same `tx.append(reader)` /
    `tx.overwrite(reader)` — internals only.
    
    Open design questions for PR2 (will surface on the issue thread before
    coding):
    
    - **Parallelism**: a single rolling writer is serial. Either accept that
    for streaming (memory-vs-throughput trade), or add a hybrid (N rolling
    writers fed via a queue) and pick a default that matches today's
    `executor.map(write_parquet, tasks)` parallelism.
    - **Backwards compat**: switching `bin_pack_arrow_table` to the same
    rolling-writer mechanism would also tighten the `pa.Table` path's byte
    semantics. That changes file-size characteristics for every existing
    pyiceberg writer. Probably worth a separate change with a deprecation
    note, or a feature flag.
    - **`add_files` interaction**: rolling writes produce data files we know
    about directly; we shouldn't go through the parquet-footer round-trip in
    `_dataframe_to_data_files`. Means a small refactor in the streaming-only
    path.
    
    ## Are these changes tested?
    
    Yes, comprehensively at four layers.
    
    **1. Unit tests** (`tests/io/test_pyarrow.py`) — 4 new tests for
    `bin_pack_record_batches` covering single-bin, microbatched, empty
    input, and lazy generator consumption.
    
    **2. End-to-end behaviour tests**
    (`tests/catalog/test_catalog_behaviors.py`) — 8 new tests parametrised
    across all three in-process catalog backends (`memory`, `sql`,
    `sql_without_rowcount`) → 24 test runs covering append, overwrite,
    microbatch verification (multiple files in one snapshot), empty reader,
    partitioned-table-raises, invalid-input-rejected,
    reader-consumed-exactly-once, and schema-mismatch-writes-no-files.
    
    **3. Integration tests**
    (`tests/integration/test_writes/test_writes.py`) — 6 new Spark-readback
    tests for v1 + v2 format versions covering append, overwrite, and
    multi-file microbatch. Proves Spark can read tables written via the
    streaming path against the docker-compose stack.
    
    **4. Smoke test on a real production stack** — verified end-to-end
    against AWS Glue + S3 in our staging account: 100 k-row streaming append
    in 17 s, 20-file microbatched commit, Athena read-back (`COUNT(*)` and
    `MAX(id)` matched the input exactly), schema-mismatch rejection leaving
    no orphan files.
    
    Full unit suite: 3 647 passed. Full integration suite: 122 passed, 1
    skipped.
    
    ## Are there any user-facing changes?
    
    Yes, intentionally:
    
    - `Transaction.append(df)`, `Transaction.overwrite(df)`,
    `Table.append(df)`, `Table.overwrite(df)` accept `pa.Table |
    pa.RecordBatchReader`.
    - The `ValueError` raised on bad input changes from `"Expected PyArrow
    table, got: ..."` to `"Expected pa.Table or pa.RecordBatchReader, got:
    ..."`. Updated `test_invalid_arguments` accordingly.
    - New module-level helper `bin_pack_record_batches` in
    `pyiceberg.io.pyarrow` (sibling of `bin_pack_arrow_table`).
    - `bin_pack_arrow_table` gained its first docstring, documenting the
    existing uncompressed-Arrow-bytes accounting.
    - Docs: new "Streaming writes from a RecordBatchReader" subsection in
    `mkdocs/docs/api.md`.
    - Docstrings on `Transaction.append/overwrite` document retry semantics
    and the byte-semantics caveat.
    
    ## Related
    
    - Closes #2152
    - Addresses #1004 (closed by reporter without a fix)
    - Reference implementation: iceberg-go#369
    - Downstream consumer hitting the same problem: dlt-hub/dlt#3753
    (independent rediscovery of the same approach)
    - Builds on the maintainer-blessed pattern from #1742's review
    (`_dataframe_to_data_files` + `fast_append.append_data_file()`, no
    separate `write_parquet` API)
    - Companion fix (already merged separately): test-state isolation in
    `test_write_optional_list`
    - PR2 will build on `OutputStream.tell()` from #2998
    
    ---------
    
    Co-authored-by: Paul Mathew <[email protected]>
---
 mkdocs/docs/api.md                                 |  11 ++
 pyiceberg/io/pyarrow.py                            |  79 +++++++++-
 pyiceberg/table/__init__.py                        | 140 +++++++++++++++---
 tests/catalog/test_catalog_behaviors.py            | 162 +++++++++++++++++++++
 .../test_writes/test_partitioned_writes.py         |   2 +-
 tests/integration/test_writes/test_writes.py       |  88 ++++++++++-
 tests/io/test_pyarrow.py                           |  46 ++++++
 7 files changed, 498 insertions(+), 30 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 29d09e260..ca6995a72 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+### Streaming writes from a `RecordBatchReader`
+
+`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` 
directly, which lets you write datasets that don't fit in memory without 
materialising them as a `pa.Table` first. PyIceberg consumes the reader once 
and microbatches it into Parquet files of approximately 
`write.target-file-size-bytes` (default 512 MiB), keeping memory usage bounded 
by the target size. All files are committed in a single snapshot.
+
+```python
+reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
+tbl.append(reader)
+```
+
+Streaming writes are currently only supported on **unpartitioned** tables. For 
a partitioned table, materialise the reader as a `pa.Table` first, or follow 
[#2152](https://github.com/apache/iceberg-python/issues/2152) for the 
partitioned support tracked as a follow-up.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg 
table schema to Arrow:
 
 ```python
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index d4414c7c5..db9a035bd 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2675,6 +2675,18 @@ def write_file(io: FileIO, table_metadata: 
TableMetadata, tasks: Iterator[WriteT
 
 
 def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> 
Iterator[list[pa.RecordBatch]]:
+    """Bin-pack ``tbl`` into groups of RecordBatches, each 
~``target_file_size``.
+
+    Note:
+        ``target_file_size`` is measured in **uncompressed in-memory** Arrow 
bytes
+        (``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk 
Parquet
+        bytes. The resulting Parquet file after compression (zstd by default,
+        plus dictionary/RLE encoding) is typically 3-10× smaller than
+        ``target_file_size``. This is a coarse proxy for the spec-defined
+        ``write.target-file-size-bytes`` and will be tightened to true on-disk
+        bytes once the writer is switched to a rolling-``ParquetWriter`` with
+        ``OutputStream.tell()`` (#2998).
+    """
     from pyiceberg.utils.bin_packing import PackingIterator
 
     avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2690,6 +2702,41 @@ def bin_pack_arrow_table(tbl: pa.Table, 
target_file_size: int) -> Iterator[list[
     return bin_packed_record_batches
 
 
+def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], 
target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
+    """Microbatch a single-pass stream of RecordBatches into target-sized 
groups.
+
+    Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and
+    holds at most one in-flight buffer in memory, bounded by 
``target_file_size``.
+    Suitable for streaming inputs (``pa.RecordBatchReader``,
+    ``Iterator[pa.RecordBatch]``) where the total size is unknown up front and
+    the caller cannot afford to materialise the full dataset.
+
+    Each yielded list of batches is intended to be written as a single Parquet
+    data file. Because this is single-pass FIFO accumulation (no lookback), the
+    last bin may be smaller than ``target_file_size``.
+
+    Note:
+        ``target_file_size`` is measured in **uncompressed in-memory** Arrow
+        bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
+        The resulting Parquet file after compression is typically 3-10×
+        smaller than ``target_file_size``. Matches the existing
+        :func:`bin_pack_arrow_table` semantics; both will be tightened to true
+        on-disk bytes once the writer is switched to a rolling-
+        ``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
+    """
+    buffer: list[pa.RecordBatch] = []
+    buffer_bytes = 0
+    for batch in batches:
+        buffer.append(batch)
+        buffer_bytes += batch.nbytes
+        if buffer_bytes >= target_file_size:
+            yield buffer
+            buffer = []
+            buffer_bytes = 0
+    if buffer:
+        yield buffer
+
+
 def _check_pyarrow_schema_compatible(
     requested_schema: Schema,
     provided_schema: pa.Schema,
@@ -2809,15 +2856,24 @@ def _get_parquet_writer_kwargs(table_properties: 
Properties) -> dict[str, Any]:
 
 def _dataframe_to_data_files(
     table_metadata: TableMetadata,
-    df: pa.Table,
+    df: pa.Table | pa.RecordBatchReader,
     io: FileIO,
     write_uuid: uuid.UUID | None = None,
     counter: itertools.count[int] | None = None,
 ) -> Iterable[DataFile]:
-    """Convert a PyArrow table into a DataFile.
+    """Convert a PyArrow Table or RecordBatchReader into DataFiles.
+
+    For a ``pa.Table`` the data is materialised in memory and bin-packed into
+    target-sized files (with partition splitting if the table is partitioned).
+
+    For a ``pa.RecordBatchReader`` batches are streamed and microbatched into
+    target-sized files using bounded memory (see 
:func:`bin_pack_record_batches`).
+    Streaming writes are currently only supported on unpartitioned tables;
+    partitioned support is tracked in
+    https://github.com/apache/iceberg-python/issues/2152.
 
     Returns:
-        An iterable that supplies datafiles that represent the table.
+        An iterable that supplies datafiles that represent the input data.
     """
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, 
TableProperties, WriteTask
 
@@ -2837,6 +2893,23 @@ def _dataframe_to_data_files(
         format_version=table_metadata.format_version,
     )
 
+    if isinstance(df, pa.RecordBatchReader):
+        if not table_metadata.spec().is_unpartitioned():
+            raise NotImplementedError(
+                "Writing a pa.RecordBatchReader to a partitioned table is not 
yet supported. "
+                "Materialise the reader as a pa.Table first, or follow "
+                "https://github.com/apache/iceberg-python/issues/2152 for 
partitioned streaming support."
+            )
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=(
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), 
record_batches=batches, schema=task_schema)
+                for batches in bin_pack_record_batches(df, target_file_size)
+            ),
+        )
+        return
+
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index b8d87143c..c5367c867 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -450,12 +450,53 @@ class Transaction:
         """
         return UpdateStatistics(transaction=self)
 
-    def append(self, df: pa.Table, snapshot_properties: dict[str, str] = 
EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
+    def append(
+        self,
+        df: pa.Table | pa.RecordBatchReader,
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
         """
-        Shorthand API for appending a PyArrow table to a table transaction.
+        Shorthand API for appending PyArrow data to a table transaction.
+
+        Accepts either a fully materialised ``pa.Table`` or a streaming
+        ``pa.RecordBatchReader``. Streaming is microbatched by
+        ``write.target-file-size-bytes`` so memory stays bounded; the reader is
+        consumed once and cannot be reused.
+
+        Streaming writes are currently only supported on unpartitioned tables;
+        passing a ``pa.RecordBatchReader`` for a partitioned table raises
+        ``NotImplementedError``. See
+        https://github.com/apache/iceberg-python/issues/2152.
+
+        Note:
+            When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
+            once and cannot be replayed. If the catalog commit fails (e.g.
+            ``CommitFailedException`` from a concurrent writer) the reader is
+            already drained and a naive retry will append zero rows. Callers
+            that need at-least-once semantics should either:
+
+            - reconstruct the reader on each attempt via a factory callable,
+              or
+            - use a two-stage pattern — write Parquet files explicitly and
+              then call :meth:`add_files` (whose input is a replayable list of
+              paths) within a retry loop.
+
+            Failures during the write stage (mid-stream reader exception, S3
+            errors) do not commit a snapshot, but may leave orphan data files
+            in storage that are not referenced by any snapshot. Clean these
+            up with expire/orphan-file maintenance jobs.
+
+            ``write.target-file-size-bytes`` is currently interpreted as
+            uncompressed in-memory Arrow bytes (the bin-packing weight) rather
+            than compressed on-disk Parquet bytes. The resulting files are
+            typically 3-10× smaller than the property suggests after
+            compression. This matches the existing ``pa.Table`` write path and
+            will be tightened once the writer is switched to a
+            rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
 
         Args:
-            df: The Arrow dataframe that will be appended to overwrite the 
table
+            df: An Arrow Table or a RecordBatchReader of records to append.
             snapshot_properties: Custom properties to be added to the snapshot 
summary
             branch: Branch Reference to run the append operation
         """
@@ -466,8 +507,8 @@ class Transaction:
 
         from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, 
_dataframe_to_data_files
 
-        if not isinstance(df, pa.Table):
-            raise ValueError(f"Expected PyArrow table, got: {df}")
+        if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
+            raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: 
{df}")
 
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
@@ -478,12 +519,14 @@ class Transaction:
         )
 
         with self._append_snapshot_producer(snapshot_properties, 
branch=branch) as append_files:
-            # skip writing data files if the dataframe is empty
-            if df.shape[0] > 0:
-                data_files = list(
-                    _dataframe_to_data_files(
-                        table_metadata=self.table_metadata, 
write_uuid=append_files.commit_uuid, df=df, io=self._table.io
-                    )
+            # For pa.Table we can short-circuit empty inputs cheaply. For a
+            # RecordBatchReader the stream is consumed lazily by
+            # _dataframe_to_data_files and an empty reader simply yields zero
+            # data files (the snapshot is still committed for symmetry with the
+            # pa.Table case where empty inputs also produce a snapshot).
+            if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
+                data_files = _dataframe_to_data_files(
+                    table_metadata=self.table_metadata, 
write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
@@ -555,14 +598,50 @@ class Transaction:
 
     def overwrite(
         self,
-        df: pa.Table,
+        df: pa.Table | pa.RecordBatchReader,
         overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
         snapshot_properties: dict[str, str] = EMPTY_DICT,
         case_sensitive: bool = True,
         branch: str | None = MAIN_BRANCH,
     ) -> None:
         """
-        Shorthand for adding a table overwrite with a PyArrow table to the 
transaction.
+        Shorthand for adding a table overwrite with a PyArrow table or 
RecordBatchReader to the transaction.
+
+        Accepts either a fully materialised ``pa.Table`` or a streaming
+        ``pa.RecordBatchReader``. Streaming is microbatched by
+        ``write.target-file-size-bytes`` so memory stays bounded; the reader is
+        consumed once and cannot be reused.
+
+        Streaming writes are currently only supported on unpartitioned tables;
+        passing a ``pa.RecordBatchReader`` for a partitioned table raises
+        ``NotImplementedError``. See
+        https://github.com/apache/iceberg-python/issues/2152.
+
+        Note:
+            When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
+            once and cannot be replayed. If the catalog commit fails (e.g.
+            ``CommitFailedException`` from a concurrent writer) the reader is
+            already drained and a naive retry will write zero rows. Callers
+            that need at-least-once semantics should either:
+
+            - reconstruct the reader on each attempt via a factory callable,
+              or
+            - use a two-stage pattern — write Parquet files explicitly and
+              then call :meth:`add_files` (whose input is a replayable list
+              of paths) within a retry loop.
+
+            Failures during the write stage (mid-stream reader exception, S3
+            errors) do not commit a snapshot, but may leave orphan data files
+            in storage that are not referenced by any snapshot. Clean these
+            up with expire/orphan-file maintenance jobs.
+
+            ``write.target-file-size-bytes`` is currently interpreted as
+            uncompressed in-memory Arrow bytes (the bin-packing weight) rather
+            than compressed on-disk Parquet bytes. The resulting files are
+            typically 3-10× smaller than the property suggests after
+            compression. This matches the existing ``pa.Table`` write path and
+            will be tightened once the writer is switched to a
+            rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
 
         An overwrite may produce zero or more snapshots based on the operation:
 
@@ -571,7 +650,7 @@ class Transaction:
             - APPEND: In case new data is being inserted into the table.
 
         Args:
-            df: The Arrow dataframe that will be used to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to write.
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial 
overwrite
             snapshot_properties: Custom properties to be added to the snapshot 
summary
@@ -585,8 +664,8 @@ class Transaction:
 
         from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, 
_dataframe_to_data_files
 
-        if not isinstance(df, pa.Table):
-            raise ValueError(f"Expected PyArrow table, got: {df}")
+        if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
+            raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: 
{df}")
 
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
@@ -606,8 +685,8 @@ class Transaction:
             )
 
         with self._append_snapshot_producer(snapshot_properties, 
branch=branch) as append_files:
-            # skip writing data files if the dataframe is empty
-            if df.shape[0] > 0:
+            # See append() for the empty-input handling rationale.
+            if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
                     table_metadata=self.table_metadata, 
write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
@@ -1373,12 +1452,21 @@ class Table:
                 snapshot_properties=snapshot_properties,
             )
 
-    def append(self, df: pa.Table, snapshot_properties: dict[str, str] = 
EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
+    def append(
+        self,
+        df: pa.Table | pa.RecordBatchReader,
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
         """
-        Shorthand API for appending a PyArrow table to the table.
+        Shorthand API for appending PyArrow data to the table.
+
+        Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
+        See :meth:`Transaction.append` for streaming semantics and partition
+        limitations.
 
         Args:
-            df: The Arrow dataframe that will be appended to overwrite the 
table
+            df: An Arrow Table or a RecordBatchReader of records to append.
             snapshot_properties: Custom properties to be added to the snapshot 
summary
             branch: Branch Reference to run the append operation
         """
@@ -1401,14 +1489,18 @@ class Table:
 
     def overwrite(
         self,
-        df: pa.Table,
+        df: pa.Table | pa.RecordBatchReader,
         overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
         snapshot_properties: dict[str, str] = EMPTY_DICT,
         case_sensitive: bool = True,
         branch: str | None = MAIN_BRANCH,
     ) -> None:
         """
-        Shorthand for overwriting the table with a PyArrow table.
+        Shorthand for overwriting the table with a PyArrow Table or 
RecordBatchReader.
+
+        Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
+        See :meth:`Transaction.overwrite` for streaming semantics and partition
+        limitations.
 
         An overwrite may produce zero or more snapshots based on the operation:
 
@@ -1417,7 +1509,7 @@ class Table:
             - APPEND: In case new data is being inserted into the table.
 
         Args:
-            df: The Arrow dataframe that will be used to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to write.
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial 
overwrite
             snapshot_properties: Custom properties to be added to the snapshot 
summary
diff --git a/tests/catalog/test_catalog_behaviors.py 
b/tests/catalog/test_catalog_behaviors.py
index 01e0d2ce3..0a10c556e 100644
--- a/tests/catalog/test_catalog_behaviors.py
+++ b/tests/catalog/test_catalog_behaviors.py
@@ -20,6 +20,7 @@ Consolidated behavior tests for InMemoryCatalog and 
SqlCatalog.
 """
 
 import os
+from collections.abc import Generator
 from pathlib import Path
 from typing import Any
 
@@ -1190,3 +1191,164 @@ def 
test_drop_namespace_raises_error_when_namespace_not_empty(
     catalog.create_table(test_table_identifier, table_schema_nested)
     with pytest.raises(NamespaceNotEmptyError, match=f"Namespace 
{'.'.join(namespace)} is not empty"):
         catalog.drop_namespace(namespace)
+
+
+# RecordBatchReader streaming append/overwrite tests
+#
+# Streaming writes accept a pa.RecordBatchReader and microbatch it into 
target-sized
+# Parquet files instead of materialising the full Arrow Table in memory. Tracks
+# https://github.com/apache/iceberg-python/issues/2152.
+
+
+def _simple_arrow_table() -> pa.Table:
+    return pa.Table.from_pydict(
+        {"foo": ["a", None, "z"]},
+        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+    )
+
+
+def _simple_record_batch_reader(num_batches: int = 3) -> 
tuple[pa.RecordBatchReader, int]:
+    """Build an N-batch reader of the simple schema. Returns (reader, 
total_rows)."""
+    pa_table = _simple_arrow_table()
+    batches = pa_table.to_batches() * num_batches
+    reader = pa.RecordBatchReader.from_batches(pa_table.schema, iter(batches))
+    return reader, sum(b.num_rows for b in batches)
+
+
+def test_append_record_batch_reader(catalog: Catalog) -> None:
+    catalog.create_namespace("default")
+    identifier = f"default.append_record_batch_reader_{catalog.name}"
+    reader, total_rows = _simple_record_batch_reader(num_batches=3)
+    tbl = catalog.create_table(identifier=identifier, schema=reader.schema)
+
+    tbl.append(reader)
+
+    assert len(tbl.scan().to_arrow()) == total_rows
+
+
+def test_append_record_batch_reader_microbatched(catalog: Catalog) -> None:
+    """A reader bigger than the per-file target produces multiple Parquet files
+    in a single snapshot — verifying the byte-budget microbatching path."""
+    catalog.create_namespace("default")
+    identifier = 
f"default.append_record_batch_reader_microbatch_{catalog.name}"
+    reader, total_rows = _simple_record_batch_reader(num_batches=8)
+    # Force every batch to roll a new file by setting an absurdly small target 
size.
+    tbl = catalog.create_table(
+        identifier=identifier,
+        schema=reader.schema,
+        properties={TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: "1"},
+    )
+
+    tbl.append(reader)
+
+    snapshot = tbl.metadata.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    added_files = snapshot.summary["added-data-files"]
+    assert added_files is not None and int(added_files) > 1, snapshot.summary
+    assert len(tbl.scan().to_arrow()) == total_rows
+
+
+def test_append_record_batch_reader_empty(catalog: Catalog) -> None:
+    catalog.create_namespace("default")
+    identifier = f"default.append_record_batch_reader_empty_{catalog.name}"
+    schema = _simple_arrow_table().schema
+    reader = pa.RecordBatchReader.from_batches(schema, iter([]))
+    tbl = catalog.create_table(identifier=identifier, schema=schema)
+
+    tbl.append(reader)
+
+    assert len(tbl.scan().to_arrow()) == 0
+
+
+def test_overwrite_record_batch_reader(catalog: Catalog) -> None:
+    catalog.create_namespace("default")
+    identifier = f"default.overwrite_record_batch_reader_{catalog.name}"
+    pa_table = _simple_arrow_table()
+    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema)
+    tbl.append(pa_table)
+    assert len(tbl.scan().to_arrow()) == pa_table.num_rows
+
+    reader, total_rows = _simple_record_batch_reader(num_batches=2)
+    tbl.overwrite(reader)
+
+    assert len(tbl.scan().to_arrow()) == total_rows
+
+
+def test_append_record_batch_reader_to_partitioned_table_raises(catalog: 
Catalog) -> None:
+    catalog.create_namespace("default")
+    identifier = 
f"default.append_record_batch_reader_partitioned_{catalog.name}"
+    iceberg_schema = Schema(
+        NestedField(1, "id", IntegerType(), required=False),
+        NestedField(2, "bucket", StringType(), required=False),
+    )
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=2, field_id=1000, 
transform=IdentityTransform(), name="bucket"),
+    )
+    tbl = catalog.create_table(identifier=identifier, schema=iceberg_schema, 
partition_spec=partition_spec)
+
+    arrow_schema = schema_to_pyarrow(iceberg_schema)
+    reader = pa.RecordBatchReader.from_batches(arrow_schema, iter([]))
+    with pytest.raises(NotImplementedError, match="partitioned table"):
+        tbl.append(reader)
+
+
+def test_append_invalid_input_type_raises(catalog: Catalog) -> None:
+    catalog.create_namespace("default")
+    identifier = f"default.append_invalid_input_{catalog.name}"
+    pa_table = _simple_arrow_table()
+    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema)
+    with pytest.raises(ValueError, match="Expected pa.Table or 
pa.RecordBatchReader"):
+        tbl.append("not an arrow object")
+
+
+def test_record_batch_reader_consumed_exactly_once(catalog: Catalog) -> None:
+    """The streaming path must consume the underlying generator exactly once.
+    A regression that drained the reader twice (e.g. an extra .schema access
+    that materialised the iterator, or a retry-loop without a fresh reader)
+    would silently lose data — the second pass is empty.
+    """
+    catalog.create_namespace("default")
+    identifier = f"default.record_batch_reader_consumed_once_{catalog.name}"
+    pa_table = _simple_arrow_table()
+    consumed_batches = 0
+
+    def tracking_batches() -> Generator[pa.RecordBatch, None, None]:
+        nonlocal consumed_batches
+        for batch in pa_table.to_batches() * 3:
+            consumed_batches += 1
+            yield batch
+
+    reader = pa.RecordBatchReader.from_batches(pa_table.schema, 
tracking_batches())
+    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema)
+
+    tbl.append(reader)
+
+    # The generator should have been driven to exhaustion exactly once: 3 
batches.
+    assert consumed_batches == 3
+    assert len(tbl.scan().to_arrow()) == pa_table.num_rows * 3
+
+
+def test_record_batch_reader_schema_mismatch_writes_no_files(catalog: Catalog) 
-> None:
+    """A schema mismatch must fail before any data files are written. Otherwise
+    we'd leak orphan parquet files in storage (and a partial commit that picks
+    them up later via add_files would be a correctness disaster).
+    """
+    catalog.create_namespace("default")
+    identifier = f"default.record_batch_reader_schema_mismatch_{catalog.name}"
+    iceberg_schema = Schema(NestedField(1, "foo", StringType(), 
required=False))
+    tbl = catalog.create_table(identifier=identifier, schema=iceberg_schema)
+
+    bad_schema = pa.schema([pa.field("foo", pa.int64(), nullable=True)])
+    bad_reader = pa.RecordBatchReader.from_batches(
+        bad_schema,
+        iter([pa.RecordBatch.from_pylist([{"foo": 1}], schema=bad_schema)]),
+    )
+
+    with pytest.raises(ValueError):
+        tbl.append(bad_reader)
+
+    # No snapshot should have been produced: the schema check runs before
+    # _append_snapshot_producer opens.
+    assert tbl.metadata.current_snapshot() is None
+    assert len(tbl.scan().to_arrow()) == 0
diff --git a/tests/integration/test_writes/test_partitioned_writes.py 
b/tests/integration/test_writes/test_partitioned_writes.py
index 2bc498560..1d1488255 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -768,7 +768,7 @@ def test_invalid_arguments(spark: SparkSession, 
session_catalog: Catalog) -> Non
         properties={"format-version": "1"},
     )
 
-    with pytest.raises(ValueError, match="Expected PyArrow table, got: not a 
df"):
+    with pytest.raises(ValueError, match="Expected pa.Table or 
pa.RecordBatchReader, got: not a df"):
         tbl.append("not a df")
 
 
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index b6fc7067f..609c1863b 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -791,10 +791,10 @@ def test_invalid_arguments(spark: SparkSession, 
session_catalog: Catalog, arrow_
     identifier = "default.arrow_data_files"
     tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 
[])
 
-    with pytest.raises(ValueError, match="Expected PyArrow table, got: not a 
df"):
+    with pytest.raises(ValueError, match="Expected pa.Table or 
pa.RecordBatchReader, got: not a df"):
         tbl.overwrite("not a df")
 
-    with pytest.raises(ValueError, match="Expected PyArrow table, got: not a 
df"):
+    with pytest.raises(ValueError, match="Expected pa.Table or 
pa.RecordBatchReader, got: not a df"):
         tbl.append("not a df")
 
 
@@ -2571,3 +2571,87 @@ def test_v3_write_and_read_row_lineage(spark: 
SparkSession, session_catalog: Cat
     assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
         "Expected next_row_id to be incremented by the number of added rows"
     )
+
+
+# RecordBatchReader streaming append/overwrite — see 
https://github.com/apache/iceberg-python/issues/2152
+#
+# These integration tests prove Spark can read tables written via the new 
streaming
+# path. Equivalent in-process scan coverage lives in 
tests/catalog/test_catalog_behaviors.py
+# but only Spark exercises the manifest stats + Parquet metadata produced by 
the
+# write_file → fast_append pipeline against an external reader.
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_append_record_batch_reader(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = 
f"default.streaming_append_record_batch_reader_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, {"format-version": 
str(format_version)})
+
+    # 4 batches × 3 rows each — exercises the multi-batch streaming path while
+    # keeping the assertion data tractable for Spark.
+    batches = arrow_table_with_null.to_batches() * 4
+    reader = pa.RecordBatchReader.from_batches(arrow_table_with_null.schema, 
iter(batches))
+    expected_rows = sum(b.num_rows for b in batches)
+
+    tbl.append(reader)
+
+    assert len(tbl.scan().to_arrow()) == expected_rows
+    df = spark.table(identifier)
+    assert df.count() == expected_rows
+    # Spot-check that Spark agrees on the schema as written
+    assert sorted(df.columns) == sorted(arrow_table_with_null.column_names)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_overwrite_record_batch_reader(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = 
f"default.streaming_overwrite_record_batch_reader_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, {"format-version": 
str(format_version)}, [arrow_table_with_null])
+    assert len(tbl.scan().to_arrow()) == arrow_table_with_null.num_rows
+
+    batches = arrow_table_with_null.to_batches() * 2
+    reader = pa.RecordBatchReader.from_batches(arrow_table_with_null.schema, 
iter(batches))
+    expected_rows = sum(b.num_rows for b in batches)
+
+    tbl.overwrite(reader)
+
+    # Existing rows replaced, only the streamed rows remain
+    assert len(tbl.scan().to_arrow()) == expected_rows
+    df = spark.table(identifier)
+    assert df.count() == expected_rows
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_append_record_batch_reader_multifile(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    """Forcing a tiny target file size should produce >1 data file in a single
+    snapshot, proving the byte-budget rollover in bin_pack_record_batches fires
+    end-to-end and the resulting files are valid Iceberg data files (Spark 
reads
+    them all)."""
+    identifier = f"default.streaming_append_multifile_v{format_version}"
+    tbl = _create_table(
+        session_catalog,
+        identifier,
+        {"format-version": str(format_version), 
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: "1"},
+    )
+
+    batches = arrow_table_with_null.to_batches(max_chunksize=1) * 4
+    reader = pa.RecordBatchReader.from_batches(arrow_table_with_null.schema, 
iter(batches))
+    expected_rows = sum(b.num_rows for b in batches)
+
+    tbl.append(reader)
+
+    snapshot = tbl.metadata.current_snapshot()
+    assert snapshot is not None
+    assert snapshot.summary is not None
+    added_files = snapshot.summary["added-data-files"]
+    assert added_files is not None and int(added_files) > 1, snapshot.summary
+
+    df = spark.table(identifier)
+    assert df.count() == expected_rows
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 2170741bd..a05b295fc 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -20,6 +20,7 @@ import os
 import tempfile
 import uuid
 import warnings
+from collections.abc import Iterator
 from datetime import date, datetime, timezone
 from pathlib import Path
 from typing import Any
@@ -76,6 +77,7 @@ from pyiceberg.io.pyarrow import (
     _task_to_record_batches,
     _to_requested_schema,
     bin_pack_arrow_table,
+    bin_pack_record_batches,
     compute_statistics_plan,
     data_file_statistics_from_parquet_metadata,
     expression_to_pyarrow,
@@ -2364,6 +2366,50 @@ def 
test_bin_pack_arrow_table_target_size_smaller_than_row(arrow_table_with_null
     assert sum(batch.num_rows for bin_ in bin_packed for batch in bin_) == 
arrow_table_with_null.num_rows
 
 
+def test_bin_pack_record_batches_single_bin(arrow_table_with_null: pa.Table) 
-> None:
+    batches = arrow_table_with_null.to_batches()
+    bins = list(bin_pack_record_batches(iter(batches), 
target_file_size=arrow_table_with_null.nbytes * 10))
+    # everything fits in one bin
+    assert len(bins) == 1
+    assert sum(b.num_rows for b in bins[0]) == arrow_table_with_null.num_rows
+
+
+def test_bin_pack_record_batches_microbatched(arrow_table_with_null: pa.Table) 
-> None:
+    # repeat the per-row batches so we have many small inputs to pack
+    batches = list(arrow_table_with_null.to_batches(max_chunksize=1)) * 5
+    bin_size = arrow_table_with_null.nbytes // 2  # forces multiple bins
+    bins = list(bin_pack_record_batches(iter(batches), 
target_file_size=bin_size))
+    assert len(bins) > 1
+    assert sum(b.num_rows for bin_ in bins for b in bin_) == 
arrow_table_with_null.num_rows * 5
+    # All but the last bin should have crossed the size threshold.
+    for bin_ in bins[:-1]:
+        assert sum(b.nbytes for b in bin_) >= bin_size
+
+
+def test_bin_pack_record_batches_empty() -> None:
+    assert list(bin_pack_record_batches(iter([]), target_file_size=1024)) == []
+
+
+def test_bin_pack_record_batches_is_lazy(arrow_table_with_null: pa.Table) -> 
None:
+    # Streams are single-pass: confirm the helper consumes its input 
batch-by-batch
+    # rather than materialising the whole iterator before yielding the first 
bin.
+    consumed: list[int] = []
+
+    def tracking_iter() -> Iterator[pa.RecordBatch]:
+        for i, batch in 
enumerate(arrow_table_with_null.to_batches(max_chunksize=1)):
+            consumed.append(i)
+            yield batch
+
+    target = max(1, arrow_table_with_null.nbytes // 4)
+    bins_iter = bin_pack_record_batches(tracking_iter(), 
target_file_size=target)
+    first_bin = next(bins_iter)
+    assert len(first_bin) >= 1
+    # Generator should not have walked the entire input upon yielding the 
first bin
+    assert len(consumed) < arrow_table_with_null.num_rows
+    list(bins_iter)
+    assert len(consumed) == arrow_table_with_null.num_rows
+
+
 def test_schema_mismatch_type(table_schema_simple: Schema) -> None:
     other_schema = pa.schema(
         (


Reply via email to