Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1615942072


##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2987,152 @@ def _commit(self) -> UpdatesAndRequirements:
             ),
             (
                 
AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, 
ref="main"),
+                
AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id,
 ref="main"),
             ),
         )
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the 
predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or 
delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, 
snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, 
self.partition_filters[spec_id], case_sensitive=True)
+
+    def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+        self._predicate = Or(self._predicate, predicate)
+
+    @cached_property
+    def _compute_deletes(self) -> Tuple[List[ManifestFile], 
List[ManifestEntry], bool]:
+        """Computes all the delete operation and cache it when nothing changes.
+
+        Returns:
+            - List of existing manifests that are not affected by the delete 
operation.
+            - The manifest-entries that are deleted based on the metadata.
+            - Flag indicating that rewrites of data-files are needed.
+        """
+        schema = self._transaction.table_metadata.schema()
+
+        def _copy_with_new_status(entry: ManifestEntry, status: 
ManifestEntryStatus) -> ManifestEntry:
+            return ManifestEntry(
+                status=status,
+                snapshot_id=entry.snapshot_id,
+                data_sequence_number=entry.data_sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
+            )
+
+        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+        inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+
+        existing_manifests = []
+        total_deleted_entries = []
+        partial_rewrites_needed = False
+        self._deleted_data_files = set()
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for manifest_file in snapshot.manifests(io=self._io):
+                if manifest_file.content == ManifestContent.DATA:
+                    if not 
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                        # If the manifest isn't relevant, we can just keep it 
in the manifest-list
+                        existing_manifests.append(manifest_file)
+                    else:
+                        # It is relevant, let's check out the content
+                        deleted_entries = []
+                        existing_entries = []
+                        for entry in 
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+                            if strict_metrics_evaluator(entry.data_file) == 
ROWS_MUST_MATCH:
+                                
deleted_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.DELETED))
+                                self._deleted_data_files.add(entry.data_file)
+                            elif inclusive_metrics_evaluator(entry.data_file) 
== ROWS_CANNOT_MATCH:
+                                
existing_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.EXISTING))
+                            else:
+                                # Based on the metadata, it is unsure to say 
if the file can be deleted
+                                partial_rewrites_needed = True
+
+                        if len(deleted_entries) > 0:
+                            total_deleted_entries += deleted_entries
+
+                            # Rewrite the manifest
+                            if len(existing_entries) > 0:
+                                output_file_location = _new_manifest_path(
+                                    
location=self._transaction.table_metadata.location,
+                                    num=next(self._manifest_counter),
+                                    commit_uuid=self.commit_uuid,
+                                )
+                                with write_manifest(
+                                    
format_version=self._transaction.table_metadata.format_version,
+                                    
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                                    
schema=self._transaction.table_metadata.schema(),
+                                    
output_file=self._io.new_output(output_file_location),
+                                    snapshot_id=self._snapshot_id,
+                                ) as writer:
+                                    for existing_entry in existing_entries:
+                                        writer.add_entry(existing_entry)
+                                
existing_manifests.append(writer.to_manifest_file())

Review Comment:
   Yes, that's a very interesting thought. Once we get 
https://github.com/apache/iceberg-python/pull/650 in, this should be interesting 
👍 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to