This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new d33939100 perf(add_files): stream manifest entries for duplicate-files
check (#3287)
d33939100 is described below
commit d3393910065a282e96f9b471d0a9b1047ef28b7a
Author: Ben Lai <[email protected]>
AuthorDate: Tue May 19 22:31:13 2026 +0800
perf(add_files): stream manifest entries for duplicate-files check (#3287)
Fixes #3286.
## The hot path today
```python
if check_duplicate_files:
    import pyarrow.compute as pc

    expr = pc.field("file_path").isin(file_paths)
    referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
```
What this actually does, per call:
1. `inspect.data_files()` — for every manifest in the current snapshot,
calls `_get_files_from_manifest` (`pyiceberg/table/inspect.py:548`),
which for each `ManifestEntry` builds a Python dict with ~17 fields. The
expensive ones:
- `readable_metrics` — for **every column** in the table schema, decodes
lower/upper bound bytes via `from_bytes` and packs the result into a
per-column dict. This is the single biggest cost on wide tables.
- `partition` — decodes the partition struct into a name → value dict.
- `column_sizes`, `value_counts`, `null_value_counts`,
`nan_value_counts`, `lower_bounds`, `upper_bounds` — each materialized
as a Python dict per file.
2. The dicts are batched into a `pyarrow.Table` per manifest.
3. `pa.concat_tables` glues all manifests' Tables together.
4. `.filter(expr)` applies an Arrow-compute `isin` over the concatenated
Table.
5. `.to_pylist()` converts back to Python dicts.
6. The list comprehension throws away 16 of the 17 columns and keeps
only `file_path`.
For a backfill that calls `add_files` once per day on a growing table,
per-call cost is O(snapshot file count); cumulative cost is O(N²). After
~15 daily commits on a wide-schema table, dup-check time dominates: each
call takes ~10–15 minutes vs seconds early on.
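To make the quadratic claim concrete, here is a minimal sketch of the arithmetic. It assumes every call adds the same number of files and that each dup-check visits every file already in the snapshot; the function names and the uniform batch size are illustrative and not part of PyIceberg.

```python
def entries_scanned_per_call(num_calls: int, files_per_call: int) -> list[int]:
    """Files already in the snapshot when each call's dup-check runs.

    Assumption: call i (0-based) sees i * files_per_call existing files.
    """
    return [i * files_per_call for i in range(num_calls)]


def cumulative_entries_scanned(num_calls: int, files_per_call: int) -> int:
    """Total entries visited over the whole backfill: f * k * (k - 1) / 2."""
    return sum(entries_scanned_per_call(num_calls, files_per_call))


if __name__ == "__main__":
    # Doubling the number of calls roughly quadruples the total work.
    for calls in (10, 20, 40):
        print(calls, cumulative_entries_scanned(calls, files_per_call=200))
```

Under these assumptions, per-call work grows linearly with the call index while the total grows with the square of the number of calls, which is why the slowdown only becomes visible late in a backfill.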
The workaround in #2132 / docs PR #2249 is
`check_duplicate_files=False`, which gives up the idempotency
guarantee that makes it safe to re-run `add_files` when resuming after a
partial failure.
## Benchmark — before vs after
`tests/benchmark/test_add_files_dup_check_benchmark.py` (added in this PR)
runs 10 sequential `add_files(check_duplicate_files=True)` calls on an
`InMemoryCatalog` table with a 30-column schema, 200 small parquet files
per call. The numbers below are wall-clock time and `tracemalloc` peak per
call, measured on macOS arm64 / Python 3.11.
**Before (upstream `main`):**
```
batch  wall_s  tracemalloc_peak_MB  cumulative_files
    0    1.05                  5.5               200
    1    1.00                  9.2               400
    2    1.06                 12.7               600
    3    1.13                 18.5               800
    4    1.18                 20.2              1000
    5    1.26                 23.4              1200
    6    1.32                 30.2              1400
    7    1.39                 34.0              1600
    8    1.46                 29.9              1800
    9    1.51                 39.8              2000
```
Wall climbs ~44%; tracemalloc peak grows ~7.2×.
**After (this PR):**
```
batch  wall_s  tracemalloc_peak_MB  cumulative_files
    0    1.05                  5.5               200
    1    1.56                  5.6               400
    2    0.96                  6.2               600
    3    0.97                  6.3               800
    4    0.98                  6.5              1000
    5    1.00                  6.8              1200
    6    1.00                  6.6              1400
    7    1.03                  8.2              1600
    8    1.04                  7.2              1800
    9    1.07                  6.9              2000
```
Wall-clock time stays flat at ~1 s (apart from one 1.56 s outlier in
batch 1), and the tracemalloc peak stays flat at ~6–8 MB. The growth
disappears because the dup-check no longer materializes per-file dicts,
pyarrow Tables, or `readable_metrics`; it only does set containment on
`file_path` while streaming manifest entries.
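The headline ratios can be recomputed from the two tables. The sketch below copies the reported numbers verbatim and compares the last batch against the first; the `growth_ratio` helper is illustrative, and the 3× threshold is the one used by the benchmark test added in this PR.

```python
# Values copied from the "Before" and "After" tables above.
before_wall = [1.05, 1.00, 1.06, 1.13, 1.18, 1.26, 1.32, 1.39, 1.46, 1.51]
before_peak = [5.5, 9.2, 12.7, 18.5, 20.2, 23.4, 30.2, 34.0, 29.9, 39.8]
after_wall = [1.05, 1.56, 0.96, 0.97, 0.98, 1.00, 1.00, 1.03, 1.04, 1.07]
after_peak = [5.5, 5.6, 6.2, 6.3, 6.5, 6.8, 6.6, 8.2, 7.2, 6.9]


def growth_ratio(series: list[float]) -> float:
    """Last-batch value divided by first-batch value."""
    return series[-1] / series[0]


if __name__ == "__main__":
    for label, series in [
        ("before wall", before_wall),
        ("before peak", before_peak),
        ("after wall", after_wall),
        ("after peak", after_peak),
    ]:
        print(f"{label:>12}: {growth_ratio(series):.2f}x")
```

The "before" peak ratio lands well above the 3× threshold and the "after" ratio well below it, so the threshold separates the two implementations with room for noise.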
This is a 10-batch run on a small, narrow workload. Real backfills with
wider schemas (more columns × more row groups), more files per batch,
and many more batches see the constant factor amplify; the production
workload that motivated this PR was hitting ~10–15 minutes per call
after 15 commits.
## What this PR does
Replace the materialize-then-filter with a streaming scan that reuses
the existing `_open_manifest` helper
(`pyiceberg/table/__init__.py:1918`) — the canonical "open a manifest,
fetch entries with `discard_deleted=True`, apply data-file predicates"
pattern already used by `DataScan.scan_plan_helper` (line 2050). Delete
manifests are skipped at the top level (same shape as
`_min_sequence_number`).
The loop body becomes a `set` containment check on
`data_file.file_path`, scheduled via `executor.map` and flattened with
`chain.from_iterable` — same idiom as the existing scan path.
This is the same approach Spark's `add_files` action takes: a
predicate-based check against the new paths only, with no pre-scan of all
data files.
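The shape of the streaming scan can be sketched without PyIceberg. In the toy version below a "manifest" is just a list of paths, and `open_manifest` and `find_referenced` are hypothetical stand-ins for `_open_manifest` and the new helper; only the `set` containment, `executor.map`, and `chain.from_iterable` idiom is taken from the description above.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from typing import Callable, Iterable

# Hypothetical stand-in: a "manifest" is simply a list of data-file paths.
Manifest = list[str]


def open_manifest(manifest: Manifest, keep: Callable[[str], bool]) -> list[str]:
    """Toy analogue of opening one manifest and filtering its entries."""
    return [path for path in manifest if keep(path)]


def find_referenced(manifests: Iterable[Manifest], file_paths: list[str]) -> list[str]:
    """Return the candidate paths that already appear in some manifest."""
    candidates = set(file_paths)  # O(1) membership test per entry
    with ThreadPoolExecutor(max_workers=4) as executor:
        per_manifest = executor.map(
            lambda manifest: open_manifest(manifest, candidates.__contains__),
            manifests,
        )
        # Flatten while the pool is still alive; map() preserves input order.
        return list(chain.from_iterable(per_manifest))


if __name__ == "__main__":
    existing = [["a.parquet", "b.parquet"], ["c.parquet"]]
    print(find_referenced(existing, ["c.parquet", "new.parquet", "a.parquet"]))
```

Nothing beyond the matching paths is retained per manifest, which is the property that keeps peak memory flat in the benchmark.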
## What this is and isn't
- **Is**: a constant-factor reduction. The Avro decode of manifest
entries is unchanged (still happens via `fetch_manifest_entry`), but
everything downstream of the read — `readable_metrics` computation,
partition decode, per-file dict construction, pyarrow Table
construction, `concat_tables`, `filter`, `to_pylist` — is gone. That
post-processing was the bulk of the time, not the Avro read.
- **Isn't**: an asymptotic fix. Per-call cost is still O(snapshot file
count) for the manifest entry reads; cumulative backfill cost is still
O(N²). Truly eliminating the linear scan would need `file_path`
lower/upper bounds at the `ManifestFile` level so most manifests can be
pruned without opening — that's a spec extension and a follow-up.
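To illustrate what such a follow-up could look like, here is a purely hypothetical sketch of manifest-level pruning. No `file_path` bounds exist on `ManifestFile` in the Iceberg spec today; the `ManifestSummary` class and its `min_path` / `max_path` fields are invented for this example.

```python
from bisect import bisect_left
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestSummary:
    """Hypothetical per-manifest file_path bounds (not in the current spec)."""

    name: str
    min_path: str
    max_path: str


def may_contain(summary: ManifestSummary, sorted_candidates: list[str]) -> bool:
    """True if at least one candidate falls inside [min_path, max_path]."""
    i = bisect_left(sorted_candidates, summary.min_path)
    return i < len(sorted_candidates) and sorted_candidates[i] <= summary.max_path


def manifests_to_open(summaries: list[ManifestSummary], file_paths: list[str]) -> list[str]:
    """Names of the manifests that cannot be skipped based on bounds alone."""
    sorted_candidates = sorted(file_paths)
    return [s.name for s in summaries if may_contain(s, sorted_candidates)]


if __name__ == "__main__":
    summaries = [
        ManifestSummary("m0", "s3://b/2026-01-01/a", "s3://b/2026-01-01/z"),
        ManifestSummary("m1", "s3://b/2026-01-02/a", "s3://b/2026-01-02/z"),
    ]
    print(manifests_to_open(summaries, ["s3://b/2026-01-02/k.parquet"]))
```

Pruning of this kind only helps when paths within a manifest are clustered, as in a date-partitioned backfill; with randomly named files most bounds would overlap every candidate.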
## Compatibility / behavior preservation
Audited the change for any behavioral divergence from the old
`inspect.data_files().filter(...)` path:
- **Public API**: `add_files` signature and exception message unchanged.
Existing integration tests at
`tests/integration/test_add_files.py:test_add_files_that_referenced_by_current_snapshot{,_with_check_duplicate_files_true,_with_check_duplicate_files_false}`
exercise the dup-check contract and assert the exact error string — both
preserved verbatim.
- **Callers**: only `Table.add_files`
(`pyiceberg/table/__init__.py:1491`). No subclass overrides exist (e.g.
`CreateTableTransaction` doesn't redefine it).
`Transaction.upsert`/`append`/`overwrite`, `_FastAppendFiles`,
`MergingSnapshotProducer` don't share the dup-check path.
- **File set scanned**: `inspect.data_files()` filtered per-entry on
`DataFileContent.DATA`; new code filters at `ManifestContent.DATA`.
These are theoretically distinct but produce identical sets per the
Iceberg spec — delete entries cannot live in DATA manifests.
- **`discard_deleted`**: both paths use `True` (`fetch_manifest_entry`
defaults to `True`; `_open_manifest` passes it explicitly).
- **Snapshot scope**: both paths use `current_snapshot()` —
`inspect.data_files()` via `_get_snapshot(None)`, new code directly via
`self.table_metadata.current_snapshot()`.
- **Empty `file_paths`**: same result (empty list) and same exceptions
either way. Slight efficiency regression in this edge case — the new
code still walks data manifests where the old code short-circuited via
`pc.field("file_path").isin([])`. Not user-visible; can be optimized in
a follow-up if anyone cares.
- **Side effects**: both paths are read-only; no manifest cache state
mutation, no transaction state changes.
- **Concurrency**: both submit to the shared
`ExecutorFactory.get_or_create()` thread pool.
- **Branch parameter**: `add_files` accepts a `branch` argument, but the
dup-check has always run against `current_snapshot()` (i.e. main)
regardless. This is a **pre-existing inconsistency**, not introduced by
this PR. Preserved exactly to keep this change behavior-preserving.
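The empty-`file_paths` edge case noted above could be handled with an early return. The sketch below is a hypothetical follow-up and not part of this PR; `find_referenced_with_guard` and its `scan` callback are invented names standing in for the manifest walk.

```python
from typing import Callable


def find_referenced_with_guard(
    file_paths: list[str],
    scan: Callable[[set[str]], list[str]],
) -> list[str]:
    """Skip the manifest walk entirely when there is nothing to look for.

    `scan` is a hypothetical callback that performs the streaming scan
    for a non-empty candidate set.
    """
    if not file_paths:
        return []
    return scan(set(file_paths))


if __name__ == "__main__":
    def fake_scan(candidates: set[str]) -> list[str]:
        print("scanning for", sorted(candidates))
        return []

    find_referenced_with_guard([], fake_scan)  # no scan happens
    find_referenced_with_guard(["a.parquet"], fake_scan)  # scan runs once
```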
## Refs
- Issue: #3286
- Related: #2132 (closed as docs), #2133 (parallelization)
---
pyiceberg/table/__init__.py | 28 ++++-
.../test_add_files_dup_check_benchmark.py | 132 +++++++++++++++++++++
2 files changed, 155 insertions(+), 5 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c5367c867..64ad10050 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -940,6 +940,28 @@ class Transaction:
         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+    def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
+        """Return file_paths already referenced by data files in the current snapshot."""
+        snapshot = self.table_metadata.current_snapshot()
+        if snapshot is None:
+            return []
+
+        candidates = set(file_paths)
+        io = self._table.io
+        data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
+
+        def path_filter(data_file: DataFile) -> bool:
+            return data_file.file_path in candidates
+
+        executor = ExecutorFactory.get_or_create()
+        entries = chain.from_iterable(
+            executor.map(
+                lambda args: _open_manifest(*args),
+                [(io, manifest, path_filter, lambda _: True) for manifest in data_manifests],
+            )
+        )
+        return [entry.data_file.file_path for entry in entries]
+
     def add_files(
         self,
         file_paths: list[str],
@@ -962,11 +984,7 @@ class Transaction:
             raise ValueError("File paths must be unique")
         if check_duplicate_files:
-            import pyarrow.compute as pc
-
-            expr = pc.field("file_path").isin(file_paths)
-            referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
-
+            referenced_files = self._find_referenced_data_files(file_paths)
             if referenced_files:
                 raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
diff --git a/tests/benchmark/test_add_files_dup_check_benchmark.py b/tests/benchmark/test_add_files_dup_check_benchmark.py
new file mode 100644
index 000000000..731305bf0
--- /dev/null
+++ b/tests/benchmark/test_add_files_dup_check_benchmark.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmark for `add_files(check_duplicate_files=True)`.
+
+Reproduces the per-call cost of the duplicate-files check on a growing
+table. Before fix: each call materializes every DataFile in the snapshot
+into a pyarrow Table (with readable_metrics, partition decode, full stats
+dicts) and post-filters on file_path — peak memory grows roughly linearly
+with cumulative file count, dominated by per-column stats decoding.
+After fix: streaming manifest scan with set containment on file_path,
+peak memory stays flat.
+
+Run with: uv run pytest tests/benchmark/test_add_files_dup_check_benchmark.py -v -s -m benchmark
+"""
+
+from __future__ import annotations
+
+import gc
+import tempfile
+import tracemalloc
+from pathlib import Path
+from typing import Any
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
+    """Build a wide-ish schema so per-column stats decoding has work to do."""
+    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
+    for i in range(2, num_columns + 1):
+        iceberg_fields.append(NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False))
+    iceberg_schema = Schema(*iceberg_fields)
+    arrow_schema = pa.schema(
+        [pa.field("id", pa.int32(), nullable=False)]
+        + [pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)]
+    )
+    return iceberg_schema, arrow_schema
+
+
+def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
+    paths: list[str] = []
+    columns: dict[str, list[Any]] = {
+        name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)] for name in arrow_schema.names
+    }
+    rows = pa.Table.from_pydict(columns, schema=arrow_schema)
+    for i in range(n_files):
+        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
+        pq.write_table(rows, p)
+        paths.append(f"file://{p}")
+    return paths
+
+
+@pytest.mark.benchmark
+def test_add_files_dup_check_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Peak memory per `add_files(check_duplicate_files=True)` call should stay
+    flat across consecutive calls on a growing table.
+
+    With the materialize-then-filter implementation, peak grows roughly linearly
+    with cumulative file count (per-column stats decoding into a pyarrow Table).
+    With the streaming-scan implementation, peak stays bounded by the per-call
+    workload.
+    """
+    num_batches = 10
+    files_per_batch = 200
+    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
+
+    with tempfile.TemporaryDirectory() as tmp_root:
+        data_dir = Path(tmp_root) / "data"
+        data_dir.mkdir()
+        table = memory_catalog.create_table("default.add_files_bench", schema=iceberg_schema)
+
+        gc.collect()
+        tracemalloc.start()
+
+        peaks_mb: list[float] = []
+        print(f"\n--- add_files dup-check benchmark ({num_batches} batches × {files_per_batch} files, 30 cols) ---")
+        print(f"{'batch':>5} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
+
+        cumulative = 0
+        for b in range(num_batches):
+            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
+            tracemalloc.reset_peak()
+            table.add_files(file_paths=paths, check_duplicate_files=True)
+            _, peak = tracemalloc.get_traced_memory()
+            peak_mb = peak / (1024 * 1024)
+            peaks_mb.append(peak_mb)
+            cumulative += files_per_batch
+            print(f"{b:>5d} {peak_mb:>22.1f} {cumulative:>17d}")
+
+        tracemalloc.stop()
+
+    # Growth ratio: last call peak vs first call peak.
+    # Materialize-then-filter (pre-fix): observed ~7× on this workload.
+    # Streaming scan (post-fix): observed ~1×–1.5× (mostly noise).
+    # Threshold of 3× catches the regression while tolerating variance.
+    first_peak = peaks_mb[0]
+    last_peak = peaks_mb[-1]
+    ratio = last_peak / first_peak if first_peak > 0 else float("inf")
+    print(f"\n Peak ratio (last / first): {ratio:.1f}×")
+    max_ratio = 3.0
+    assert ratio < max_ratio, (
+        f"Peak memory ratio ({ratio:.1f}×) exceeds {max_ratio}×. "
+        "Dup-check materializes the full snapshot rather than streaming on file_path."
+    )