This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 8a47d2bf Optimization: Prune manifest in snapshot overwrite operations (#3011)
8a47d2bf is described below
commit 8a47d2bfdcd576691717b262828e24f0f6b9fa06
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Fri Feb 20 20:42:18 2026 +0100
Optimization: Prune manifest in snapshot overwrite operations (#3011)
# Rationale for this change
While running performance tests for overwriting partitions, we noticed that
PyIceberg took roughly double the time of the Java-based implementation. The
cause is that `_existing_manifests` does not take advantage of manifest
pruning before reading all manifest entries.
In this PR I:
- Moved methods from `_DeleteFiles` to the `_SnapshotProducer` parent class so
they can be shared with other subclasses (`_OverwriteFiles`)
- Implemented manifest pruning over the partitions of all deleted files, so
that manifests that cannot match those partitions are never read (see the
sketch below)
- Refactored the method to iterate over all files only once (instead of
multiple times)
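To illustrate the approach, here is a minimal sketch, not the code from this
diff: `manifests`, `deleted_files`, `table_schema`, `spec_for_id`, and
`predicate_for_partitions` (which plays the role of
`_build_partition_predicate`) are hypothetical placeholders. The pruning step
builds one predicate from the deleted files' partitions, projects it into
partition space per spec, and only reads manifests whose partition summaries
might match:

```python
from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or
from pyiceberg.expressions.visitors import inclusive_projection, manifest_evaluator

def prune(manifests, deleted_files, table_schema, spec_for_id, predicate_for_partitions):
    # Group the partitions of the deleted files by partition-spec id.
    partitions_by_spec: dict[int, set] = {}
    for data_file in deleted_files:
        partitions_by_spec.setdefault(data_file.spec_id, set()).add(data_file.partition)

    # OR together one predicate per spec, each matching that spec's deleted partitions.
    predicate: BooleanExpression = AlwaysFalse()
    for spec_id, records in partitions_by_spec.items():
        predicate = Or(predicate, predicate_for_partitions(records, spec_for_id(spec_id)))

    might_match = []
    for manifest in manifests:
        spec = spec_for_id(manifest.partition_spec_id)
        # Project the row-level predicate into partition space for this spec and
        # evaluate it against the manifest's partition summaries; manifests that
        # cannot match are skipped without reading any of their entries.
        partition_filter = inclusive_projection(table_schema, spec)(predicate)
        if manifest_evaluator(spec, table_schema, partition_filter, True)(manifest):
            might_match.append(manifest)
    return might_match
```

Only the manifests returned here need a full `fetch_manifest_entry` pass;
everything else can be carried over to the new snapshot untouched.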
## Are these changes tested?
I believe the current tests in `tests/integration/test_writes.py` cover all
cases.
## Are there any user-facing changes?
Nope
---
pyiceberg/table/__init__.py | 82 ++++------------
pyiceberg/table/update/snapshot.py | 190 ++++++++++++++++++-------------------
2 files changed, 111 insertions(+), 161 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index cc0d9ff3..68089beb 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -26,25 +26,12 @@ from dataclasses import dataclass
from functools import cached_property
from itertools import chain
from types import TracebackType
-from typing import (
- TYPE_CHECKING,
- Any,
- TypeVar,
-)
+from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import Field
import pyiceberg.expressions.parser as parser
-from pyiceberg.expressions import (
- AlwaysFalse,
- AlwaysTrue,
- And,
- BooleanExpression,
- EqualTo,
- IsNull,
- Or,
- Reference,
-)
+from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, IsNull, Or, Reference
from pyiceberg.expressions.visitors import (
ResidualEvaluator,
_InclusiveMetricsEvaluator,
@@ -54,36 +41,17 @@ from pyiceberg.expressions.visitors import (
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
-from pyiceberg.manifest import (
- DataFile,
- DataFileContent,
- ManifestContent,
- ManifestEntry,
- ManifestFile,
-)
-from pyiceberg.partitioning import (
- PARTITION_FIELD_ID_START,
- UNPARTITIONED_PARTITION_SPEC,
- PartitionKey,
- PartitionSpec,
-)
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile
+from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.maintenance import MaintenanceTable
-from pyiceberg.table.metadata import (
- INITIAL_SEQUENCE_NUMBER,
- TableMetadata,
-)
-from pyiceberg.table.name_mapping import (
- NameMapping,
-)
+from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
+from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
-from pyiceberg.table.snapshots import (
- Snapshot,
- SnapshotLogEntry,
-)
+from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
AddPartitionSpecUpdate,
@@ -107,11 +75,7 @@ from pyiceberg.table.update import (
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import (
- ManageSnapshots,
- UpdateSnapshot,
- _FastAppendFiles,
-)
+from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.sorting import UpdateSortOrder
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
@@ -126,9 +90,7 @@ from pyiceberg.typedef import (
Record,
TableVersion,
)
-from pyiceberg.types import (
- strtobool,
-)
+from pyiceberg.types import strtobool
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.properties import property_as_bool
@@ -144,11 +106,7 @@ if TYPE_CHECKING:
from pyiceberg_core.datafusion import IcebergDataFusionTable
from pyiceberg.catalog import Catalog
- from pyiceberg.catalog.rest.scan_planning import (
- RESTContentFile,
- RESTDeleteFile,
- RESTFileScanTask,
- )
+ from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask
ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -396,17 +354,19 @@ class Transaction:
return updates, requirements
- def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression:
+ def _build_partition_predicate(
+ self, partition_records: set[Record], spec: PartitionSpec, schema: Schema
+ ) -> BooleanExpression:
"""Build a filter predicate matching any of the input partition
records.
Args:
partition_records: A set of partition records to match
+ spec: The partition spec the partition records belong to
+ schema: The schema used to resolve the partition field names
Returns:
A predicate matching any of the input partition records.
"""
- partition_spec = self.table_metadata.spec()
- schema = self.table_metadata.schema()
- partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+ partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]
expr: BooleanExpression = AlwaysFalse()
for partition_record in partition_records:
@@ -583,7 +543,9 @@ class Transaction:
)
partitions_to_overwrite = {data_file.partition for data_file in data_files}
- delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
+ delete_filter = self._build_partition_predicate(
+ partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
+ )
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -673,11 +635,7 @@ class Transaction:
case_sensitive: A bool determining if the provided `delete_filter` is case-sensitive
branch: Branch Reference to run the delete operation
"""
- from pyiceberg.io.pyarrow import (
- ArrowScan,
- _dataframe_to_data_files,
- _expression_to_complementary_pyarrow,
- )
+ from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow
if (
self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index c88337e7..37d12096 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -26,11 +26,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Generic
from pyiceberg.avro.codecs import AvroCompressionCodec
-from pyiceberg.expressions import (
- AlwaysFalse,
- BooleanExpression,
- Or,
-)
+from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or
from pyiceberg.expressions.visitors import (
ROWS_MIGHT_NOT_MATCH,
ROWS_MUST_MATCH,
@@ -51,9 +47,8 @@ from pyiceberg.manifest import (
write_manifest,
write_manifest_list,
)
-from pyiceberg.partitioning import (
- PartitionSpec,
-)
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType
from pyiceberg.table.snapshots import (
Operation,
@@ -76,10 +71,7 @@ from pyiceberg.table.update import (
UpdatesAndRequirements,
UpdateTableMetadata,
)
-from pyiceberg.typedef import (
- EMPTY_DICT,
- KeyDefaultDict,
-)
+from pyiceberg.typedef import EMPTY_DICT, KeyDefaultDict, Record
from pyiceberg.utils.bin_packing import ListPacker
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
@@ -110,6 +102,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_deleted_data_files: set[DataFile]
_compression: AvroCompressionCodec
_target_branch: str | None
+ _predicate: BooleanExpression
+ _case_sensitive: bool
def __init__(
self,
@@ -138,6 +132,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None
)
+ self._predicate = AlwaysFalse()
+ self._case_sensitive = True
def _validate_target_branch(self, branch: str | None) -> str | None:
# if branch is none, write will be written into a staging snapshot
@@ -182,13 +178,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
def _manifests(self) -> list[ManifestFile]:
def _write_added_manifest() -> list[ManifestFile]:
if self._added_data_files:
- with write_manifest(
- format_version=self._transaction.table_metadata.format_version,
+ with self.new_manifest_writer(
spec=self._transaction.table_metadata.spec(),
- schema=self._transaction.table_metadata.schema(),
- output_file=self.new_manifest_output(),
- snapshot_id=self._snapshot_id,
- avro_compression=self._compression,
) as writer:
for data_file in self._added_data_files:
writer.add(
@@ -213,14 +204,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
for deleted_entry in deleted_entries:
partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
for spec_id, entries in partition_groups.items():
- with write_manifest(
- format_version=self._transaction.table_metadata.format_version,
- spec=self._transaction.table_metadata.specs()[spec_id],
- schema=self._transaction.table_metadata.schema(),
- output_file=self.new_manifest_output(),
- snapshot_id=self._snapshot_id,
- avro_compression=self._compression,
- ) as writer:
+ with self.new_manifest_writer(self.spec(spec_id)) as writer:
for entry in entries:
writer.add_entry(entry)
deleted_manifests.append(writer.to_manifest_file())
@@ -228,6 +212,9 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
else:
return []
+ # Updates self._predicate with computed partition predicate for manifest pruning
+ self._build_delete_files_partition_predicate()
+
executor = ExecutorFactory.get_or_create()
added_manifests = executor.submit(_write_added_manifest)
@@ -344,6 +331,9 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
def snapshot_id(self) -> int:
return self._snapshot_id
+ def schema(self) -> Schema:
+ return self._transaction.table_metadata.schema()
+
def spec(self, spec_id: int) -> PartitionSpec:
return self._transaction.table_metadata.specs()[spec_id]
@@ -351,7 +341,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
return write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=spec,
- schema=self._transaction.table_metadata.schema(),
+ schema=self.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
@@ -366,6 +356,35 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> list[ManifestEntry]:
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
+ def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+ project = inclusive_projection(self.schema(), self.spec(spec_id), self._case_sensitive)
+ return project(self._predicate)
+
+ @cached_property
+ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+ return KeyDefaultDict(self._build_partition_projection)
+
+ def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+ return manifest_evaluator(self.spec(spec_id), self.schema(), self.partition_filters[spec_id], self._case_sensitive)
+
+ def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
+ self._predicate = Or(self._predicate, predicate)
+ self._case_sensitive = case_sensitive
+
+ def _build_delete_files_partition_predicate(self) -> None:
+ """Build BooleanExpression based on deleted data files partitions."""
+ partition_to_overwrite: dict[int, set[Record]] = {}
+ for data_file in self._deleted_data_files:
+ group = partition_to_overwrite.setdefault(data_file.spec_id, set())
+ group.add(data_file.partition)
+
+ for spec_id, partition_records in partition_to_overwrite.items():
+ self.delete_by_predicate(
+ self._transaction._build_partition_predicate(
+ partition_records=partition_records, schema=self.schema(), spec=self.spec(spec_id)
+ )
+ )
+
class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the
predicate.
@@ -377,22 +396,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
From the specification
"""
- _predicate: BooleanExpression
- _case_sensitive: bool
-
- def __init__(
- self,
- operation: Operation,
- transaction: Transaction,
- io: FileIO,
- branch: str | None = MAIN_BRANCH,
- commit_uuid: uuid.UUID | None = None,
- snapshot_properties: dict[str, str] = EMPTY_DICT,
- ):
- super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
- self._predicate = AlwaysFalse()
- self._case_sensitive = True
-
def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
if self.files_affected:
@@ -400,25 +403,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
else:
return (), ()
- def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
- schema = self._transaction.table_metadata.schema()
- spec = self._transaction.table_metadata.specs()[spec_id]
- project = inclusive_projection(schema, spec, self._case_sensitive)
- return project(self._predicate)
-
- @cached_property
- def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
- return KeyDefaultDict(self._build_partition_projection)
-
- def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
- schema = self._transaction.table_metadata.schema()
- spec = self._transaction.table_metadata.specs()[spec_id]
- return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive)
-
- def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
- self._predicate = Or(self._predicate, predicate)
- self._case_sensitive = case_sensitive
-
@cached_property
def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]:
"""Computes all the delete operation and cache it when nothing changes.
@@ -428,7 +412,6 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
- The manifest-entries that are deleted based on the metadata.
- Flag indicating that rewrites of data-files are needed.
"""
- schema = self._transaction.table_metadata.schema()
def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
return ManifestEntry.from_args(
@@ -442,9 +425,11 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
)
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
- strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
+ strict_metrics_evaluator = _StrictMetricsEvaluator(
+ self.schema(), self._predicate, case_sensitive=self._case_sensitive
+ ).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
- schema, self._predicate, case_sensitive=self._case_sensitive
+ self.schema(), self._predicate, case_sensitive=self._case_sensitive
).eval
existing_manifests = []
@@ -483,14 +468,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
# Rewrite the manifest
if len(existing_entries) > 0:
- with write_manifest(
- format_version=self._transaction.table_metadata.format_version,
- spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
- schema=self._transaction.table_metadata.schema(),
- output_file=self.new_manifest_output(),
- snapshot_id=self._snapshot_id,
- avro_compression=self._compression,
- ) as writer:
+ with self.new_manifest_writer(spec=self.spec(manifest_file.partition_spec_id)) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
@@ -609,36 +587,46 @@ class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Determine if there are any existing manifest files."""
existing_files = []
+ manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
for manifest_file in snapshot.manifests(io=self._io):
- entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
- found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
+ # Manifest does not contain rows that match the partitions of the files to delete
+ if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+ existing_files.append(manifest_file)
+ continue
+
+ entries_to_write: set[ManifestEntry] = set()
+ found_deleted_entries: set[ManifestEntry] = set()
+
+ for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+ if entry.data_file in self._deleted_data_files:
+ found_deleted_entries.add(entry)
+ else:
+ entries_to_write.add(entry)
- if len(found_deleted_data_files) == 0:
+ # Is the intersection the empty set?
+ if len(found_deleted_entries) == 0:
existing_files.append(manifest_file)
- else:
- # We have to rewrite the manifest file without the deleted data files
- if any(entry.data_file not in found_deleted_data_files for entry in entries):
- with write_manifest(
- format_version=self._transaction.table_metadata.format_version,
- spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
- schema=self._transaction.table_metadata.schema(),
- output_file=self.new_manifest_output(),
- snapshot_id=self._snapshot_id,
- avro_compression=self._compression,
- ) as writer:
- for entry in entries:
- if entry.data_file not in found_deleted_data_files:
- writer.add_entry(
- ManifestEntry.from_args(
- status=ManifestEntryStatus.EXISTING,
- snapshot_id=entry.snapshot_id,
- sequence_number=entry.sequence_number,
- file_sequence_number=entry.file_sequence_number,
- data_file=entry.data_file,
- )
- )
- existing_files.append(writer.to_manifest_file())
+ continue
+
+ # All files in this manifest were deleted; drop the manifest entirely
+ if len(entries_to_write) == 0:
+ continue
+
+ # We have to rewrite the manifest file without the deleted data files
+ with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
+ for entry in entries_to_write:
+ writer.add_entry(
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.EXISTING,
+ snapshot_id=entry.snapshot_id,
+ sequence_number=entry.sequence_number,
+ file_sequence_number=entry.file_sequence_number,
+ data_file=entry.data_file,
+ )
+ )
+ existing_files.append(writer.to_manifest_file())
+
return existing_files
def _deleted_entries(self) -> list[ManifestEntry]:
@@ -655,8 +643,12 @@ class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
executor = ExecutorFactory.get_or_create()
+ manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
+ if not manifest_evaluators[manifest.partition_spec_id](manifest):
+ return []
+
return [
ManifestEntry.from_args(
status=ManifestEntryStatus.DELETED,