This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f460964e78 Core: Use 'delete' / 'append' if OverwriteFiles only deletes/appends data files (#10150)
f460964e78 is described below

commit f460964e78dbbfbe81fd093d436dca80c72e7124
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Apr 25 11:21:28 2024 +0200

    Core: Use 'delete' / 'append' if OverwriteFiles only deletes/appends data files (#10150)
---
 .../org/apache/iceberg/BaseOverwriteFiles.java     |  8 ++++
 .../java/org/apache/iceberg/TestOverwrite.java     | 46 +++++++++++++++++++++-
 .../SparkRowLevelOperationsTestBase.java           |  3 +-
 .../iceberg/spark/extensions/TestDelete.java       |  5 ++-
 .../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
 .../SparkRowLevelOperationsTestBase.java           |  3 +-
 .../iceberg/spark/extensions/TestDelete.java       |  5 ++-
 .../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
 .../SparkRowLevelOperationsTestBase.java           |  3 +-
 .../iceberg/spark/extensions/TestDelete.java       |  5 ++-
 .../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
 11 files changed, 109 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index a994eaf44d..d929bc068e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -48,6 +48,14 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
 
   @Override
   protected String operation() {
+    if (deletesDataFiles() && !addsDataFiles()) {
+      return DataOperations.DELETE;
+    }
+
+    if (addsDataFiles() && !deletesDataFiles()) {
+      return DataOperations.APPEND;
+    }
+
     return DataOperations.OVERWRITE;
   }
 
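For context, a minimal sketch of the resulting snapshot classification against the
public OverwriteFiles API (table is an existing Table handle; FILE_A and FILE_B are
hypothetical, previously committed DataFile instances; each commit is shown in
isolation):

    // only data files removed -> snapshot operation is now "delete"
    table.newOverwrite().deleteFile(FILE_A).commit();
    // table.currentSnapshot().operation() -> DataOperations.DELETE

    // only data files added -> snapshot operation is now "append"
    table.newOverwrite().addFile(FILE_B).commit();
    // table.currentSnapshot().operation() -> DataOperations.APPEND

    // data files both added and removed -> snapshot operation stays "overwrite"
    table.newOverwrite().addFile(FILE_B).deleteFile(FILE_A).commit();
    // table.currentSnapshot().operation() -> DataOperations.OVERWRITE
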
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
index 15b5547cc4..d4c886cefc 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
@@ -101,8 +101,8 @@ public class TestOverwrite extends TestBase {
                   ImmutableMap.of(1, 5L, 2, 3L), // value count
                   ImmutableMap.of(1, 0L, 2, 2L), // null count
                   null,
-                  ImmutableMap.of(1, longToBuffer(5L)), // lower bounds
-                  ImmutableMap.of(1, longToBuffer(9L)) // upper bounds
+                  ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+                  ImmutableMap.of(1, longToBuffer(14L)) // upper bounds
                   ))
           .build();
 
@@ -135,6 +135,43 @@ public class TestOverwrite extends TestBase {
     commit(table, table.newAppend().appendFile(FILE_0_TO_4).appendFile(FILE_5_TO_9), branch);
   }
 
+  @TestTemplate
+  public void deleteDataFilesProducesDeleteOperation() {
+    commit(table, table.newOverwrite().deleteFile(FILE_A).deleteFile(FILE_B), branch);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
+  }
+
+  @TestTemplate
+  public void addAndDeleteDataFilesProducesOverwriteOperation() {
+    commit(table, table.newOverwrite().addFile(FILE_10_TO_14).deleteFile(FILE_B), branch);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
+  }
+
+  @TestTemplate
+  public void overwriteByRowFilterProducesDeleteOperation() {
+    commit(table, table.newOverwrite().overwriteByRowFilter(equal("date", "2018-06-08")), branch);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
+  }
+
+  @TestTemplate
+  public void addAndOverwriteByRowFilterProducesOverwriteOperation() {
+    commit(
+        table,
+        table
+            .newOverwrite()
+            .addFile(FILE_10_TO_14)
+            .overwriteByRowFilter(equal("date", "2018-06-08")),
+        branch);
+
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
+  }
+
+  @TestTemplate
+  public void addFilesProducesAppendOperation() {
+    commit(table, table.newOverwrite().addFile(FILE_10_TO_14).addFile(FILE_5_TO_9), branch);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
+  }
+
   @TestTemplate
   public void testOverwriteWithoutAppend() {
     TableMetadata base = TestTables.readMetadata(TABLE_NAME);
@@ -145,6 +182,7 @@ public class TestOverwrite extends TestBase {
     long overwriteId = latestSnapshot(table, branch).snapshotId();
 
     assertThat(overwriteId).isNotEqualTo(baseId);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
     assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);
 
     validateManifestEntries(
@@ -188,6 +226,7 @@ public class TestOverwrite extends TestBase {
 
     long overwriteId = latestSnapshot(table, branch).snapshotId();
 
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
     assertThat(overwriteId).isNotEqualTo(baseId);
     assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(2);
 
@@ -224,6 +263,7 @@ public class TestOverwrite extends TestBase {
 
     long overwriteId = latestSnapshot(table, branch).snapshotId();
 
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
     assertThat(overwriteId).isNotEqualTo(baseId);
     assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);
 
@@ -255,6 +295,7 @@ public class TestOverwrite extends TestBase {
         .hasMessageStartingWith("Cannot append file with rows that do not match filter");
 
     assertThat(latestSnapshot(table, branch).snapshotId()).isEqualTo(baseId);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
   }
 
   @TestTemplate
@@ -275,6 +316,7 @@ public class TestOverwrite extends TestBase {
         .hasMessageStartingWith("Cannot append file with rows that do not match filter");
 
     assertThat(latestSnapshot(base, branch).snapshotId()).isEqualTo(baseId);
+    assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
   }
 
   @TestTemplate
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index 7398e38300..6aae084346 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -247,8 +247,9 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
       String changedPartitionCount,
       String deletedDataFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+        snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
   }
 
   protected void validateMergeOnRead(
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index cdc508f985..e5ea378e5a 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.DataOperations.DELETE;
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
 import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
@@ -424,8 +425,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Table table = validationCatalog.loadTable(tableIdent);
     Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
 
-    // should be an overwrite since cannot be executed using a metadata operation
+    // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+    // delete files have been added (MOR)
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "1", "1", null);
     } else {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c706603d06..de94a7c8bf 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -571,8 +571,20 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
     appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(temp.newFile().toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
     // this should create a snapshot with type overwrite.
-    table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
 
     // check pre-condition - that the above delete operation on table resulted in Snapshot of Type
     // OVERWRITE.
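
(Note on the TestStructuredStreamingRead3 changes here and in the v3.4/v3.5 copies
below: with the new classification, an overwriteByRowFilter-only commit would produce
a "delete" snapshot, so the test now adds a dummy data file to keep its precondition
of an OVERWRITE snapshot intact.)
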
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index 463cf2a47d..5a1cc63434 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -283,8 +283,9 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
       String changedPartitionCount,
       String deletedDataFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+        snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
   }
 
   protected void validateMergeOnRead(
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 731dedbd48..e6114d4abc 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.DataOperations.DELETE;
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
 import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
@@ -592,8 +593,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Table table = validationCatalog.loadTable(tableIdent);
     Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
 
-    // should be an overwrite since cannot be executed using a metadata operation
+    // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+    // delete files have been added (MOR)
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "1", "1", null);
     } else {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c1bbf304fa..47e9333601 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -576,8 +576,20 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
     appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(temp.newFile().toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
     // this should create a snapshot with type overwrite.
-    table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
 
     // check pre-condition - that the above delete operation on table resulted in Snapshot of Type
     // OVERWRITE.
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index a926388e4a..f00b942190 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -276,8 +276,9 @@ public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase
       String changedPartitionCount,
       String deletedDataFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+        snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
   }
 
   protected void validateMergeOnRead(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 05887d2a8b..b88df19ea9 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.DataOperations.DELETE;
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
 import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
@@ -590,8 +591,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Table table = validationCatalog.loadTable(tableIdent);
     assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4);
 
-    // should be an overwrite since cannot be executed using a metadata operation
+    // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+    // delete files have been added (MOR)
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "1", "1", null);
     } else {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index a5bcf53bd7..5a8d4deec7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -656,8 +656,20 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
     appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
     // this should create a snapshot with type overwrite.
-    table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+    table
+        .newOverwrite()
+        .addFile(dataFile)
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
 
     // check pre-condition - that the above delete operation on table resulted in Snapshot of Type
     // OVERWRITE.

Reply via email to