jqin61 commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1599183250


##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2987,152 @@ def _commit(self) -> UpdatesAndRequirements:
             ),
             (
                 
AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, 
ref="main"),
+                
AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id,
 ref="main"),
             ),
         )
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the 
predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or 
delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, 
snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, 
self.partition_filters[spec_id], case_sensitive=True)
+
+    def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+        self._predicate = Or(self._predicate, predicate)
+
+    @cached_property
+    def _compute_deletes(self) -> Tuple[List[ManifestFile], 
List[ManifestEntry], bool]:
+        """Computes all the delete operation and cache it when nothing changes.
+
+        Returns:
+            - List of existing manifests that are not affected by the delete 
operation.
+            - The manifest-entries that are deleted based on the metadata.
+            - Flag indicating that rewrites of data-files are needed.
+        """
+        schema = self._transaction.table_metadata.schema()
+
+        def _copy_with_new_status(entry: ManifestEntry, status: 
ManifestEntryStatus) -> ManifestEntry:
+            return ManifestEntry(
+                status=status,
+                snapshot_id=entry.snapshot_id,
+                data_sequence_number=entry.data_sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
+            )
+
+        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+        inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+
+        existing_manifests = []
+        total_deleted_entries = []
+        partial_rewrites_needed = False
+        self._deleted_data_files = set()
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for manifest_file in snapshot.manifests(io=self._io):
+                if manifest_file.content == ManifestContent.DATA:
+                    if not 
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                        # If the manifest isn't relevant, we can just keep it 
in the manifest-list
+                        existing_manifests.append(manifest_file)
+                    else:
+                        # It is relevant, let's check out the content
+                        deleted_entries = []
+                        existing_entries = []
+                        for entry in 
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+                            if strict_metrics_evaluator(entry.data_file) == 
ROWS_MUST_MATCH:
+                                
deleted_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.DELETED))
+                                self._deleted_data_files.add(entry.data_file)
+                            elif inclusive_metrics_evaluator(entry.data_file) 
== ROWS_CANNOT_MATCH:
+                                
existing_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.EXISTING))
+                            else:
+                                # Based on the metadata, it is unsure to say 
if the file can be deleted
+                                partial_rewrites_needed = True
+
+                        if len(deleted_entries) > 0:
+                            total_deleted_entries += deleted_entries
+
+                            # Rewrite the manifest
+                            if len(existing_entries) > 0:
+                                output_file_location = _new_manifest_path(
+                                    
location=self._transaction.table_metadata.location,
+                                    num=next(self._manifest_counter),
+                                    commit_uuid=self.commit_uuid,
+                                )
+                                with write_manifest(
+                                    
format_version=self._transaction.table_metadata.format_version,
+                                    
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                                    
schema=self._transaction.table_metadata.schema(),
+                                    
output_file=self._io.new_output(output_file_location),
+                                    snapshot_id=self._snapshot_id,
+                                ) as writer:
+                                    for existing_entry in existing_entries:
+                                        writer.add_entry(existing_entry)
+                                
existing_manifests.append(writer.to_manifest_file())

Review Comment:
   The current behavior writes one 'existing' manifest — containing the data-file entries that are not deleted by DeleteFiles — for each old manifest in the parent snapshot. I am thinking it could be a future enhancement to collect the existing_entries across the different old manifests and then write a single merged existing manifest. Imagine a scenario where there are multiple small fast-appends before we call DeleteFiles, and each fast_append appends data that is only partially deleted by the final DeleteFiles: this DeleteFiles snapshot would then end up with almost as many 'existing' manifests as the parent snapshot had, rather than merging them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to