jayceslesar commented on code in PR #2255: URL: https://github.com/apache/iceberg-python/pull/2255#discussion_r2670312840
########## tests/table/test_delete_file_index.py: ########## @@ -0,0 +1,516 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pytest + +from pyiceberg.manifest import DataFileContent, FileFormat +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table.delete_file_index import ( + DeleteFileIndex, + EqualityDeleteFileWrapper, + EqualityDeletesGroup, + PositionalDeleteFileWrapper, + PositionalDeletesGroup, +) +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType +from tests.conftest import ( + create_data_file, + create_deletion_vector_entry, + create_equality_delete_entry, + create_equality_delete_file, + create_manifest_entry_with_delete_file, + create_partition_positional_delete_entry, + create_positional_delete_entry, +) + + +class TestDeleteFileIndex: + """Tests for the DeleteFileIndex class.""" Review Comment: I don't think classes for testing are used anywhere else in this repo? 
Probably not an issue but calling it out ########## pyiceberg/table/delete_file_index.py: ########## @@ -0,0 +1,528 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from bisect import bisect_left +from typing import Any + +from pyiceberg.conversions import from_bytes +from pyiceberg.expressions import EqualTo +from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator +from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, FileFormat, ManifestEntry +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.typedef import Record +from pyiceberg.types import NestedField +from pyiceberg.utils.partition_map import PartitionMap + +PATH_FIELD_ID = 2147483546 + + +class EqualityDeleteFileWrapper: + """Stores the equality delete file along with the sequence number.""" + + def __init__(self, manifest_entry: ManifestEntry, schema: Schema) -> None: + """Initialize a new EqualityDeleteFileWrapper. 
+ + Args: + manifest_entry: The manifest entry containing the delete file + schema: The table schema for field lookups + """ + self.delete_file = manifest_entry.data_file + self.schema = schema + self.apply_sequence_number = (manifest_entry.sequence_number or 0) - 1 + self._converted_lower_bounds: dict[int, Any] | None = None + self._converted_upper_bounds: dict[int, Any] | None = None + self._equality_fields: list[NestedField] | None = None + + def equality_fields(self) -> list[NestedField]: + """Get equality fields for current delete file. + + Returns: + List of NestedField objects representing the equality fields + """ + if self._equality_fields is None: + fields = [] + for field_id in self.delete_file.equality_ids or []: + field = self.schema.find_field(field_id) + if field: + fields.append(field) + self._equality_fields = fields + return self._equality_fields + + def lower_bound(self, field_id: int) -> Any | None: + """Convert or get lower bound for a field. + + Args: + field_id: The field ID to get the bound for + + Returns: + The converted lower bound value or None if not available + """ + if self._converted_lower_bounds is None: + self._converted_lower_bounds = self._convert_bounds(self.delete_file.lower_bounds) + return self._converted_lower_bounds.get(field_id) + + def upper_bound(self, field_id: int) -> Any | None: + """Convert or get upper bound for a field. + + Args: + field_id: The field ID to get the bound for + + Returns: + The converted upper bound value or None if not available + """ + if self._converted_upper_bounds is None: + self._converted_upper_bounds = self._convert_bounds(self.delete_file.upper_bounds) + return self._converted_upper_bounds.get(field_id) + + def _convert_bounds(self, bounds: dict[int, bytes]) -> dict[int, Any]: + """Convert byte bounds to their proper types. 
+ + Args: + bounds: Dictionary mapping field IDs to byte bounds + + Returns: + Dictionary mapping field IDs to converted bound values + """ + if not bounds: + return {} + + converted = {} + for field in self.equality_fields(): + field_id = field.field_id + bound = bounds.get(field_id) + if bound is not None: + # Use the field type to convert the bound + converted[field_id] = from_bytes(field.field_type, bound) + return converted + + +class PositionalDeleteFileWrapper: + """Stores the position delete file along with the sequence number for filtering.""" + + def __init__(self, manifest_entry: ManifestEntry): + """Initialize a new PositionalDeleteFileWrapper. + + Args: + manifest_entry: The manifest entry containing the delete file + """ + self.delete_file = manifest_entry.data_file + self.apply_sequence_number = manifest_entry.sequence_number or 0 + + +class DeletesGroup: + """Base class for managing collections of delete files with lazy sorting and binary search. + + Provides O(1) insertion with deferred O(n log n) sorting and O(log n + k) filtering + where k is the number of matching delete files. + """ + + def __init__(self) -> None: + """Initialize a new DeletesGroup.""" + self._buffer: list[Any] | None = [] + self._sorted: bool = False # Lazy sorting flag + self._seqs: list[int] | None = None + self._files: list[Any] | None = None + + def add(self, wrapper: Any) -> None: + """Add a delete file wrapper to the group. + + Args: + wrapper: The delete file wrapper to add + + Raises: + ValueError: If attempting to add files after indexing + """ + if self._buffer is None: + raise ValueError("Can't add files to group after indexing") + self._buffer.append(wrapper) + self._sorted = False + + def _index_if_needed(self) -> None: + """Sort wrappers by apply_sequence_number if not already sorted. + + This method implements lazy sorting to avoid unnecessary work when + files are only added but not queried. 
+ """ + if not self._sorted: + self._files = sorted(self._buffer, key=lambda f: f.apply_sequence_number) # type: ignore + self._seqs = [f.apply_sequence_number for f in self._files] + self._buffer = None + self._sorted = True + + def _get_candidates(self, seq: int) -> list[Any]: + """Get delete files with apply_sequence_number >= seq using binary search. + + Args: + seq: The sequence number to filter by + + Returns: + List of delete file wrappers with sequence number >= seq + """ + self._index_if_needed() + + if not self._files or not self._seqs: + return [] + + start_idx = bisect_left(self._seqs, seq) + + if start_idx >= len(self._files): + return [] + + return self._files[start_idx:] + + +class EqualityDeletesGroup(DeletesGroup): + """Extends the base DeletesGroup with equality-specific filtering logic. + + Uses file statistics and bounds to eliminate impossible matches before expensive operations. + This optimization significantly reduces the number of delete files that need to be processed + during scan planning. + """ + + def filter(self, seq: int, data_file: DataFile) -> list[DataFile]: Review Comment: I wonder if it's worth enforcing the interface that `DeletesGroup` could expose? I guess the `filter` method just isn't required by the shared base class but both downstream classes use it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
