This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d33939100 perf(add_files): stream manifest entries for duplicate-files check (#3287)
d33939100 is described below

commit d3393910065a282e96f9b471d0a9b1047ef28b7a
Author: Ben Lai <[email protected]>
AuthorDate: Tue May 19 22:31:13 2026 +0800

    perf(add_files): stream manifest entries for duplicate-files check (#3287)
    
    Fixes #3286.
    
    ## The hot path today
    
    ```python
    if check_duplicate_files:
        import pyarrow.compute as pc
    
        expr = pc.field("file_path").isin(file_paths)
        referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
    ```
    
    What this actually does, per call:
    
    1. `inspect.data_files()` — for every manifest in the current snapshot,
    calls `_get_files_from_manifest` (`pyiceberg/table/inspect.py:548`),
    which for each `ManifestEntry` builds a Python dict with ~17 fields. The
    expensive ones:
    - `readable_metrics` — for **every column** in the table schema, decodes
    lower/upper bound bytes via `from_bytes` and packs the result into a
    per-column dict. This is the single biggest cost on wide tables.
    - `partition` — decodes the partition struct into a name → value dict.
    - `column_sizes`, `value_counts`, `null_value_counts`,
    `nan_value_counts`, `lower_bounds`, `upper_bounds` — each materialized
    as a Python dict per file.
    2. The dicts are batched into a `pyarrow.Table` per manifest.
    3. `pa.concat_tables` glues all manifests' Tables together.
    4. `.filter(expr)` applies an Arrow-compute `isin` over the concatenated
    Table.
    5. `.to_pylist()` converts back to Python dicts.
    6. The list comprehension throws away 16 of the 17 columns and keeps
    only `file_path`.
    
    For a backfill that calls `add_files` once per day on a growing table,
    per-call cost is O(snapshot file count); cumulative cost is O(N²). After
    ~15 daily commits on a wide-schema table, dup-check time dominates: each
    call takes ~10–15 minutes vs seconds early on.
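    
    As a rough illustration of the quadratic growth described above, the
    sketch below counts how many existing manifest entries the dup-check
    visits over a backfill. It assumes a fixed number of files per call and
    that each call scans every file committed before it; `entries_scanned`
    is a hypothetical helper, not part of PyIceberg.
    
    ```python
    def entries_scanned(num_calls: int, files_per_call: int) -> list[int]:
        """Existing entries visited by the dup-check on each successive call."""
        # Call k (0-based) sees the k * files_per_call files committed before it.
        return [k * files_per_call for k in range(num_calls)]
    
    
    per_call = entries_scanned(num_calls=10, files_per_call=200)
    # The sum is files_per_call * n * (n - 1) / 2, i.e. quadratic in the call count.
    total = sum(per_call)
    print(per_call[-1], total)
    ```
    
    Doubling the number of calls roughly quadruples `total`, which is why the
    per-entry constant factor matters so much on long backfills.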
    
    The workaround in #2132 / docs PR #2249 is
    `check_duplicate_files=False`, which trades away the idempotency
    guarantee that re-running a partial-failure resume is safe.
    
    ## Benchmark — before vs after
    
    `tests/benchmark/test_add_files_dup_check_benchmark.py` (added in this
    PR) runs 10 sequential `add_files(check_duplicate_files=True)` calls on
    an `InMemoryCatalog` table with a 30-column schema, 200 small parquet
    files per call. The run below records wall-clock time and `tracemalloc`
    peak per call, on macOS arm64 / Python 3.11.
    
    **Before (upstream `main`):**
    ```
    batch   wall_s    tracemalloc_peak_MB  cumulative_files
        0     1.05                    5.5               200
        1     1.00                    9.2               400
        2     1.06                   12.7               600
        3     1.13                   18.5               800
        4     1.18                   20.2              1000
        5     1.26                   23.4              1200
        6     1.32                   30.2              1400
        7     1.39                   34.0              1600
        8     1.46                   29.9              1800
        9     1.51                   39.8              2000
    ```
    
    Wall time climbs ~44% from batch 0 to batch 9; tracemalloc peak grows ~7.2×.
    
    **After (this PR):**
    ```
    batch   wall_s    tracemalloc_peak_MB  cumulative_files
        0     1.05                    5.5               200
        1     1.56                    5.6               400
        2     0.96                    6.2               600
        3     0.97                    6.3               800
        4     0.98                    6.5              1000
        5     1.00                    6.8              1200
        6     1.00                    6.6              1400
        7     1.03                    8.2              1600
        8     1.04                    7.2              1800
        9     1.07                    6.9              2000
    ```
    
    Wall time stays flat at ~1 s (apart from 1.56 s on batch 1) and
    tracemalloc peak stays flat at ~6–8 MB. The growth disappears because
    the dup-check no longer materializes per-file dicts, pyarrow Tables, or
    `readable_metrics`; it only does set containment on `file_path` while
    streaming manifest entries.
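    
    The growth figures quoted for the two runs can be re-derived from the
    first and last rows of each table. A minimal sketch, with the numbers
    copied from the tables above and `growth` as a hypothetical helper:
    
    ```python
    # (batch 0, batch 9) values copied from the two benchmark tables.
    before = {"wall": (1.05, 1.51), "peak_mb": (5.5, 39.8)}
    after = {"wall": (1.05, 1.07), "peak_mb": (5.5, 6.9)}
    
    
    def growth(first: float, last: float) -> float:
        """Ratio of the last measurement to the first."""
        return last / first
    
    
    for label, run in (("before", before), ("after", after)):
        wall_pct = (growth(*run["wall"]) - 1) * 100
        peak_ratio = growth(*run["peak_mb"])
        print(f"{label}: wall +{wall_pct:.0f}%, peak x{peak_ratio:.1f}")
    ```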
    
    This is a 10-batch run on a small, narrow workload. Real backfills with
    wider schemas (more columns × more row groups), more files per batch,
    and many more batches see the constant factor amplify; the production
    workload that motivated this PR was hitting ~10–15 minutes per call
    after 15 commits.
    
    ## What this PR does
    
    Replace the materialize-then-filter with a streaming scan that reuses
    the existing `_open_manifest` helper
    (`pyiceberg/table/__init__.py:1918`) — the canonical "open a manifest,
    fetch entries with `discard_deleted=True`, apply data-file predicates"
    pattern already used by `DataScan.scan_plan_helper` (line 2050). Delete
    manifests are skipped at the top level (same shape as
    `_min_sequence_number`).
    
    The loop body becomes a `set` containment check on
    `data_file.file_path`, scheduled via `executor.map` and flattened with
    `chain.from_iterable` — same idiom as the existing scan path.
    
    This is the same approach Spark's `add_files` action takes: a
    predicate-based check against the new paths only, with no pre-scan of
    all data files.
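    
    The shape of the streaming scan can be sketched without PyIceberg. In
    the example below each manifest is modeled as a plain list of path
    strings, `ThreadPoolExecutor` stands in for the shared executor, and
    `MANIFESTS`, `matching_paths`, and `find_referenced` are hypothetical
    names chosen for illustration; the real change is the diff in this
    commit.
    
    ```python
    from concurrent.futures import ThreadPoolExecutor
    from itertools import chain
    
    # Hypothetical stand-in for data manifests: each is a list of file paths.
    MANIFESTS = [
        ["s3://bucket/a.parquet", "s3://bucket/b.parquet"],
        ["s3://bucket/c.parquet"],
    ]
    
    
    def matching_paths(manifest: list[str], candidates: set[str]) -> list[str]:
        """Walk one manifest and keep only the paths in the candidate set."""
        return [path for path in manifest if path in candidates]
    
    
    def find_referenced(file_paths: list[str]) -> list[str]:
        candidates = set(file_paths)  # O(1) membership test per entry
        with ThreadPoolExecutor() as executor:
            per_manifest = executor.map(lambda m: matching_paths(m, candidates), MANIFESTS)
            # Flatten the per-manifest results into a single list.
            return list(chain.from_iterable(per_manifest))
    
    
    print(find_referenced(["s3://bucket/c.parquet", "s3://bucket/new.parquet"]))
    ```
    
    Only matching paths are ever collected, so memory is bounded by the
    number of duplicates rather than by the snapshot size.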
    
    ## What this is and isn't
    
    - **Is**: a constant-factor reduction. The Avro decode of manifest
    entries is unchanged (still happens via `fetch_manifest_entry`), but
    everything downstream of the read — `readable_metrics` computation,
    partition decode, per-file dict construction, pyarrow Table
    construction, `concat_tables`, `filter`, `to_pylist` — is gone. That
    post-processing was the bulk of the time, not the Avro read.
    - **Isn't**: an asymptotic fix. Per-call cost is still O(snapshot file
    count) for the manifest entry reads; cumulative backfill cost is still
    O(N²). Truly eliminating the linear scan would need `file_path`
    lower/upper bounds at the `ManifestFile` level so most manifests can be
    pruned without opening — that's a spec extension and a follow-up.
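    
    To make the follow-up idea concrete, here is a sketch of what
    manifest-level pruning could look like if such bounds existed. The
    `ManifestSummary` class and its `min_file_path` / `max_file_path`
    fields are purely hypothetical; nothing like them is in the spec or in
    PyIceberg today.
    
    ```python
    from dataclasses import dataclass
    
    
    @dataclass
    class ManifestSummary:
        """Hypothetical manifest-level stats; these fields do not exist today."""
    
        name: str
        min_file_path: str
        max_file_path: str
    
    
    def manifests_to_open(manifests: list[ManifestSummary], candidates: set[str]) -> list[str]:
        """Keep manifests whose [min, max] path range could contain a candidate."""
        return [
            m.name
            for m in manifests
            if any(m.min_file_path <= path <= m.max_file_path for path in candidates)
        ]
    
    
    manifests = [
        ManifestSummary("m0", "data/day=01/a.parquet", "data/day=01/z.parquet"),
        ManifestSummary("m1", "data/day=02/a.parquet", "data/day=02/z.parquet"),
    ]
    print(manifests_to_open(manifests, {"data/day=02/k.parquet"}))
    ```
    
    Under these assumptions a daily backfill whose paths sort by date would
    open only the manifests overlapping the new day.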
    
    ## Compatibility / behavior preservation
    
    Audited the change for any behavioral divergence from the old
    `inspect.data_files().filter(...)` path:
    
    - **Public API**: `add_files` signature and exception message unchanged.
    Existing integration tests at
    `tests/integration/test_add_files.py:test_add_files_that_referenced_by_current_snapshot{,_with_check_duplicate_files_true,_with_check_duplicate_files_false}`
    exercise the dup-check contract and assert the exact error string — both
    preserved verbatim.
    - **Callers**: only `Table.add_files`
    (`pyiceberg/table/__init__.py:1491`). No subclass overrides exist (e.g.
    `CreateTableTransaction` doesn't redefine it).
    `Transaction.upsert`/`append`/`overwrite`, `_FastAppendFiles`,
    `MergingSnapshotProducer` don't share the dup-check path.
    - **File set scanned**: `inspect.data_files()` filtered per-entry on
    `DataFileContent.DATA`; new code filters at `ManifestContent.DATA`.
    These are theoretically distinct but produce identical sets per the
    Iceberg spec — delete entries cannot live in DATA manifests.
    - **`discard_deleted`**: both paths use `True` (`fetch_manifest_entry`
    defaults to `True`; `_open_manifest` passes it explicitly).
    - **Snapshot scope**: both paths use `current_snapshot()` —
    `inspect.data_files()` via `_get_snapshot(None)`, new code directly via
    `self.table_metadata.current_snapshot()`.
    - **Empty `file_paths`**: same result (empty list) and same exceptions
    either way. Slight efficiency regression in this edge case — the new
    code still walks data manifests where the old code short-circuited via
    `pc.field("file_path").isin([])`. Not user-visible; can be optimized in
    a follow-up if anyone cares.
    - **Side effects**: both paths are read-only; no manifest cache state
    mutation, no transaction state changes.
    - **Concurrency**: both submit to the shared
    `ExecutorFactory.get_or_create()` thread pool.
    - **Branch parameter**: `add_files` accepts a `branch` argument, but the
    dup-check has always run against `current_snapshot()` (i.e. main)
    regardless. This is a **pre-existing inconsistency**, not introduced by
    this PR. Preserved exactly to keep this change behavior-preserving.
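    
    The empty-`file_paths` edge case noted in the list above could be
    short-circuited with an early return. A minimal sketch of that guard,
    using plain lists as stand-in manifests and a hypothetical
    `open_manifest` callback that records each read; this is not part of
    the PR:
    
    ```python
    def find_referenced(file_paths, manifests, open_manifest):
        """Sketch of a dup-check that skips all manifest reads for empty input."""
        if not file_paths:
            return []  # nothing can match, so no manifest needs to be opened
        candidates = set(file_paths)
        return [p for m in manifests for p in open_manifest(m) if p in candidates]
    
    
    opened = []
    
    
    def open_manifest(manifest):
        opened.append(manifest)  # record each read so the guard is observable
        return manifest
    
    
    manifests = [["a.parquet"], ["b.parquet"]]
    print(find_referenced([], manifests, open_manifest), len(opened))
    ```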
    
    ## Refs
    
    - Issue: #3286
    - Related: #2132 (closed as docs), #2133 (parallelization)
---
 pyiceberg/table/__init__.py                        |  28 ++++-
 .../test_add_files_dup_check_benchmark.py          | 132 +++++++++++++++++++++
 2 files changed, 155 insertions(+), 5 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c5367c867..64ad10050 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -940,6 +940,28 @@ class Transaction:
 
         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
 
+    def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
+        """Return file_paths already referenced by data files in the current snapshot."""
+        snapshot = self.table_metadata.current_snapshot()
+        if snapshot is None:
+            return []
+
+        candidates = set(file_paths)
+        io = self._table.io
+        data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
+
+        def path_filter(data_file: DataFile) -> bool:
+            return data_file.file_path in candidates
+
+        executor = ExecutorFactory.get_or_create()
+        entries = chain.from_iterable(
+            executor.map(
+                lambda args: _open_manifest(*args),
+                [(io, manifest, path_filter, lambda _: True) for manifest in data_manifests],
+            )
+        )
+        return [entry.data_file.file_path for entry in entries]
+
     def add_files(
         self,
         file_paths: list[str],
@@ -962,11 +984,7 @@ class Transaction:
             raise ValueError("File paths must be unique")
 
         if check_duplicate_files:
-            import pyarrow.compute as pc
-
-            expr = pc.field("file_path").isin(file_paths)
-            referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
-
+            referenced_files = self._find_referenced_data_files(file_paths)
             if referenced_files:
                 raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
 
diff --git a/tests/benchmark/test_add_files_dup_check_benchmark.py b/tests/benchmark/test_add_files_dup_check_benchmark.py
new file mode 100644
index 000000000..731305bf0
--- /dev/null
+++ b/tests/benchmark/test_add_files_dup_check_benchmark.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmark for `add_files(check_duplicate_files=True)`.
+
+Reproduces the per-call cost of the duplicate-files check on a growing
+table. Before fix: each call materializes every DataFile in the snapshot
+into a pyarrow Table (with readable_metrics, partition decode, full stats
+dicts) and post-filters on file_path — peak memory grows roughly linearly
+with cumulative file count, dominated by per-column stats decoding.
+After fix: streaming manifest scan with set containment on file_path,
+peak memory stays flat.
+
+Run with: uv run pytest tests/benchmark/test_add_files_dup_check_benchmark.py -v -s -m benchmark
+"""
+
+from __future__ import annotations
+
+import gc
+import tempfile
+import tracemalloc
+from pathlib import Path
+from typing import Any
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
+    """Build a wide-ish schema so per-column stats decoding has work to do."""
+    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
+    for i in range(2, num_columns + 1):
+        iceberg_fields.append(NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False))
+    iceberg_schema = Schema(*iceberg_fields)
+    arrow_schema = pa.schema(
+        [pa.field("id", pa.int32(), nullable=False)]
+        + [pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)]
+    )
+    return iceberg_schema, arrow_schema
+
+
+def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
+    paths: list[str] = []
+    columns: dict[str, list[Any]] = {
+        name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)] for name in arrow_schema.names
+    }
+    rows = pa.Table.from_pydict(columns, schema=arrow_schema)
+    for i in range(n_files):
+        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
+        pq.write_table(rows, p)
+        paths.append(f"file://{p}")
+    return paths
+
+
+@pytest.mark.benchmark
+def test_add_files_dup_check_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Peak memory per `add_files(check_duplicate_files=True)` call should stay
+    flat across consecutive calls on a growing table.
+
+    With the materialize-then-filter implementation, peak grows roughly linearly
+    with cumulative file count (per-column stats decoding into a pyarrow Table).
+    With the streaming-scan implementation, peak stays bounded by the per-call
+    workload.
+    """
+    num_batches = 10
+    files_per_batch = 200
+    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
+
+    with tempfile.TemporaryDirectory() as tmp_root:
+        data_dir = Path(tmp_root) / "data"
+        data_dir.mkdir()
+        table = memory_catalog.create_table("default.add_files_bench", schema=iceberg_schema)
+
+        gc.collect()
+        tracemalloc.start()
+
+        peaks_mb: list[float] = []
+        print(f"\n--- add_files dup-check benchmark ({num_batches} batches × {files_per_batch} files, 30 cols) ---")
+        print(f"{'batch':>5} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
+
+        cumulative = 0
+        for b in range(num_batches):
+            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
+            tracemalloc.reset_peak()
+            table.add_files(file_paths=paths, check_duplicate_files=True)
+            _, peak = tracemalloc.get_traced_memory()
+            peak_mb = peak / (1024 * 1024)
+            peaks_mb.append(peak_mb)
+            cumulative += files_per_batch
+            print(f"{b:>5d} {peak_mb:>22.1f} {cumulative:>17d}")
+
+        tracemalloc.stop()
+
+        # Growth ratio: last call peak vs first call peak.
+        # Materialize-then-filter (pre-fix): observed ~7× on this workload.
+        # Streaming scan (post-fix): observed ~1×–1.5× (mostly noise).
+        # Threshold of 3× catches the regression while tolerating variance.
+        first_peak = peaks_mb[0]
+        last_peak = peaks_mb[-1]
+        ratio = last_peak / first_peak if first_peak > 0 else float("inf")
+        print(f"\n  Peak ratio (last / first): {ratio:.1f}×")
+        max_ratio = 3.0
+        assert ratio < max_ratio, (
+            f"Peak memory ratio ({ratio:.1f}×) exceeds {max_ratio}×. "
+            "Dup-check materializes the full snapshot rather than streaming on file_path."
+        )
