Fokko commented on code in PR #569: URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1615942072
########## pyiceberg/table/__init__.py: ########## @@ -2897,12 +2987,152 @@ def _commit(self) -> UpdatesAndRequirements: ), ( AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid), - AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"), + AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"), ), ) -class FastAppendFiles(_MergingSnapshotProducer): +class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]): + """Will delete manifest entries from the current snapshot based on the predicate. + + This will produce a DELETE snapshot: + Data files were removed and their contents logically deleted and/or delete + files were added to delete rows. + + From the specification + """ + + _predicate: BooleanExpression + + def __init__( + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + self._predicate = AlwaysFalse() + + def _commit(self) -> UpdatesAndRequirements: + # Only produce a commit when there is something to delete + if self.files_affected: + return super()._commit() + else: + return (), () + + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + schema = self._transaction.table_metadata.schema() + spec = self._transaction.table_metadata.specs()[spec_id] + project = inclusive_projection(schema, spec) + return project(self._predicate) + + @cached_property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + schema = self._transaction.table_metadata.schema() + spec = self._transaction.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True) + + def delete_by_predicate(self, predicate: BooleanExpression) -> None: + self._predicate = Or(self._predicate, predicate) + + @cached_property + def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]: + """Computes all the delete operation and cache it when nothing changes. + + Returns: + - List of existing manifests that are not affected by the delete operation. + - The manifest-entries that are deleted based on the metadata. + - Flag indicating that rewrites of data-files are needed. + """ + schema = self._transaction.table_metadata.schema() + + def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: + return ManifestEntry( + status=status, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + + existing_manifests = [] + total_deleted_entries = [] + partial_rewrites_needed = False + self._deleted_data_files = set() + if snapshot := self._transaction.table_metadata.current_snapshot(): + for manifest_file in snapshot.manifests(io=self._io): + if manifest_file.content == ManifestContent.DATA: + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + # If the manifest isn't relevant, we can just keep it in the manifest-list + existing_manifests.append(manifest_file) + else: + # It is relevant, let's check out the content + deleted_entries = [] + existing_entries = [] + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: + deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) + self._deleted_data_files.add(entry.data_file) + elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH: + existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + else: + # Based on the metadata, it is unsure to say if the file can be deleted + partial_rewrites_needed = True + + if len(deleted_entries) > 0: + total_deleted_entries += deleted_entries + + # Rewrite the manifest + if len(existing_entries) > 0: + output_file_location = _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, + ) + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self._io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for existing_entry in existing_entries: + writer.add_entry(existing_entry) + existing_manifests.append(writer.to_manifest_file()) Review Comment: Yes, that's a very interesting thought. Once we get https://github.com/apache/iceberg-python/pull/650 in this should be interesting 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org