This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f826cf461a Core: Add a validation API to DeleteFiles which validates
files exist (#8525)
f826cf461a is described below
commit f826cf461ab299bee4b3783c1f41679e5558fd3b
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Sep 21 14:03:06 2023 -0700
Core: Add a validation API to DeleteFiles which validates files exist
(#8525)
prior to attempting deletion.
Simplify/improve the validation check
Use failMissingDeletePaths, more simplification
---
.../main/java/org/apache/iceberg/DeleteFiles.java | 11 ++++++++
.../java/org/apache/iceberg/StreamingDelete.java | 15 +++++++++++
.../java/org/apache/iceberg/TestDeleteFiles.java | 31 ++++++++++++++++++++++
3 files changed, 57 insertions(+)
diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
index 74d31a6dad..8a396920e0 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -81,4 +81,15 @@ public interface DeleteFiles extends SnapshotUpdate<DeleteFiles> {
* @return this for method chaining
*/
DeleteFiles caseSensitive(boolean caseSensitive);
+
+ /**
+ * Enables validation that any files that are part of the deletion still exist when committing the
+ * operation.
+ *
+ * @return this for method chaining
+ */
+ default DeleteFiles validateFilesExist() {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement validateFilesExist");
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 8ff7bb831e..df5a11bf31 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.expressions.Expression;
* CommitFailedException}.
*/
public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
+ private boolean validateFilesToDeleteExist = false;
+
protected StreamingDelete(String tableName, TableOperations ops) {
super(tableName, ops);
}
@@ -60,9 +62,22 @@ public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implem
return this;
}
+ @Override
+ public DeleteFiles validateFilesExist() {
+ this.validateFilesToDeleteExist = true;
+ return this;
+ }
+
@Override
public StreamingDelete toBranch(String branch) {
targetBranch(branch);
return this;
}
+
+ @Override
+ protected void validate(TableMetadata base, Snapshot parent) {
+ if (validateFilesToDeleteExist) {
+ failMissingDeletePaths();
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 4e4565306c..63fc7010c4 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -412,6 +412,37 @@ public class TestDeleteFiles extends TableTestBase {
afterDeletePartitions);
}
+ @Test
+ public void testDeleteValidateFileExistence() {
+ commit(table, table.newFastAppend().appendFile(FILE_B), branch);
+ Snapshot delete =
+ commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch);
+ validateManifestEntries(
+ Iterables.getOnlyElement(delete.allManifests(FILE_IO)),
+ ids(delete.snapshotId()),
+ files(FILE_B),
+ statuses(Status.DELETED));
+
+ Assertions.assertThatThrownBy(
+ () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ public void testDeleteFilesNoValidation() {
+ commit(table, table.newFastAppend().appendFile(FILE_B), branch);
+ Snapshot delete1 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
+ validateManifestEntries(
+ Iterables.getOnlyElement(delete1.allManifests(FILE_IO)),
+ ids(delete1.snapshotId()),
+ files(FILE_B),
+ statuses(Status.DELETED));
+
+ Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
+ Assertions.assertThat(delete2.allManifests(FILE_IO).isEmpty()).isTrue();
+ Assertions.assertThat(delete2.removedDataFiles(FILE_IO).iterator().hasNext()).isFalse();
+ }
+
private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}