This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 326884cb fix manifest cache (#2951)
326884cb is described below

commit 326884cb617a4b923101192b13f7c12ade9f757e
Author: Kevin Liu <[email protected]>
AuthorDate: Wed Jan 28 20:57:14 2026 -0500

    fix manifest cache (#2951)
    
    # Rationale for this change
    Fix part of #2325
    Context:
    https://github.com/apache/iceberg-python/issues/2325#issuecomment-3193881084
    
    Cache individual `ManifestFile` objects instead of whole manifest lists
    (tuples of `ManifestFile` objects).
    This PR fixes the O(N²) cache inefficiency, restoring the expected O(N)
    linear growth pattern.
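
To make the two growth rates concrete, here is a short count under a simplifying assumption that the PR text does not state explicitly: every append adds exactly one new manifest, so the k-th manifest list references k manifests, and nothing has been evicted from the cache yet.

```latex
\underbrace{\sum_{k=1}^{N} k = \frac{N(N+1)}{2}}_{\text{one tuple cached per manifest list}}
\qquad\text{vs.}\qquad
\underbrace{N}_{\text{one entry cached per manifest path}}
```

Under that assumption, N = 50 appends would leave 1,275 `ManifestFile` objects in the old cache against 50 in the new one.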
    
    ## Are these changes tested?
    Yes, with a benchmark test (`tests/benchmark/test_memory_benchmark.py`).
    Result running from main branch:
    https://gist.github.com/kevinjqliu/970f4b51a12aaa0318a2671173430736
    Result running from this branch:
    https://gist.github.com/kevinjqliu/24990d18d2cea2fa468597c16bfa27fd
    
    ### Benchmark Comparison: main vs kevinjqliu/fix-manifest-cache
    | Test | main | fix branch |
    |------|:----:|:----------:|
    | `test_manifest_cache_memory_growth` | ❌ FAILED | ✅ PASSED |
    | `test_memory_after_gc_with_cache_cleared` | ✅ PASSED | ✅ PASSED |
    | `test_manifest_cache_deduplication_efficiency` | ✅ PASSED | ✅ PASSED |
    
    #### Memory Growth Benchmark (50 append operations)
    
    | Metric | main | fix branch | Improvement |
    |--------|-----:|----------:|------------:|
    | Initial memory (iteration 10) | 3,233.4 KB | 3,210.7 KB | -0.7% |
    | Final memory (iteration 50) | 4,280.6 KB | 3,558.9 KB | **-16.9%** |
    | Total growth | 1,047.2 KB | 348.1 KB | **-66.8%** |
    | Growth per iteration | 26,809 bytes | 8,913 bytes | **-66.8%** |
    
    #### Memory at Each Iteration
    
    | Iteration | main | fix branch | Δ |
    |----------:|-----:|-----------:|--:|
    | 10 | 3,233.4 KB | 3,210.7 KB | -22.7 KB |
    | 20 | 3,471.0 KB | 3,371.4 KB | -99.6 KB |
    | 30 | 3,719.3 KB | 3,467.1 KB | -252.2 KB |
    | 40 | 3,943.9 KB | 3,483.2 KB | -460.7 KB |
    | 50 | 4,280.6 KB | 3,558.9 KB | **-721.7 KB** |
    
    This fix reduces memory growth by ~67%, bringing per-iteration growth
    from ~27 KB down to ~9 KB.
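
The per-iteration figures follow from the first and last samples (iterations 10 and 50), which is how the benchmark computes them. The sketch below redoes that arithmetic from the rounded KB values in the table. The helper name `growth_per_iteration` is made up for illustration, and because the inputs are rounded, the results only approximately match the reported 26,809 and 8,913 bytes.

```python
def growth_per_iteration(first_kb: float, last_kb: float, first_iter: int, last_iter: int) -> float:
    """Return the average memory growth in bytes per iteration between two samples."""
    return (last_kb - first_kb) * 1024 / (last_iter - first_iter)


# Values copied from the "Memory at Each Iteration" table (iterations 10 and 50).
main_growth = growth_per_iteration(3233.4, 4280.6, 10, 50)
fix_growth = growth_per_iteration(3210.7, 3558.9, 10, 50)

# Relative reduction in per-iteration growth on the fix branch.
reduction = 1 - fix_growth / main_growth

print(f"main: {main_growth:.0f} bytes/iteration")
print(f"fix:  {fix_growth:.0f} bytes/iteration")
print(f"reduction: {reduction:.1%}")
```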
    
    The improvement comes from caching individual `ManifestFile` objects by
    their `manifest_path` instead of caching entire manifest list tuples.
    This deduplicates `ManifestFile` objects that appear in multiple
    manifest lists (common after appends).
    
    ## Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 pyiceberg/manifest.py                    |  51 +++++-
 tests/benchmark/test_memory_benchmark.py | 287 +++++++++++++++++++++++++++++
 tests/utils/test_manifest.py             | 301 +++++++++++++++++++++++++++++--
 3 files changed, 614 insertions(+), 25 deletions(-)

diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index bd83075b..4c68f5e3 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -28,8 +28,7 @@ from typing import (
     Literal,
 )
 
-from cachetools import LRUCache, cached
-from cachetools.keys import hashkey
+from cachetools import LRUCache
 from pydantic_core import to_json
 
 from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
@@ -892,15 +891,53 @@ class ManifestFile(Record):
         return hash(self.manifest_path)
 
 
-# Global cache for manifest lists
-_manifest_cache: LRUCache[Any, tuple[ManifestFile, ...]] = LRUCache(maxsize=128)
+# Global cache for ManifestFile objects, keyed by manifest_path.
+# This deduplicates ManifestFile objects across manifest lists, which commonly
+# share manifests after append operations.
+_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)
+
+# Lock for thread-safe cache access
+_manifest_cache_lock = threading.RLock()
 
 
-@cached(cache=_manifest_cache, key=lambda io, manifest_list: hashkey(manifest_list), lock=threading.RLock())
 def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
-    """Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
+    """Read manifests from a manifest list, deduplicating ManifestFile objects via cache.
+
+    Caches individual ManifestFile objects by manifest_path. This is memory-efficient
+    because consecutive manifest lists typically share most of their manifests:
+
+        ManifestList1: [ManifestFile1]
+        ManifestList2: [ManifestFile1, ManifestFile2]
+        ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3]
+
+    With per-ManifestFile caching, each ManifestFile is stored once and reused.
+
+    Note: The manifest list file is re-read on each call. This is intentional to
+    keep the implementation simple and avoid O(N²) memory growth from caching
+    overlapping manifest list tuples. Re-reading is cheap since manifest lists
+    are small metadata files.
+
+    Args:
+        io: FileIO instance for reading the manifest list.
+        manifest_list: Path to the manifest list file.
+
+    Returns:
+        A tuple of ManifestFile objects.
+    """
     file = io.new_input(manifest_list)
-    return tuple(read_manifest_list(file))
+    manifest_files = list(read_manifest_list(file))
+
+    result = []
+    with _manifest_cache_lock:
+        for manifest_file in manifest_files:
+            manifest_path = manifest_file.manifest_path
+            if manifest_path in _manifest_cache:
+                result.append(_manifest_cache[manifest_path])
+            else:
+                _manifest_cache[manifest_path] = manifest_file
+                result.append(manifest_file)
+
+    return tuple(result)
 
 
 def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py
new file mode 100644
index 00000000..82454c85
--- /dev/null
+++ b/tests/benchmark/test_memory_benchmark.py
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmarks for manifest cache efficiency.
+
+These benchmarks reproduce the manifest cache memory issue described in:
+https://github.com/apache/iceberg-python/issues/2325
+
+The issue: When caching manifest lists as tuples, overlapping ManifestFile objects
+are duplicated across cache entries, causing O(N²) memory growth instead of O(N).
+
+Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark
+"""
+
+import gc
+import tracemalloc
+from datetime import datetime, timezone
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import _manifest_cache
+
+
+def generate_test_dataframe() -> pa.Table:
+    """Generate a PyArrow table for testing, similar to the issue's example."""
+    n_rows = 100  # Smaller for faster tests, increase for more realistic benchmarks
+
+    return pa.table(
+        {
+            "event_type": ["playback"] * n_rows,
+            "event_origin": ["origin1"] * n_rows,
+            "event_send_at": [datetime.now(timezone.utc)] * n_rows,
+            "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
+            "id": list(range(n_rows)),
+            "reference_id": [f"ref-{i}" for i in range(n_rows)],
+        }
+    )
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    """Create an in-memory catalog for memory testing."""
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+@pytest.fixture(autouse=True)
+def clear_caches() -> None:
+    """Clear caches before each test."""
+    _manifest_cache.clear()
+    gc.collect()
+
+
+@pytest.mark.benchmark
+def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Benchmark memory growth of manifest cache during repeated appends.
+
+    This test reproduces the issue from GitHub #2325 where each append creates
+    a new manifest list entry in the cache, causing memory to grow.
+
+    With the old caching strategy (tuple per manifest list), memory grew as O(N²).
+    With the new strategy (individual ManifestFile objects), memory grows as O(N).
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.memory_test", schema=df.schema)
+
+    tracemalloc.start()
+
+    num_iterations = 50
+    memory_samples: list[tuple[int, int, int]] = []  # (iteration, current_memory, cache_size)
+
+    print("\n--- Manifest Cache Memory Growth Benchmark ---")
+    print(f"Running {num_iterations} append operations...")
+
+    for i in range(num_iterations):
+        table.append(df)
+
+        # Sample memory at intervals
+        if (i + 1) % 10 == 0:
+            current, _ = tracemalloc.get_traced_memory()
+            cache_size = len(_manifest_cache)
+
+            memory_samples.append((i + 1, current, cache_size))
+            print(f"  Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
+
+    tracemalloc.stop()
+
+    # Analyze memory growth
+    if len(memory_samples) >= 2:
+        first_memory = memory_samples[0][1]
+        last_memory = memory_samples[-1][1]
+        memory_growth = last_memory - first_memory
+        growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0])
+
+        print("\nResults:")
+        print(f"  Initial memory: {first_memory / 1024:.1f} KB")
+        print(f"  Final memory: {last_memory / 1024:.1f} KB")
+        print(f"  Total growth: {memory_growth / 1024:.1f} KB")
+        print(f"  Growth per iteration: {growth_per_iteration:.1f} bytes")
+        print(f"  Final cache size: {memory_samples[-1][2]} entries")
+
+        # With efficient caching, growth should be roughly linear (O(N))
+        # rather than quadratic (O(N²)) as it was before
+        # Memory growth includes ManifestFile objects, metadata, and other overhead
+        # We expect about 5-10 KB per iteration for typical workloads
+        # The key improvement is that growth is O(N) not O(N²)
+        # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration
+        max_memory_growth_per_iteration_bytes = 15000
+        assert growth_per_iteration < max_memory_growth_per_iteration_bytes, (
+            f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. "
+            "This may indicate the O(N²) cache inefficiency is present."
+        )
+
+
+@pytest.mark.benchmark
+def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None:
+    """Test that clearing the cache allows memory to be reclaimed.
+
+    This test verifies that when we clear the manifest cache, the associated
+    memory can be garbage collected.
+    """
+    df = generate_test_dataframe()
+    table = memory_catalog.create_table("default.gc_test", schema=df.schema)
+
+    tracemalloc.start()
+
+    print("\n--- Memory After GC Benchmark ---")
+
+    # Phase 1: Fill the cache
+    print("Phase 1: Filling cache with 20 appends...")
+    for _ in range(20):
+        table.append(df)
+
+    gc.collect()
+    before_clear_memory, _ = tracemalloc.get_traced_memory()
+    cache_size_before = len(_manifest_cache)
+    print(f"  Memory before clear: {before_clear_memory / 1024:.1f} KB")
+    print(f"  Cache size: {cache_size_before}")
+
+    # Phase 2: Clear cache and GC
+    print("\nPhase 2: Clearing cache and running GC...")
+    _manifest_cache.clear()
+    gc.collect()
+    gc.collect()  # Multiple GC passes for thorough cleanup
+
+    after_clear_memory, _ = tracemalloc.get_traced_memory()
+    print(f"  Memory after clear: {after_clear_memory / 1024:.1f} KB")
+    print(f"  Memory reclaimed: {(before_clear_memory - after_clear_memory) / 1024:.1f} KB")
+
+    tracemalloc.stop()
+
+    memory_reclaimed = before_clear_memory - after_clear_memory
+    print("\nResults:")
+    print(f"  Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB")
+
+    # Verify that clearing the cache actually freed some memory
+    # Note: This may be flaky in some environments due to GC behavior
+    assert memory_reclaimed >= 0, "Memory should not increase after clearing cache"
+
+
+@pytest.mark.benchmark
+def test_manifest_cache_deduplication_efficiency() -> None:
+    """Benchmark the efficiency of the per-ManifestFile caching strategy.
+
+    This test verifies that when multiple manifest lists share the same
+    ManifestFile objects, they are properly deduplicated in the cache.
+    """
+    from tempfile import TemporaryDirectory
+
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+    from pyiceberg.manifest import (
+        DataFile,
+        DataFileContent,
+        FileFormat,
+        ManifestEntry,
+        ManifestEntryStatus,
+        _manifests,
+        write_manifest,
+        write_manifest_list,
+    )
+    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
+    from pyiceberg.schema import Schema
+    from pyiceberg.typedef import Record
+    from pyiceberg.types import IntegerType, NestedField
+
+    io = PyArrowFileIO()
+
+    print("\n--- Manifest Cache Deduplication Benchmark ---")
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create N manifest files
+        num_manifests = 20
+        manifest_files = []
+
+        print(f"Creating {num_manifests} manifest files...")
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest_{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="null",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data_{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100,
+                    file_size_in_bytes=1000,
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create multiple manifest lists with overlapping manifest files
+        # List i contains manifest files 0 through i
+        num_lists = 10
+        print(f"Creating {num_lists} manifest lists with overlapping manifests...")
+
+        _manifest_cache.clear()
+
+        for i in range(num_lists):
+            list_path = f"{tmp_dir}/manifest-list_{i}.avro"
+            manifests_to_include = manifest_files[: i + 1]
+
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="null",
+            ) as list_writer:
+                list_writer.add_manifests(manifests_to_include)
+
+            # Read the manifest list using _manifests (this populates the cache)
+            _manifests(io, list_path)
+
+        # Analyze cache efficiency
+        cache_entries = len(_manifest_cache)
+        # List i contains manifests 0..i, so only the first num_lists manifests are actually used
+        manifests_actually_used = num_lists
+
+        print("\nResults:")
+        print(f"  Manifest lists created: {num_lists}")
+        print(f"  Manifest files created: {num_manifests}")
+        print(f"  Manifest files actually used: {manifests_actually_used}")
+        print(f"  Cache entries: {cache_entries}")
+
+        # With efficient per-ManifestFile caching, we should have exactly
+        # manifests_actually_used entries (one per unique manifest path)
+        print(f"\n  Expected cache entries (efficient): {manifests_actually_used}")
+        print(f"  Actual cache entries: {cache_entries}")
+
+        # The cache should be efficient - one entry per unique manifest path
+        assert cache_entries == manifests_actually_used, (
+            f"Cache has {cache_entries} entries, expected exactly {manifests_actually_used}. "
+            "The cache may not be deduplicating properly."
+        )
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index abd9878e..3862d6b6 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -16,7 +16,6 @@
 # under the License.
 # pylint: disable=redefined-outer-name,arguments-renamed,fixme
 from tempfile import TemporaryDirectory
-from unittest.mock import patch
 
 import fastavro
 import pytest
@@ -29,10 +28,12 @@ from pyiceberg.manifest import (
     DataFileContent,
     FileFormat,
     ManifestContent,
+    ManifestEntry,
     ManifestEntryStatus,
     ManifestFile,
     PartitionFieldSummary,
     _manifest_cache,
+    _manifests,
     read_manifest_list,
     write_manifest,
     write_manifest_list,
@@ -314,27 +315,33 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
 
 
 def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
-    with patch("pyiceberg.manifest.read_manifest_list") as mocked_read_manifest_list:
-        io = load_file_io()
+    """Test that ManifestFile objects are cached and reused across multiple reads.
 
-        snapshot = Snapshot(
-            snapshot_id=25,
-            parent_snapshot_id=19,
-            timestamp_ms=1602638573590,
-            manifest_list=generated_manifest_file_file_v2,
-            summary=Summary(Operation.APPEND),
-            schema_id=3,
-        )
+    The cache now stores individual ManifestFile objects by their manifest_path,
+    rather than caching entire manifest list tuples. This is more memory-efficient
+    when multiple manifest lists share overlapping ManifestFile objects.
+    """
+    io = load_file_io()
 
-        # Access the manifests property multiple times to test caching
-        manifests_first_call = snapshot.manifests(io)
-        manifests_second_call = snapshot.manifests(io)
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
 
-        # Ensure that read_manifest_list was called only once
-        mocked_read_manifest_list.assert_called_once()
+    # Access the manifests property multiple times
+    manifests_first_call = snapshot.manifests(io)
+    manifests_second_call = snapshot.manifests(io)
 
-        # Ensure that the same manifest list is returned
-        assert manifests_first_call == manifests_second_call
+    # Ensure that the same manifest list content is returned
+    assert manifests_first_call == manifests_second_call
+
+    # Verify that ManifestFile objects are the same instances (cached)
+    for mf1, mf2 in zip(manifests_first_call, manifests_second_call, strict=True):
+        assert mf1 is mf2, "ManifestFile objects should be the same cached instance"
 
 
 def test_write_empty_manifest() -> None:
@@ -632,3 +639,261 @@ def test_file_format_case_insensitive(raw_file_format: str, expected_file_format
     else:
         with pytest.raises(ValueError):
             _ = FileFormat(raw_file_format)
+
+
+def test_manifest_cache_deduplicates_manifest_files() -> None:
+    """Test that the manifest cache deduplicates ManifestFile objects across manifest lists.
+
+    This test verifies the fix for https://github.com/apache/iceberg-python/issues/2325
+
+    The issue was that when caching manifest lists by their path, overlapping ManifestFile
+    objects were duplicated. For example:
+    - ManifestList1: (ManifestFile1)
+    - ManifestList2: (ManifestFile1, ManifestFile2)
+    - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
+
+    With the old approach, ManifestFile1 was stored 3 times in the cache.
+    With the new approach, ManifestFile objects are cached individually by their
+    manifest_path, so ManifestFile1 is stored only once and reused.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        # Create three manifest files to simulate manifests created during appends
+        manifest1_path = f"{tmp_dir}/manifest1.avro"
+        manifest2_path = f"{tmp_dir}/manifest2.avro"
+        manifest3_path = f"{tmp_dir}/manifest3.avro"
+
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        # Create manifest file 1
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest1_path),
+            snapshot_id=1,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file1 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data1.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=100,
+                file_size_in_bytes=1000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=1,
+                    data_file=data_file1,
+                )
+            )
+        manifest_file1 = writer.to_manifest_file()
+
+        # Create manifest file 2
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest2_path),
+            snapshot_id=2,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file2 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data2.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=200,
+                file_size_in_bytes=2000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=2,
+                    data_file=data_file2,
+                )
+            )
+        manifest_file2 = writer.to_manifest_file()
+
+        # Create manifest file 3
+        with write_manifest(
+            format_version=2,
+            spec=spec,
+            schema=schema,
+            output_file=io.new_output(manifest3_path),
+            snapshot_id=3,
+            avro_compression="zstandard",
+        ) as writer:
+            data_file3 = DataFile.from_args(
+                content=DataFileContent.DATA,
+                file_path=f"{tmp_dir}/data3.parquet",
+                file_format=FileFormat.PARQUET,
+                partition=Record(),
+                record_count=300,
+                file_size_in_bytes=3000,
+            )
+            writer.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=3,
+                    data_file=data_file3,
+                )
+            )
+        manifest_file3 = writer.to_manifest_file()
+
+        # Create manifest list 1: contains only manifest1
+        manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list1_path),
+            snapshot_id=1,
+            parent_snapshot_id=None,
+            sequence_number=1,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1])
+
+        # Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1)
+        manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list2_path),
+            snapshot_id=2,
+            parent_snapshot_id=1,
+            sequence_number=2,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2])
+
+        # Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2)
+        manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
+        with write_manifest_list(
+            format_version=2,
+            output_file=io.new_output(manifest_list3_path),
+            snapshot_id=3,
+            parent_snapshot_id=2,
+            sequence_number=3,
+            avro_compression="zstandard",
+        ) as list_writer:
+            list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3])
+
+        # Read all three manifest lists
+        manifests1 = _manifests(io, manifest_list1_path)
+        manifests2 = _manifests(io, manifest_list2_path)
+        manifests3 = _manifests(io, manifest_list3_path)
+
+        # Verify the manifest files have the expected paths
+        assert len(manifests1) == 1
+        assert len(manifests2) == 2
+        assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the same object (identity)
+        # This is the key assertion - if caching works correctly, the same ManifestFile
+        # object should be reused instead of creating duplicates
+
+        # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the same object instance across manifest lists"
+
+        # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists"
+
+        # Verify cache size - should only have 3 unique ManifestFile objects
+        # instead of 1 + 2 + 3 = 6 objects as with the old approach
+        assert len(_manifest_cache) == 3, (
+            f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
+        )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+    """Test that the manifest cache remains efficient with many overlapping manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends create
+    manifest lists that increasingly overlap.
+    """
+    io = PyArrowFileIO()
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        num_manifests = 10
+        manifest_files = []
+
+        # Create N manifest files
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="zstandard",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100 * (i + 1),
+                    file_size_in_bytes=1000 * (i + 1),
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of manifests
+        # list[i] contains manifests[0:i+1]
+        manifest_list_paths = []
+        for i in range(num_manifests):
+            list_path = f"{tmp_dir}/manifest-list{i}.avro"
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="zstandard",
+            ) as list_writer:
+                list_writer.add_manifests(manifest_files[: i + 1])
+            manifest_list_paths.append(list_path)
+
+        # Read all manifest lists
+        all_results = []
+        for path in manifest_list_paths:
+            result = _manifests(io, path)
+            all_results.append(result)
+
+        # With the old cache approach, we would have:
+        # 1 + 2 + 3 + ... + N = N*(N+1)/2 ManifestFile objects in memory
+        # With the new approach, we should have exactly N objects
+
+        # Verify cache has exactly N unique entries
+        assert len(_manifest_cache) == num_manifests, (
+            f"Cache should contain exactly {num_manifests} ManifestFile objects, "
+            f"but has {len(_manifest_cache)}. "
+            f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
+        )
+
+        # Verify object identity - all references to the same manifest should be the same object
+        for i in range(num_manifests):
+            # Find all references to this manifest across all results
+            references = []
+            for j, result in enumerate(all_results):
+                if j >= i:  # This manifest should be in lists from i onwards
+                    references.append(result[i])
+
+            # All references should be the same object
+            if len(references) > 1:
+                for ref in references[1:]:
+                    assert ref is references[0], f"All references to manifest {i} should be the same object instance"
