This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f826cf461a Core: Add a validation API to DeleteFiles which validates files exist (#8525)
f826cf461a is described below

commit f826cf461ab299bee4b3783c1f41679e5558fd3b
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Sep 21 14:03:06 2023 -0700

    Core: Add a validation API to DeleteFiles which validates files exist (#8525)
    
    prior to attempting deletion.
    
    Simplify/improve the validation check
    
    Use failMissingDeletePaths, more simplification
---
 .../main/java/org/apache/iceberg/DeleteFiles.java  | 11 ++++++++
 .../java/org/apache/iceberg/StreamingDelete.java   | 15 +++++++++++
 .../java/org/apache/iceberg/TestDeleteFiles.java   | 31 ++++++++++++++++++++++
 3 files changed, 57 insertions(+)

diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
index 74d31a6dad..8a396920e0 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -81,4 +81,15 @@ public interface DeleteFiles extends SnapshotUpdate<DeleteFiles> {
    * @return this for method chaining
    */
   DeleteFiles caseSensitive(boolean caseSensitive);
+
+  /**
+   * Enables validation that any files that are part of the deletion still exist when committing the
+   * operation.
+   *
+   * @return this for method chaining
+   */
+  default DeleteFiles validateFilesExist() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement validateFilesExist");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 8ff7bb831e..df5a11bf31 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.expressions.Expression;
  * CommitFailedException}.
  */
 public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
+  private boolean validateFilesToDeleteExist = false;
+
   protected StreamingDelete(String tableName, TableOperations ops) {
     super(tableName, ops);
   }
@@ -60,9 +62,22 @@ public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implem
     return this;
   }
 
+  @Override
+  public DeleteFiles validateFilesExist() {
+    this.validateFilesToDeleteExist = true;
+    return this;
+  }
+
   @Override
   public StreamingDelete toBranch(String branch) {
     targetBranch(branch);
     return this;
   }
+
+  @Override
+  protected void validate(TableMetadata base, Snapshot parent) {
+    if (validateFilesToDeleteExist) {
+      failMissingDeletePaths();
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 4e4565306c..63fc7010c4 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -412,6 +412,37 @@ public class TestDeleteFiles extends TableTestBase {
         afterDeletePartitions);
   }
 
+  @Test
+  public void testDeleteValidateFileExistence() {
+    commit(table, table.newFastAppend().appendFile(FILE_B), branch);
+    Snapshot delete =
+        commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch);
+    validateManifestEntries(
+        Iterables.getOnlyElement(delete.allManifests(FILE_IO)),
+        ids(delete.snapshotId()),
+        files(FILE_B),
+        statuses(Status.DELETED));
+
+    Assertions.assertThatThrownBy(
+            () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
+        .isInstanceOf(ValidationException.class);
+  }
+
+  @Test
+  public void testDeleteFilesNoValidation() {
+    commit(table, table.newFastAppend().appendFile(FILE_B), branch);
+    Snapshot delete1 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
+    validateManifestEntries(
+        Iterables.getOnlyElement(delete1.allManifests(FILE_IO)),
+        ids(delete1.snapshotId()),
+        files(FILE_B),
+        statuses(Status.DELETED));
+
+    Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
+    Assertions.assertThat(delete2.allManifests(FILE_IO).isEmpty()).isTrue();
+    Assertions.assertThat(delete2.removedDataFiles(FILE_IO).iterator().hasNext()).isFalse();
+  }
+
   private static ByteBuffer longToBuffer(long value) {
     return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
   }

Reply via email to