syun64 commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1637279841


##########
pyiceberg/table/__init__.py:
##########
@@ -454,6 +482,74 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")
+
+        if isinstance(delete_filter, str):
+            delete_filter = _parse_row_filter(delete_filter)
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+            preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter))
+
+            files = self._scan(row_filter=delete_filter).plan_files()
+
+            commit_uuid = uuid.uuid4()
+            counter = itertools.count(0)
+
+            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+            # This will load the Parquet file into memory, including:
+            #   - Filter out the rows based on the delete filter
+            #   - Projecting it to the current schema
+            #   - Applying the positional deletes if they are there
+            # When writing
+            #   - Apply the latest partition-spec
+            #   - And sort order when added
+            for original_file in files:
+                df = project_table(
+                    tasks=[original_file],
+                    table_metadata=self._table.metadata,
+                    io=self._table.io,
+                    row_filter=AlwaysTrue(),
+                    projected_schema=self.table_metadata.schema(),
+                )
+                filtered_df = df.filter(preserve_row_filter)
+
+                # Only rewrite if there are records being deleted
+                if len(df) != len(filtered_df):

Review Comment:
   This may be out of scope, but I'm thinking ahead to when we may want to use an `Iterator[RecordBatch]` to avoid materializing the whole table in memory, and I wonder if there's a way to avoid relying on this condition.
   
   Would it be possible to determine whether a file contains records that are being deleted by evaluating two file plans instead?
   
   If we evaluated:
   
   ```
   files_with_records_to_delete = self._scan(row_filter=delete_filter).plan_files()
   files_with_records_to_preserve = self._scan(row_filter=preserve_row_filter).plan_files()
   ```
   Would the intersection of these two lists of files be exactly the files we would have to rewrite?
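
To make the question concrete, here is a minimal sketch of the two-plan idea in plain Python. It does not use PyIceberg: `FileStats`, `plan_less_than`, `plan_not_less_than`, and `classify` are hypothetical names, and the example assumes that planning prunes files using per-file min/max column statistics for a delete filter of the form `x < bound`. In the real code, the second scan would presumably need the Iceberg expression `Not(delete_filter)` rather than the PyArrow expression stored in `preserve_row_filter`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Hypothetical stand-in for a planned data file with min/max stats for one column."""

    path: str
    lower: int
    upper: int


def plan_less_than(files, bound):
    """Paths of files that may contain rows with x < bound (the delete filter)."""
    return {f.path for f in files if f.lower < bound}


def plan_not_less_than(files, bound):
    """Paths of files that may contain rows with x >= bound (the preserve filter)."""
    return {f.path for f in files if f.upper >= bound}


def classify(files, bound):
    """Split files into drop / rewrite / keep by comparing the two plans."""
    to_delete = plan_less_than(files, bound)
    to_preserve = plan_not_less_than(files, bound)
    return {
        "drop": to_delete - to_preserve,     # every row matches the delete filter
        "rewrite": to_delete & to_preserve,  # may hold both kinds of rows
        "keep": to_preserve - to_delete,     # no row matches the delete filter
    }


files = [
    FileStats("a.parquet", lower=0, upper=3),
    FileStats("b.parquet", lower=2, upper=9),
    FileStats("c.parquet", lower=7, upper=12),
]
result = classify(files, bound=5)
print(result)
```

One caveat under these assumptions: stats-based planning is conservative, so the intersection is an upper bound on the files that need rewriting. For a filter such as `x == 5`, a file with bounds 1 to 9 appears in both plans even if no row equals 5, so a per-file check that rows were actually removed may still be needed.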



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

