This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new adc6c60d0c Spark 3.3: Count delete files in DeleteReachableFiles (#5451)
adc6c60d0c is described below
commit adc6c60d0c58e9b460e83fda606e07c667278dab
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Aug 5 18:32:08 2022 -0700
Spark 3.3: Count delete files in DeleteReachableFiles (#5451)
---
.palantir/revapi.yml | 6 ++
.../iceberg/actions/DeleteReachableFiles.java | 6 ++
.../BaseDeleteReachableFilesActionResult.java | 29 +++++
.../iceberg/spark/actions/BaseSparkAction.java | 119 ++++++++++++++++++++-
.../actions/DeleteReachableFilesSparkAction.java | 67 ++----------
.../spark/actions/ExpireSnapshotsSparkAction.java | 71 ++----------
.../actions/TestDeleteReachableFilesAction.java | 74 +++++++++++--
7 files changed, 244 insertions(+), 128 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 9b2523db9e..4491003fc4 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -54,6 +54,12 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method long
org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()"
justification: "Interface is backward compatible, very unlikely anyone
implements this Result bean interface"
+ - code: "java.method.addedToInterface"
+ new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedEqualityDeleteFilesCount()"
+ justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
+ - code: "java.method.addedToInterface"
+ new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedPositionDeleteFilesCount()"
+ justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.ExpireSnapshots
org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)"
justification: "Accept all changes prior to introducing API
compatibility checks"
diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
index e2ab755e2f..aa15ded714 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
@@ -67,6 +67,12 @@ public interface DeleteReachableFiles
/** Returns the number of deleted data files. */
long deletedDataFilesCount();
+ /** Returns the number of deleted equality delete files. */
+ long deletedEqualityDeleteFilesCount();
+
+ /** Returns the number of deleted position delete files. */
+ long deletedPositionDeleteFilesCount();
+
/** Returns the number of deleted manifests. */
long deletedManifestsCount();
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
index 6adb825822..b47ec73107 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.actions;
public class BaseDeleteReachableFilesActionResult implements
DeleteReachableFiles.Result {
private final long deletedDataFilesCount;
+ private final long deletedPosDeleteFilesCount;
+ private final long deletedEqDeleteFilesCount;
private final long deletedManifestsCount;
private final long deletedManifestListsCount;
private final long deletedOtherFilesCount;
@@ -31,6 +33,23 @@ public class BaseDeleteReachableFilesActionResult implements
DeleteReachableFile
long deletedManifestListsCount,
long otherDeletedFilesCount) {
this.deletedDataFilesCount = deletedDataFilesCount;
+ this.deletedPosDeleteFilesCount = 0;
+ this.deletedEqDeleteFilesCount = 0;
+ this.deletedManifestsCount = deletedManifestsCount;
+ this.deletedManifestListsCount = deletedManifestListsCount;
+ this.deletedOtherFilesCount = otherDeletedFilesCount;
+ }
+
+ public BaseDeleteReachableFilesActionResult(
+ long deletedDataFilesCount,
+ long deletedPosDeleteFilesCount,
+ long deletedEqDeleteFilesCount,
+ long deletedManifestsCount,
+ long deletedManifestListsCount,
+ long otherDeletedFilesCount) {
+ this.deletedDataFilesCount = deletedDataFilesCount;
+ this.deletedPosDeleteFilesCount = deletedPosDeleteFilesCount;
+ this.deletedEqDeleteFilesCount = deletedEqDeleteFilesCount;
this.deletedManifestsCount = deletedManifestsCount;
this.deletedManifestListsCount = deletedManifestListsCount;
this.deletedOtherFilesCount = otherDeletedFilesCount;
@@ -41,6 +60,16 @@ public class BaseDeleteReachableFilesActionResult implements
DeleteReachableFile
return deletedDataFilesCount;
}
+ @Override
+ public long deletedPositionDeleteFilesCount() {
+ return deletedPosDeleteFilesCount;
+ }
+
+ @Override
+ public long deletedEqualityDeleteFilesCount() {
+ return deletedEqDeleteFilesCount;
+ }
+
@Override
public long deletedManifestsCount() {
return deletedManifestsCount;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index acfdeb3264..e9012a6d72 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,15 @@ import static org.apache.spark.sql.functions.lit;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
@@ -37,6 +41,8 @@ import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
@@ -47,6 +53,7 @@ import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.Tasks;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -55,11 +62,12 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
abstract class BaseSparkAction<ThisT> {
- protected static final String CONTENT_FILE = "Content File";
protected static final String MANIFEST = "Manifest";
protected static final String MANIFEST_LIST = "Manifest List";
protected static final String OTHERS = "Others";
@@ -68,7 +76,9 @@ abstract class BaseSparkAction<ThisT> {
protected static final String FILE_TYPE = "file_type";
protected static final String LAST_MODIFIED = "last_modified";
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+ private static final int DELETE_NUM_RETRIES = 3;
private final SparkSession spark;
private final JavaSparkContext sparkContext;
@@ -200,6 +210,113 @@ abstract class BaseSparkAction<ThisT> {
return SparkTableUtil.loadMetadataTable(spark, table, type);
}
+ /**
+ * Deletes files and keeps track of how many files were removed for each file type.
+ *
+ * @param executorService an executor service to use for parallel deletes
+ * @param deleteFunc a delete func
+ * @param files an iterator of Spark rows of the structure (path: String, type: String)
+ * @return stats on which files were deleted
+ */
+ protected DeleteSummary deleteFiles(
+ ExecutorService executorService, Consumer<String> deleteFunc,
Iterator<Row> files) {
+
+ DeleteSummary summary = new DeleteSummary();
+
+ Tasks.foreach(files)
+ .retry(DELETE_NUM_RETRIES)
+ .stopRetryOn(NotFoundException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (fileInfo, exc) -> {
+ String path = fileInfo.getString(0);
+ String type = fileInfo.getString(1);
+ LOG.warn("Delete failed for {}: {}", type, path, exc);
+ })
+ .run(
+ fileInfo -> {
+ String path = fileInfo.getString(0);
+ String type = fileInfo.getString(1);
+ deleteFunc.accept(path);
+ summary.deletedFile(path, type);
+ });
+
+ return summary;
+ }
+
+ static class DeleteSummary {
+ private final AtomicLong dataFilesCount = new AtomicLong(0L);
+ private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
+ private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
+ private final AtomicLong manifestsCount = new AtomicLong(0L);
+ private final AtomicLong manifestListsCount = new AtomicLong(0L);
+ private final AtomicLong otherFilesCount = new AtomicLong(0L);
+
+ public void deletedFile(String path, String type) {
+ if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+ dataFilesCount.incrementAndGet();
+ LOG.trace("Deleted data file: {}", path);
+
+ } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+ positionDeleteFilesCount.incrementAndGet();
+ LOG.trace("Deleted positional delete file: {}", path);
+
+ } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+ equalityDeleteFilesCount.incrementAndGet();
+ LOG.trace("Deleted equality delete file: {}", path);
+
+ } else if (MANIFEST.equalsIgnoreCase(type)) {
+ manifestsCount.incrementAndGet();
+ LOG.debug("Deleted manifest: {}", path);
+
+ } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
+ manifestListsCount.incrementAndGet();
+ LOG.debug("Deleted manifest list: {}", path);
+
+ } else if (OTHERS.equalsIgnoreCase(type)) {
+ otherFilesCount.incrementAndGet();
+ LOG.debug("Deleted other metadata file: {}", path);
+
+ } else {
+ throw new ValidationException("Illegal file type: %s", type);
+ }
+ }
+
+ public long dataFilesCount() {
+ return dataFilesCount.get();
+ }
+
+ public long positionDeleteFilesCount() {
+ return positionDeleteFilesCount.get();
+ }
+
+ public long equalityDeleteFilesCount() {
+ return equalityDeleteFilesCount.get();
+ }
+
+ public long manifestsCount() {
+ return manifestsCount.get();
+ }
+
+ public long manifestListsCount() {
+ return manifestListsCount.get();
+ }
+
+ public long otherFilesCount() {
+ return otherFilesCount.get();
+ }
+
+ public long totalFilesCount() {
+ return dataFilesCount()
+ + positionDeleteFilesCount()
+ + equalityDeleteFilesCount()
+ + manifestsCount()
+ + manifestListsCount()
+ + otherFilesCount();
+ }
+ }
+
private static class ReadManifest
implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
private final Broadcast<Table> table;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index a9828d2c78..41127f7c73 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -23,21 +23,18 @@ import static
org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult;
import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -126,64 +123,22 @@ public class DeleteReachableFilesSparkAction
private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
- return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
+ return buildValidContentFileWithTypeDF(staticTable)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable),
OTHERS));
}
- /**
- * Deletes files passed to it.
- *
- * @param deleted an Iterator of Spark Rows of the structure (path: String,
type: String)
- * @return Statistics on which files were deleted
- */
- private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row>
deleted) {
- AtomicLong dataFileCount = new AtomicLong(0L);
- AtomicLong manifestCount = new AtomicLong(0L);
- AtomicLong manifestListCount = new AtomicLong(0L);
- AtomicLong otherFilesCount = new AtomicLong(0L);
-
- Tasks.foreach(deleted)
- .retry(3)
- .stopRetryOn(NotFoundException.class)
- .suppressFailureWhenFinished()
- .executeWith(deleteExecutorService)
- .onFailure(
- (fileInfo, exc) -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- LOG.warn("Delete failed for {}: {}", type, file, exc);
- })
- .run(
- fileInfo -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- deleteFunc.accept(file);
- switch (type) {
- case CONTENT_FILE:
- dataFileCount.incrementAndGet();
- LOG.trace("Deleted Content File: {}", file);
- break;
- case MANIFEST:
- manifestCount.incrementAndGet();
- LOG.debug("Deleted Manifest: {}", file);
- break;
- case MANIFEST_LIST:
- manifestListCount.incrementAndGet();
- LOG.debug("Deleted Manifest List: {}", file);
- break;
- case OTHERS:
- otherFilesCount.incrementAndGet();
- LOG.debug("Others: {}", file);
- break;
- }
- });
-
- long filesCount =
- dataFileCount.get() + manifestCount.get() + manifestListCount.get() +
otherFilesCount.get();
- LOG.info("Total files removed: {}", filesCount);
+ private DeleteReachableFiles.Result deleteFiles(Iterator<Row> files) {
+ DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc,
files);
+ LOG.info("Deleted {} total files", summary.totalFilesCount());
+
return new BaseDeleteReachableFilesActionResult(
- dataFileCount.get(), manifestCount.get(), manifestListCount.get(),
otherFilesCount.get());
+ summary.dataFilesCount(),
+ summary.positionDeleteFilesCount(),
+ summary.equalityDeleteFilesCount(),
+ summary.manifestsCount(),
+ summary.manifestListsCount(),
+ summary.otherFilesCount());
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index c9e5f7cca7..13402d40d7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -25,16 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -42,7 +39,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -229,66 +225,15 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
}
- /**
- * Deletes files passed to it based on their type.
- *
- * @param expired an Iterator of Spark Rows of the structure (path: String,
type: String)
- * @return Statistics on which files were deleted
- */
- private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
- AtomicLong dataFileCount = new AtomicLong(0L);
- AtomicLong posDeleteFileCount = new AtomicLong(0L);
- AtomicLong eqDeleteFileCount = new AtomicLong(0L);
- AtomicLong manifestCount = new AtomicLong(0L);
- AtomicLong manifestListCount = new AtomicLong(0L);
-
- Tasks.foreach(expired)
- .retry(3)
- .stopRetryOn(NotFoundException.class)
- .suppressFailureWhenFinished()
- .executeWith(deleteExecutorService)
- .onFailure(
- (fileInfo, exc) -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- LOG.warn("Delete failed for {}: {}", type, file, exc);
- })
- .run(
- fileInfo -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- deleteFunc.accept(file);
-
- if (FileContent.DATA.name().equalsIgnoreCase(type)) {
- dataFileCount.incrementAndGet();
- LOG.trace("Deleted Data File: {}", file);
- } else if
(FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
- posDeleteFileCount.incrementAndGet();
- LOG.trace("Deleted Positional Delete File: {}", file);
- } else if
(FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
- eqDeleteFileCount.incrementAndGet();
- LOG.trace("Deleted Equality Delete File: {}", file);
- } else if (MANIFEST.equals(type)) {
- manifestCount.incrementAndGet();
- LOG.debug("Deleted Manifest: {}", file);
- } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
- manifestListCount.incrementAndGet();
- LOG.debug("Deleted Manifest List: {}", file);
- } else {
- throw new ValidationException("Illegal file type: %s", type);
- }
- });
-
- long contentFileCount =
- dataFileCount.get() + posDeleteFileCount.get() +
eqDeleteFileCount.get();
- LOG.info(
- "Deleted {} total files", contentFileCount + manifestCount.get() +
manifestListCount.get());
+ private ExpireSnapshots.Result deleteFiles(Iterator<Row> files) {
+ DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc,
files);
+ LOG.info("Deleted {} total files", summary.totalFilesCount());
return new BaseExpireSnapshotsActionResult(
- dataFileCount.get(),
- posDeleteFileCount.get(),
- eqDeleteFileCount.get(),
- manifestCount.get(),
- manifestListCount.get());
+ summary.dataFilesCount(),
+ summary.positionDeleteFilesCount(),
+ summary.equalityDeleteFilesCount(),
+ summary.manifestsCount(),
+ summary.manifestListsCount());
}
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
index 9090da2fe6..154e940519 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -94,6 +96,22 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
.withPartition(TestHelpers.Row.of(3))
.withRecordCount(1)
.build();
+ static final DeleteFile FILE_A_POS_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-a-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(0))
+ .withRecordCount(1)
+ .build();
+ static final DeleteFile FILE_A_EQ_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-a-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(0))
+ .withRecordCount(1)
+ .build();
@Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -109,6 +127,8 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
private void checkRemoveFilesResults(
long expectedDatafiles,
+ long expectedPosDeleteFiles,
+ long expectedEqDeleteFiles,
long expectedManifestsDeleted,
long expectedManifestListsDeleted,
long expectedOtherFilesDeleted,
@@ -121,6 +141,14 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
"Incorrect number of datafiles deleted",
expectedDatafiles,
results.deletedDataFilesCount());
+ Assert.assertEquals(
+ "Incorrect number of position delete files deleted",
+ expectedPosDeleteFiles,
+ results.deletedPositionDeleteFilesCount());
+ Assert.assertEquals(
+ "Incorrect number of equality delete files deleted",
+ expectedEqDeleteFiles,
+ results.deletedEqualityDeleteFilesCount());
Assert.assertEquals(
"Incorrect number of manifest lists deleted",
expectedManifestListsDeleted,
@@ -177,7 +205,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
file ->
Assert.assertTrue(
"FILE_A should be deleted",
deletedFiles.contains(FILE_A.path().toString())));
- checkRemoveFilesResults(4L, 6L, 4L, 6, result);
+ checkRemoveFilesResults(4L, 0, 0, 6L, 4L, 6, result);
}
@Test
@@ -195,7 +223,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
DeleteReachableFiles.Result result =
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io()).execute();
- checkRemoveFilesResults(3L, 3L, 3L, 5, result);
+ checkRemoveFilesResults(3L, 0, 0, 3L, 3L, 5, result);
}
@Test
@@ -203,7 +231,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
DeleteReachableFiles.Result result =
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io()).execute();
- checkRemoveFilesResults(0, 0, 0, 2, result);
+ checkRemoveFilesResults(0, 0, 0, 0, 0, 2, result);
}
@Test
@@ -223,7 +251,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
DeleteReachableFiles.Result result = baseRemoveFilesSparkAction.execute();
- checkRemoveFilesResults(4, 5, 5, 8, result);
+ checkRemoveFilesResults(4, 0, 0, 5, 5, 8, result);
}
@Test
@@ -234,7 +262,37 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
DeleteReachableFiles baseRemoveFilesSparkAction =
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
- checkRemoveFilesResults(2, 2, 2, 4, baseRemoveFilesSparkAction.execute());
+ checkRemoveFilesResults(2, 0, 0, 2, 2, 4,
baseRemoveFilesSparkAction.execute());
+ }
+
+ @Test
+ public void testPositionDeleteFiles() {
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newRowDelta().addDeletes(FILE_A_POS_DELETES).commit();
+
+ DeleteReachableFiles baseRemoveFilesSparkAction =
+
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
+ checkRemoveFilesResults(2, 1, 0, 3, 3, 6,
baseRemoveFilesSparkAction.execute());
+ }
+
+ @Test
+ public void testEqualityDeleteFiles() {
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newRowDelta().addDeletes(FILE_A_EQ_DELETES).commit();
+
+ DeleteReachableFiles baseRemoveFilesSparkAction =
+
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
+ checkRemoveFilesResults(2, 0, 1, 3, 3, 6,
baseRemoveFilesSparkAction.execute());
}
@Test
@@ -247,7 +305,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
// IO defaults to HadoopFileIO
DeleteReachableFiles baseRemoveFilesSparkAction =
sparkActions().deleteReachableFiles(metadataLocation(table));
- checkRemoveFilesResults(2, 2, 2, 4, baseRemoveFilesSparkAction.execute());
+ checkRemoveFilesResults(2, 0, 0, 2, 2, 4,
baseRemoveFilesSparkAction.execute());
}
@Test
@@ -273,7 +331,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
int jobsAfter =
spark.sparkContext().dagScheduler().nextJobId().get();
int totalJobsRun = jobsAfter - jobsBefore;
- checkRemoveFilesResults(3L, 4L, 3L, 5, results);
+ checkRemoveFilesResults(3L, 0, 0, 4L, 3L, 5, results);
Assert.assertEquals(
"Expected total jobs to be equal to total number of shuffle
partitions",
@@ -301,7 +359,7 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
DeleteReachableFiles.Result res = baseRemoveFilesSparkAction.execute();
- checkRemoveFilesResults(1, 1, 1, 4, res);
+ checkRemoveFilesResults(1, 0, 0, 1, 1, 4, res);
}
@Test