szehon-ho commented on a change in pull request #3069:
URL: https://github.com/apache/iceberg/pull/3069#discussion_r709735067



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -316,6 +316,58 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
     }
   }
 
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+
+  protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId,
+                                      Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),

Review comment:
       Thanks for adding this!

##########
File path: api/src/main/java/org/apache/iceberg/RowDelta.java
##########
@@ -111,4 +111,19 @@
    * @return this for method chaining
    */
   RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
+
+  /**
+   * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+   * <p>
+   * This method must be called when the table is queried to produce a row delta for UPDATE and
+   * MERGE operations independently of the isolation level. Calling this method isn't required
+   * for DELETE operations as it is OK when a particular record we are trying to delete
+   * was deleted concurrently.
+   * <p>
+   * Validation applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @param conflictDetectionFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  RowDelta validateNoConflictingDeleteFiles(Expression conflictDetectionFilter);

Review comment:
       Nit: should we make this consistent with the method above (i.e., omit 'Files' from the name)?

##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -94,9 +102,12 @@ protected void validate(TableMetadata base) {
         validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
       }
 
-      // TODO: does this need to check new delete files?
-      if (conflictDetectionFilter != null) {
-        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+      if (appendConflictDetectionFilter != null) {
+        validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive);
+      }
+
+      if (deleteConflictDetectionFilter != null) {
+        validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);

Review comment:
       Do you mean it cannot resolve conflicts within the same data file (I thought we are passing a data filter), or within the same partition?
   
   Also, for my own learning: do you mean it will be over-aggressive and report false positives (conflicts where the rows do not actually overlap) until we make the optimization?
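
To make the question concrete, here is a toy model of the concern, assuming the check only sees file-level metadata (partition and sequence number) and never row contents. `FileLevelConflictSketch`, `DeleteFileMeta`, and `newMatchingDeletes` are hypothetical stand-ins, not Iceberg classes, and this is not a description of how the PR implements the validation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins; none of these types are Iceberg classes.
class FileLevelConflictSketch {

  // A delete file described only by file-level metadata: its partition and the
  // sequence number of the commit that added it.
  static final class DeleteFileMeta {
    final String partition;
    final long sequenceNumber;

    DeleteFileMeta(String partition, long sequenceNumber) {
      this.partition = partition;
      this.sequenceNumber = sequenceNumber;
    }
  }

  // Returns delete files added after startingSequenceNumber whose partition matches the filter.
  // Row contents are never inspected, so a match means "may conflict", not "does conflict".
  static List<DeleteFileMeta> newMatchingDeletes(
      List<DeleteFileMeta> added, long startingSequenceNumber, Predicate<String> partitionFilter) {
    List<DeleteFileMeta> matches = new ArrayList<>();
    for (DeleteFileMeta file : added) {
      if (file.sequenceNumber > startingSequenceNumber && partitionFilter.test(file.partition)) {
        matches.add(file);
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    List<DeleteFileMeta> added = new ArrayList<>();
    added.add(new DeleteFileMeta("dept=hr", 5L));   // added before our operation started
    added.add(new DeleteFileMeta("dept=hr", 8L));   // concurrent delete in the partition we touch
    added.add(new DeleteFileMeta("dept=eng", 9L));  // concurrent delete in another partition

    // Our operation started at sequence number 6 and rewrites rows in dept=hr.
    List<DeleteFileMeta> conflicts = newMatchingDeletes(added, 6L, "dept=hr"::equals);
    System.out.println("possible conflicts: " + conflicts.size());
  }
}
```

In this model the delete file at sequence number 8 is flagged even if it removes rows the operation never touched, which is the over-aggressive behavior the question asks about.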

##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -86,6 +86,20 @@ public boolean isEmpty() {
     return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
   }
 
+  public List<DeleteFile> referencedDeleteFiles() {
+    List<DeleteFile> deleteFiles = Lists.newArrayList();

Review comment:
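       Optional comment: a small optimization is possible by checking isEmpty() first and pre-sizing the list:
   ```
   if (isEmpty()) {
     return Collections.emptyList();
   }
   // size hint only: globalDeletes may be null, and size() counts partitions, not files
   int sizeHint = (globalDeletes == null ? 0 : globalDeletes.length) + sortedDeletesByPartition.size();
   List<DeleteFile> deleteFiles = Lists.newArrayListWithExpectedSize(sizeHint);
   ...
   ```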
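
For reference, the same idea as a standalone sketch using only `java.util`. The names `PresizedListSketch` and `referencedFiles` are hypothetical, `String` stands in for `DeleteFile`, and the field shapes (a nullable array plus a map of per-partition arrays) are assumptions rather than the actual layout of `DeleteFileIndex`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the index; String replaces DeleteFile and the field shapes are assumed.
class PresizedListSketch {
  static List<String> referencedFiles(String[] globalDeletes, Map<String, String[]> deletesByPartition) {
    boolean noGlobal = globalDeletes == null || globalDeletes.length == 0;
    if (noGlobal && deletesByPartition.isEmpty()) {
      return Collections.emptyList();  // nothing to copy, so skip the allocation
    }

    // Count exactly so the backing array is allocated once and never resized.
    int total = noGlobal ? 0 : globalDeletes.length;
    for (String[] files : deletesByPartition.values()) {
      total += files.length;
    }

    List<String> result = new ArrayList<>(total);
    if (!noGlobal) {
      result.addAll(Arrays.asList(globalDeletes));
    }
    for (String[] files : deletesByPartition.values()) {
      result.addAll(Arrays.asList(files));
    }
    return result;
  }
}
```

Counting the per-partition arrays costs an extra pass over the map values, so whether it beats a rough size hint depends on how many partitions the index holds.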




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


