Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1666868594


##########
pyiceberg/table/__init__.py:
##########
@@ -454,6 +482,74 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")
+
+        if isinstance(delete_filter, str):
+            delete_filter = _parse_row_filter(delete_filter)
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+            preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter))
+
+            files = self._scan(row_filter=delete_filter).plan_files()
+
+            commit_uuid = uuid.uuid4()
+            counter = itertools.count(0)
+
+            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+            # This will load the Parquet file into memory, including:
+            #   - Filtering out the rows based on the delete filter
+            #   - Projecting it to the current schema
+            #   - Applying the positional deletes, if there are any
+            # When writing:
+            #   - Apply the latest partition-spec
+            #   - And the sort order, once that's added
+            for original_file in files:
+                df = project_table(
+                    tasks=[original_file],
+                    table_metadata=self._table.metadata,
+                    io=self._table.io,
+                    row_filter=AlwaysTrue(),
+                    projected_schema=self.table_metadata.schema(),
+                )
+                filtered_df = df.filter(preserve_row_filter)
+
+                # Only rewrite if there are records being deleted
+                if len(df) != len(filtered_df):

Review Comment:
   This is on a per-file basis, so hopefully that fits into memory. I do think that iterating over record batches might be an interesting thing to do here.
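   Something along these lines could work, a minimal sketch in plain PyArrow rather than the pyiceberg internals above (so it skips the schema projection and positional deletes that `project_table` takes care of); `preserve_row_filter` is assumed to be the same PyArrow expression as in the diff, and the paths are only illustrative:
   
   ```python
   import pyarrow.dataset as ds
   import pyarrow.parquet as pq
   
   def rewrite_in_batches(input_path: str, output_path: str, preserve_row_filter) -> int:
       """Stream a Parquet file batch by batch, keeping only rows that survive the delete."""
       source = ds.dataset(input_path, format="parquet")
       rows_written = 0
       writer = None
       try:
           # The scanner applies the filter per batch, so only one batch is in memory at a time
           for batch in source.to_batches(filter=preserve_row_filter):
               if batch.num_rows == 0:
                   continue
               if writer is None:
                   writer = pq.ParquetWriter(output_path, batch.schema)
               writer.write_batch(batch)
               rows_written += batch.num_rows
       finally:
           if writer is not None:
               writer.close()
       return rows_written
   ```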
   
   > Would the intersection of these two lists of files be the ones we would 
have to rewrite?
   
   We don't know for sure. If we're looking for `x = 22`, and the lower and upper bounds are `[19, 25]`, then the file will match the evaluator, but we still don't know whether any row actually matches the predicate until we've gone through the file.
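   A toy illustration of that, plain PyArrow with made-up values: the column bounds admit `x = 22`, so the metrics evaluator has to say "might match", but scanning the rows shows nothing actually does:
   
   ```python
   import pyarrow as pa
   import pyarrow.compute as pc
   
   # Pretend this is the data file: bounds are [19, 25], yet there is no x = 22
   table = pa.table({"x": [19, 20, 21, 23, 25]})
   
   lower = pc.min(table["x"]).as_py()
   upper = pc.max(table["x"]).as_py()
   might_match = lower <= 22 <= upper  # what the bounds alone can tell us: True
   
   # Only after reading the rows do we know whether a rewrite is really needed
   actually_matches = table.filter(pc.equal(table["x"], 22)).num_rows > 0  # False
   print(might_match, actually_matches)
   ```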


