This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a47d2bf Optimization: Prune manifest in snapshot overwrite operations 
(#3011)
8a47d2bf is described below

commit 8a47d2bfdcd576691717b262828e24f0f6b9fa06
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Fri Feb 20 20:42:18 2026 +0100

    Optimization: Prune manifest in snapshot overwrite operations (#3011)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    <!-- Closes #${GITHUB_ISSUE_ID} -->
    
    # Rationale for this change
    
    While running performance tests for overwriting partitions, we noticed
    that PyIceberg took twice as long as the Java-based implementation.
    We found that `_existing_manifests` does not take advantage of
    manifest pruning before reading all manifest entries.
    
    In this PR I:
    - Moved methods from _DeleteFiles to _SnapshotProducer parent class to
    share with other classes (_OverwriteFiles)
    - Implemented manifest pruning based on the partitions of all deleted
    files, so that manifests which do not match those partitions are not read
    - Refactored the method to iterate over all files only once (instead of
    multiple times)
    
    ## Are these changes tested?
    
    I believe current tests in tests/integration/test_writes.py cover all
    cases
    
    ## Are there any user-facing changes?
    
    Nope
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/table/__init__.py        |  82 ++++------------
 pyiceberg/table/update/snapshot.py | 190 ++++++++++++++++++-------------------
 2 files changed, 111 insertions(+), 161 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index cc0d9ff3..68089beb 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -26,25 +26,12 @@ from dataclasses import dataclass
 from functools import cached_property
 from itertools import chain
 from types import TracebackType
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    TypeVar,
-)
+from typing import TYPE_CHECKING, Any, TypeVar
 
 from pydantic import Field
 
 import pyiceberg.expressions.parser as parser
-from pyiceberg.expressions import (
-    AlwaysFalse,
-    AlwaysTrue,
-    And,
-    BooleanExpression,
-    EqualTo,
-    IsNull,
-    Or,
-    Reference,
-)
+from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, 
BooleanExpression, EqualTo, IsNull, Or, Reference
 from pyiceberg.expressions.visitors import (
     ResidualEvaluator,
     _InclusiveMetricsEvaluator,
@@ -54,36 +41,17 @@ from pyiceberg.expressions.visitors import (
     manifest_evaluator,
 )
 from pyiceberg.io import FileIO, load_file_io
-from pyiceberg.manifest import (
-    DataFile,
-    DataFileContent,
-    ManifestContent,
-    ManifestEntry,
-    ManifestFile,
-)
-from pyiceberg.partitioning import (
-    PARTITION_FIELD_ID_START,
-    UNPARTITIONED_PARTITION_SPEC,
-    PartitionKey,
-    PartitionSpec,
-)
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, 
ManifestEntry, ManifestFile
+from pyiceberg.partitioning import PARTITION_FIELD_ID_START, 
UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.delete_file_index import DeleteFileIndex
 from pyiceberg.table.inspect import InspectTable
 from pyiceberg.table.locations import LocationProvider, load_location_provider
 from pyiceberg.table.maintenance import MaintenanceTable
-from pyiceberg.table.metadata import (
-    INITIAL_SEQUENCE_NUMBER,
-    TableMetadata,
-)
-from pyiceberg.table.name_mapping import (
-    NameMapping,
-)
+from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
+from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
-from pyiceberg.table.snapshots import (
-    Snapshot,
-    SnapshotLogEntry,
-)
+from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.table.update import (
     AddPartitionSpecUpdate,
@@ -107,11 +75,7 @@ from pyiceberg.table.update import (
     update_table_metadata,
 )
 from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import (
-    ManageSnapshots,
-    UpdateSnapshot,
-    _FastAppendFiles,
-)
+from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, 
_FastAppendFiles
 from pyiceberg.table.update.sorting import UpdateSortOrder
 from pyiceberg.table.update.spec import UpdateSpec
 from pyiceberg.table.update.statistics import UpdateStatistics
@@ -126,9 +90,7 @@ from pyiceberg.typedef import (
     Record,
     TableVersion,
 )
-from pyiceberg.types import (
-    strtobool,
-)
+from pyiceberg.types import strtobool
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.properties import property_as_bool
@@ -144,11 +106,7 @@ if TYPE_CHECKING:
     from pyiceberg_core.datafusion import IcebergDataFusionTable
 
     from pyiceberg.catalog import Catalog
-    from pyiceberg.catalog.rest.scan_planning import (
-        RESTContentFile,
-        RESTDeleteFile,
-        RESTFileScanTask,
-    )
+    from pyiceberg.catalog.rest.scan_planning import RESTContentFile, 
RESTDeleteFile, RESTFileScanTask
 
 ALWAYS_TRUE = AlwaysTrue()
 DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -396,17 +354,19 @@ class Transaction:
 
         return updates, requirements
 
-    def _build_partition_predicate(self, partition_records: set[Record]) -> 
BooleanExpression:
+    def _build_partition_predicate(
+        self, partition_records: set[Record], spec: PartitionSpec, schema: 
Schema
+    ) -> BooleanExpression:
         """Build a filter predicate matching any of the input partition 
records.
 
         Args:
             partition_records: A set of partition records to match
+            spec: The partition spec whose fields the partition records follow
+            schema: The schema used to resolve partition source field names
         Returns:
             A predicate matching any of the input partition records.
         """
-        partition_spec = self.table_metadata.spec()
-        schema = self.table_metadata.schema()
-        partition_fields = [schema.find_field(field.source_id).name for field 
in partition_spec.fields]
+        partition_fields = [schema.find_field(field.source_id).name for field 
in spec.fields]
 
         expr: BooleanExpression = AlwaysFalse()
         for partition_record in partition_records:
@@ -583,7 +543,9 @@ class Transaction:
         )
 
         partitions_to_overwrite = {data_file.partition for data_file in 
data_files}
-        delete_filter = 
self._build_partition_predicate(partition_records=partitions_to_overwrite)
+        delete_filter = self._build_partition_predicate(
+            partition_records=partitions_to_overwrite, 
spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
+        )
         self.delete(delete_filter=delete_filter, 
snapshot_properties=snapshot_properties, branch=branch)
 
         with self._append_snapshot_producer(snapshot_properties, 
branch=branch) as append_files:
@@ -673,11 +635,7 @@ class Transaction:
             case_sensitive: A bool determine if the provided `delete_filter` 
is case-sensitive
             branch: Branch Reference to run the delete operation
         """
-        from pyiceberg.io.pyarrow import (
-            ArrowScan,
-            _dataframe_to_data_files,
-            _expression_to_complementary_pyarrow,
-        )
+        from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, 
_expression_to_complementary_pyarrow
 
         if (
             self.table_metadata.properties.get(TableProperties.DELETE_MODE, 
TableProperties.DELETE_MODE_DEFAULT)
diff --git a/pyiceberg/table/update/snapshot.py 
b/pyiceberg/table/update/snapshot.py
index c88337e7..37d12096 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -26,11 +26,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Generic
 
 from pyiceberg.avro.codecs import AvroCompressionCodec
-from pyiceberg.expressions import (
-    AlwaysFalse,
-    BooleanExpression,
-    Or,
-)
+from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or
 from pyiceberg.expressions.visitors import (
     ROWS_MIGHT_NOT_MATCH,
     ROWS_MUST_MATCH,
@@ -51,9 +47,8 @@ from pyiceberg.manifest import (
     write_manifest,
     write_manifest_list,
 )
-from pyiceberg.partitioning import (
-    PartitionSpec,
-)
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType
 from pyiceberg.table.snapshots import (
     Operation,
@@ -76,10 +71,7 @@ from pyiceberg.table.update import (
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import (
-    EMPTY_DICT,
-    KeyDefaultDict,
-)
+from pyiceberg.typedef import EMPTY_DICT, KeyDefaultDict, Record
 from pyiceberg.utils.bin_packing import ListPacker
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.datetime import datetime_to_millis
@@ -110,6 +102,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     _deleted_data_files: set[DataFile]
     _compression: AvroCompressionCodec
     _target_branch: str | None
+    _predicate: BooleanExpression
+    _case_sensitive: bool
 
     def __init__(
         self,
@@ -138,6 +132,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := 
self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else 
None
         )
+        self._predicate = AlwaysFalse()
+        self._case_sensitive = True
 
     def _validate_target_branch(self, branch: str | None) -> str | None:
         # if branch is none, write will be written into a staging snapshot
@@ -182,13 +178,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], 
Generic[U]):
     def _manifests(self) -> list[ManifestFile]:
         def _write_added_manifest() -> list[ManifestFile]:
             if self._added_data_files:
-                with write_manifest(
-                    
format_version=self._transaction.table_metadata.format_version,
+                with self.new_manifest_writer(
                     spec=self._transaction.table_metadata.spec(),
-                    schema=self._transaction.table_metadata.schema(),
-                    output_file=self.new_manifest_output(),
-                    snapshot_id=self._snapshot_id,
-                    avro_compression=self._compression,
                 ) as writer:
                     for data_file in self._added_data_files:
                         writer.add(
@@ -213,14 +204,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], 
Generic[U]):
                 for deleted_entry in deleted_entries:
                     
partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
                 for spec_id, entries in partition_groups.items():
-                    with write_manifest(
-                        
format_version=self._transaction.table_metadata.format_version,
-                        spec=self._transaction.table_metadata.specs()[spec_id],
-                        schema=self._transaction.table_metadata.schema(),
-                        output_file=self.new_manifest_output(),
-                        snapshot_id=self._snapshot_id,
-                        avro_compression=self._compression,
-                    ) as writer:
+                    with self.new_manifest_writer(self.spec(spec_id)) as 
writer:
                         for entry in entries:
                             writer.add_entry(entry)
                     deleted_manifests.append(writer.to_manifest_file())
@@ -228,6 +212,9 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
             else:
                 return []
 
+        # Updates self._predicate with computed partition predicate for 
manifest pruning
+        self._build_delete_files_partition_predicate()
+
         executor = ExecutorFactory.get_or_create()
 
         added_manifests = executor.submit(_write_added_manifest)
@@ -344,6 +331,9 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     def snapshot_id(self) -> int:
         return self._snapshot_id
 
+    def schema(self) -> Schema:
+        return self._transaction.table_metadata.schema()
+
     def spec(self, spec_id: int) -> PartitionSpec:
         return self._transaction.table_metadata.specs()[spec_id]
 
@@ -351,7 +341,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
         return write_manifest(
             format_version=self._transaction.table_metadata.format_version,
             spec=spec,
-            schema=self._transaction.table_metadata.schema(),
+            schema=self.schema(),
             output_file=self.new_manifest_output(),
             snapshot_id=self._snapshot_id,
             avro_compression=self._compression,
@@ -366,6 +356,35 @@ class _SnapshotProducer(UpdateTableMetadata[U], 
Generic[U]):
     def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: 
bool = True) -> list[ManifestEntry]:
         return manifest.fetch_manifest_entry(io=self._io, 
discard_deleted=discard_deleted)
 
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        project = inclusive_projection(self.schema(), self.spec(spec_id), 
self._case_sensitive)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
+        return manifest_evaluator(self.spec(spec_id), self.schema(), 
self.partition_filters[spec_id], self._case_sensitive)
+
+    def delete_by_predicate(self, predicate: BooleanExpression, 
case_sensitive: bool = True) -> None:
+        self._predicate = Or(self._predicate, predicate)
+        self._case_sensitive = case_sensitive
+
+    def _build_delete_files_partition_predicate(self) -> None:
+        """Build BooleanExpression based on deleted data files partitions."""
+        partition_to_overwrite: dict[int, set[Record]] = {}
+        for data_file in self._deleted_data_files:
+            group = partition_to_overwrite.setdefault(data_file.spec_id, set())
+            group.add(data_file.partition)
+
+        for spec_id, partition_records in partition_to_overwrite.items():
+            self.delete_by_predicate(
+                self._transaction._build_partition_predicate(
+                    partition_records=partition_records, schema=self.schema(), 
spec=self.spec(spec_id)
+                )
+            )
+
 
 class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
     """Will delete manifest entries from the current snapshot based on the 
predicate.
@@ -377,22 +396,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
     From the specification
     """
 
-    _predicate: BooleanExpression
-    _case_sensitive: bool
-
-    def __init__(
-        self,
-        operation: Operation,
-        transaction: Transaction,
-        io: FileIO,
-        branch: str | None = MAIN_BRANCH,
-        commit_uuid: uuid.UUID | None = None,
-        snapshot_properties: dict[str, str] = EMPTY_DICT,
-    ):
-        super().__init__(operation, transaction, io, commit_uuid, 
snapshot_properties, branch)
-        self._predicate = AlwaysFalse()
-        self._case_sensitive = True
-
     def _commit(self) -> UpdatesAndRequirements:
         # Only produce a commit when there is something to delete
         if self.files_affected:
@@ -400,25 +403,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
         else:
             return (), ()
 
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        schema = self._transaction.table_metadata.schema()
-        spec = self._transaction.table_metadata.specs()[spec_id]
-        project = inclusive_projection(schema, spec, self._case_sensitive)
-        return project(self._predicate)
-
-    @cached_property
-    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
-
-    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
-        schema = self._transaction.table_metadata.schema()
-        spec = self._transaction.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, schema, 
self.partition_filters[spec_id], self._case_sensitive)
-
-    def delete_by_predicate(self, predicate: BooleanExpression, 
case_sensitive: bool = True) -> None:
-        self._predicate = Or(self._predicate, predicate)
-        self._case_sensitive = case_sensitive
-
     @cached_property
     def _compute_deletes(self) -> tuple[list[ManifestFile], 
list[ManifestEntry], bool]:
         """Computes all the delete operation and cache it when nothing changes.
@@ -428,7 +412,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
             - The manifest-entries that are deleted based on the metadata.
             - Flag indicating that rewrites of data-files are needed.
         """
-        schema = self._transaction.table_metadata.schema()
 
         def _copy_with_new_status(entry: ManifestEntry, status: 
ManifestEntryStatus) -> ManifestEntry:
             return ManifestEntry.from_args(
@@ -442,9 +425,11 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
             )
 
         manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
-        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, 
self._predicate, case_sensitive=self._case_sensitive).eval
+        strict_metrics_evaluator = _StrictMetricsEvaluator(
+            self.schema(), self._predicate, case_sensitive=self._case_sensitive
+        ).eval
         inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
-            schema, self._predicate, case_sensitive=self._case_sensitive
+            self.schema(), self._predicate, case_sensitive=self._case_sensitive
         ).eval
 
         existing_manifests = []
@@ -483,14 +468,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
 
                                 # Rewrite the manifest
                                 if len(existing_entries) > 0:
-                                    with write_manifest(
-                                        
format_version=self._transaction.table_metadata.format_version,
-                                        
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
-                                        
schema=self._transaction.table_metadata.schema(),
-                                        output_file=self.new_manifest_output(),
-                                        snapshot_id=self._snapshot_id,
-                                        avro_compression=self._compression,
-                                    ) as writer:
+                                    with 
self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as 
writer:
                                         for existing_entry in existing_entries:
                                             writer.add_entry(existing_entry)
                                     
existing_manifests.append(writer.to_manifest_file())
@@ -609,36 +587,46 @@ class 
_OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
         """Determine if there are any existing manifest files."""
         existing_files = []
 
+        manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
         if snapshot := 
self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
             for manifest_file in snapshot.manifests(io=self._io):
-                entries = manifest_file.fetch_manifest_entry(io=self._io, 
discard_deleted=True)
-                found_deleted_data_files = [entry.data_file for entry in 
entries if entry.data_file in self._deleted_data_files]
+                # Manifest does not contain rows that match the files to 
delete partitions
+                if not 
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                    existing_files.append(manifest_file)
+                    continue
+
+                entries_to_write: set[ManifestEntry] = set()
+                found_deleted_entries: set[ManifestEntry] = set()
+
+                for entry in manifest_file.fetch_manifest_entry(io=self._io, 
discard_deleted=True):
+                    if entry.data_file in self._deleted_data_files:
+                        found_deleted_entries.add(entry)
+                    else:
+                        entries_to_write.add(entry)
 
-                if len(found_deleted_data_files) == 0:
+                # Is the intersection the empty set?
+                if len(found_deleted_entries) == 0:
                     existing_files.append(manifest_file)
-                else:
-                    # We have to rewrite the manifest file without the deleted 
data files
-                    if any(entry.data_file not in found_deleted_data_files for 
entry in entries):
-                        with write_manifest(
-                            
format_version=self._transaction.table_metadata.format_version,
-                            
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
-                            schema=self._transaction.table_metadata.schema(),
-                            output_file=self.new_manifest_output(),
-                            snapshot_id=self._snapshot_id,
-                            avro_compression=self._compression,
-                        ) as writer:
-                            for entry in entries:
-                                if entry.data_file not in 
found_deleted_data_files:
-                                    writer.add_entry(
-                                        ManifestEntry.from_args(
-                                            
status=ManifestEntryStatus.EXISTING,
-                                            snapshot_id=entry.snapshot_id,
-                                            
sequence_number=entry.sequence_number,
-                                            
file_sequence_number=entry.file_sequence_number,
-                                            data_file=entry.data_file,
-                                        )
-                                    )
-                        existing_files.append(writer.to_manifest_file())
+                    continue
+
+                # Delete all files from manifest
+                if len(entries_to_write) == 0:
+                    continue
+
+                # We have to rewrite the manifest file without the deleted 
data files
+                with 
self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
+                    for entry in entries_to_write:
+                        writer.add_entry(
+                            ManifestEntry.from_args(
+                                status=ManifestEntryStatus.EXISTING,
+                                snapshot_id=entry.snapshot_id,
+                                sequence_number=entry.sequence_number,
+                                
file_sequence_number=entry.file_sequence_number,
+                                data_file=entry.data_file,
+                            )
+                        )
+                existing_files.append(writer.to_manifest_file())
+
         return existing_files
 
     def _deleted_entries(self) -> list[ManifestEntry]:
@@ -655,8 +643,12 @@ class 
_OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
                 raise ValueError(f"Could not find the previous snapshot: 
{self._parent_snapshot_id}")
 
             executor = ExecutorFactory.get_or_create()
+            manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
 
             def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+                if not 
manifest_evaluators[manifest.partition_spec_id](manifest):
+                    return []
+
                 return [
                     ManifestEntry.from_args(
                         status=ManifestEntryStatus.DELETED,

Reply via email to