This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f460964e78 Core: Use 'delete' / 'append' if OverwriteFiles only deletes/appends data files (#10150)
f460964e78 is described below
commit f460964e78dbbfbe81fd093d436dca80c72e7124
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Apr 25 11:21:28 2024 +0200
Core: Use 'delete' / 'append' if OverwriteFiles only deletes/appends data files (#10150)
---
.../org/apache/iceberg/BaseOverwriteFiles.java | 8 ++++
.../java/org/apache/iceberg/TestOverwrite.java | 46 +++++++++++++++++++++-
.../SparkRowLevelOperationsTestBase.java | 3 +-
.../iceberg/spark/extensions/TestDelete.java | 5 ++-
.../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
.../SparkRowLevelOperationsTestBase.java | 3 +-
.../iceberg/spark/extensions/TestDelete.java | 5 ++-
.../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
.../SparkRowLevelOperationsTestBase.java | 3 +-
.../iceberg/spark/extensions/TestDelete.java | 5 ++-
.../spark/source/TestStructuredStreamingRead3.java | 14 ++++++-
11 files changed, 109 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index a994eaf44d..d929bc068e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -48,6 +48,14 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
@Override
protected String operation() {
+ if (deletesDataFiles() && !addsDataFiles()) {
+ return DataOperations.DELETE;
+ }
+
+ if (addsDataFiles() && !deletesDataFiles()) {
+ return DataOperations.APPEND;
+ }
+
return DataOperations.OVERWRITE;
}
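For context, a minimal sketch of the snapshot operations this change produces, written against the public OverwriteFiles API (the Table handle and DataFile arguments are placeholders, not part of this commit):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataOperations;
    import org.apache.iceberg.Table;

    class OverwriteOperationDemo {
      // liveA is assumed to be referenced by the table; newB and newC are new files.
      static void demo(Table table, DataFile liveA, DataFile newB, DataFile newC) {
        // deletes and adds data files -> still an "overwrite"
        table.newOverwrite().addFile(newB).deleteFile(liveA).commit();
        assert DataOperations.OVERWRITE.equals(table.currentSnapshot().operation());

        // only adds data files -> now an "append"
        table.newOverwrite().addFile(newC).commit();
        assert DataOperations.APPEND.equals(table.currentSnapshot().operation());

        // only deletes data files -> now a "delete"
        table.newOverwrite().deleteFile(newB).commit();
        assert DataOperations.DELETE.equals(table.currentSnapshot().operation());
      }
    }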
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
index 15b5547cc4..d4c886cefc 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
@@ -101,8 +101,8 @@ public class TestOverwrite extends TestBase {
ImmutableMap.of(1, 5L, 2, 3L), // value count
ImmutableMap.of(1, 0L, 2, 2L), // null count
null,
- ImmutableMap.of(1, longToBuffer(5L)), // lower bounds
- ImmutableMap.of(1, longToBuffer(9L)) // upper bounds
+ ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+ ImmutableMap.of(1, longToBuffer(14L)) // upper bounds
))
.build();
@@ -135,6 +135,43 @@ public class TestOverwrite extends TestBase {
commit(table, table.newAppend().appendFile(FILE_0_TO_4).appendFile(FILE_5_TO_9), branch);
}
+ @TestTemplate
+ public void deleteDataFilesProducesDeleteOperation() {
+ commit(table, table.newOverwrite().deleteFile(FILE_A).deleteFile(FILE_B), branch);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
+ }
+
+ @TestTemplate
+ public void addAndDeleteDataFilesProducesOverwriteOperation() {
+ commit(table, table.newOverwrite().addFile(FILE_10_TO_14).deleteFile(FILE_B), branch);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
+ }
+
+ @TestTemplate
+ public void overwriteByRowFilterProducesDeleteOperation() {
+ commit(table, table.newOverwrite().overwriteByRowFilter(equal("date", "2018-06-08")), branch);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
+ }
+
+ @TestTemplate
+ public void addAndOverwriteByRowFilterProducesOverwriteOperation() {
+ commit(
+ table,
+ table
+ .newOverwrite()
+ .addFile(FILE_10_TO_14)
+ .overwriteByRowFilter(equal("date", "2018-06-08")),
+ branch);
+
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
+ }
+
+ @TestTemplate
+ public void addFilesProducesAppendOperation() {
+ commit(table, table.newOverwrite().addFile(FILE_10_TO_14).addFile(FILE_5_TO_9), branch);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
+ }
+
@TestTemplate
public void testOverwriteWithoutAppend() {
TableMetadata base = TestTables.readMetadata(TABLE_NAME);
@@ -145,6 +182,7 @@ public class TestOverwrite extends TestBase {
long overwriteId = latestSnapshot(table, branch).snapshotId();
assertThat(overwriteId).isNotEqualTo(baseId);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);
validateManifestEntries(
@@ -188,6 +226,7 @@ public class TestOverwrite extends TestBase {
long overwriteId = latestSnapshot(table, branch).snapshotId();
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
assertThat(overwriteId).isNotEqualTo(baseId);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(2);
@@ -224,6 +263,7 @@ public class TestOverwrite extends TestBase {
long overwriteId = latestSnapshot(table, branch).snapshotId();
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
assertThat(overwriteId).isNotEqualTo(baseId);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);
@@ -255,6 +295,7 @@ public class TestOverwrite extends TestBase {
.hasMessageStartingWith("Cannot append file with rows that do not match filter");
assertThat(latestSnapshot(table, branch).snapshotId()).isEqualTo(baseId);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
}
@TestTemplate
@@ -275,6 +316,7 @@ public class TestOverwrite extends TestBase {
.hasMessageStartingWith("Cannot append file with rows that do not match filter");
assertThat(latestSnapshot(base, branch).snapshotId()).isEqualTo(baseId);
+ assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
}
@TestTemplate
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index 7398e38300..6aae084346 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -247,8 +247,9 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+ snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}
protected void validateMergeOnRead(
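The same one-line validator change is repeated for the v3.4 and v3.5 test bases below. As a standalone sketch, the selection it applies looks like this (the method name is illustrative; DELETE and OVERWRITE are the statically imported DataOperations constants):

    // A copy-on-write DELETE that removes data files without adding any now
    // produces a "delete" snapshot; anything that rewrites files stays "overwrite".
    static String expectedOperation(String deletedDataFiles, String addedDataFiles) {
      return (addedDataFiles == null && deletedDataFiles != null) ? DELETE : OVERWRITE;
    }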
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index cdc508f985..e5ea378e5a 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
@@ -424,8 +425,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
- // should be an overwrite since cannot be executed using a metadata operation
+ // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+ // delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c706603d06..de94a7c8bf 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -571,8 +571,20 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(temp.newFile().toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
// this should create a snapshot with type overwrite.
- table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+ table
+ .newOverwrite()
+ .addFile(dataFile)
+ .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+ .commit();
// check pre-condition - that the above delete operation on table resulted in Snapshot of Type
// OVERWRITE.
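Since overwriteByRowFilter alone now yields a "delete" snapshot, the test adds a data file so the commit still registers as an overwrite; the v3.4 and v3.5 copies below make the same adjustment. For reference, a hedged sketch of the streaming behavior this pre-condition feeds into, assuming the STREAMING_SKIP_OVERWRITE_SNAPSHOTS read option in org.apache.iceberg.spark.SparkReadOptions (the table identifier is a placeholder):

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class StreamingSkipOverwriteDemo {
      // Streaming reads fail on overwrite snapshots by default; setting this
      // option lets the stream skip past snapshots like the one created above.
      static Dataset<Row> stream(SparkSession spark) {
        return spark
            .readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true")
            .load("db.table");
      }
    }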
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index 463cf2a47d..5a1cc63434 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -283,8 +283,9 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+ snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}
protected void validateMergeOnRead(
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 731dedbd48..e6114d4abc 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
@@ -592,8 +593,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
- // should be an overwrite since cannot be executed using a metadata operation
+ // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+ // delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c1bbf304fa..47e9333601 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -576,8 +576,20 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(temp.newFile().toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
// this should create a snapshot with type overwrite.
- table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+ table
+ .newOverwrite()
+ .addFile(dataFile)
+ .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+ .commit();
// check pre-condition - that the above delete operation on table resulted in Snapshot of Type
// OVERWRITE.
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index a926388e4a..f00b942190 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -276,8 +276,9 @@ public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
+ String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
- snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
+ snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}
protected void validateMergeOnRead(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 05887d2a8b..b88df19ea9 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
@@ -590,8 +591,10 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4);
- // should be an overwrite since cannot be executed using a metadata operation
+ // should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
+ // delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index a5bcf53bd7..5a8d4deec7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -656,8 +656,20 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+ DataFile dataFile =
+ DataFiles.builder(table.spec())
+ .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+
// this should create a snapshot with type overwrite.
- table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
+ table
+ .newOverwrite()
+ .addFile(dataFile)
+ .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+ .commit();
// check pre-condition - that the above delete operation on table resulted in Snapshot of Type
// OVERWRITE.