This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new adc6c60d0c Spark 3.3: Count delete files in DeleteReachableFiles (#5451)
adc6c60d0c is described below

commit adc6c60d0c58e9b460e83fda606e07c667278dab
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Aug 5 18:32:08 2022 -0700

    Spark 3.3: Count delete files in DeleteReachableFiles (#5451)
---
 .palantir/revapi.yml                               |   6 ++
 .../iceberg/actions/DeleteReachableFiles.java      |   6 ++
 .../BaseDeleteReachableFilesActionResult.java      |  29 +++++
 .../iceberg/spark/actions/BaseSparkAction.java     | 119 ++++++++++++++++++++-
 .../actions/DeleteReachableFilesSparkAction.java   |  67 ++----------
 .../spark/actions/ExpireSnapshotsSparkAction.java  |  71 ++----------
 .../actions/TestDeleteReachableFilesAction.java    |  74 +++++++++++--
 7 files changed, 244 insertions(+), 128 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 9b2523db9e..4491003fc4 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -54,6 +54,12 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()"
       justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
+    - code: "java.method.addedToInterface"
+      new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedEqualityDeleteFilesCount()"
+      justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
+    - code: "java.method.addedToInterface"
+      new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedPositionDeleteFilesCount()"
+      justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
     - code: "java.method.addedToInterface"
       new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)"
       justification: "Accept all changes prior to introducing API compatibility checks"
diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
index e2ab755e2f..aa15ded714 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java
@@ -67,6 +67,12 @@ public interface DeleteReachableFiles
     /** Returns the number of deleted data files. */
     long deletedDataFilesCount();
 
+    /** Returns the number of deleted equality delete files. */
+    long deletedEqualityDeleteFilesCount();
+
+    /** Returns the number of deleted position delete files. */
+    long deletedPositionDeleteFilesCount();
+
     /** Returns the number of deleted manifests. */
     long deletedManifestsCount();
 
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
index 6adb825822..b47ec73107 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.actions;
 public class BaseDeleteReachableFilesActionResult implements 
DeleteReachableFiles.Result {
 
   private final long deletedDataFilesCount;
+  private final long deletedPosDeleteFilesCount;
+  private final long deletedEqDeleteFilesCount;
   private final long deletedManifestsCount;
   private final long deletedManifestListsCount;
   private final long deletedOtherFilesCount;
@@ -31,6 +33,23 @@ public class BaseDeleteReachableFilesActionResult implements 
DeleteReachableFile
       long deletedManifestListsCount,
       long otherDeletedFilesCount) {
     this.deletedDataFilesCount = deletedDataFilesCount;
+    this.deletedPosDeleteFilesCount = 0;
+    this.deletedEqDeleteFilesCount = 0;
+    this.deletedManifestsCount = deletedManifestsCount;
+    this.deletedManifestListsCount = deletedManifestListsCount;
+    this.deletedOtherFilesCount = otherDeletedFilesCount;
+  }
+
+  public BaseDeleteReachableFilesActionResult(
+      long deletedDataFilesCount,
+      long deletedPosDeleteFilesCount,
+      long deletedEqDeleteFilesCount,
+      long deletedManifestsCount,
+      long deletedManifestListsCount,
+      long otherDeletedFilesCount) {
+    this.deletedDataFilesCount = deletedDataFilesCount;
+    this.deletedPosDeleteFilesCount = deletedPosDeleteFilesCount;
+    this.deletedEqDeleteFilesCount = deletedEqDeleteFilesCount;
     this.deletedManifestsCount = deletedManifestsCount;
     this.deletedManifestListsCount = deletedManifestListsCount;
     this.deletedOtherFilesCount = otherDeletedFilesCount;
@@ -41,6 +60,16 @@ public class BaseDeleteReachableFilesActionResult implements 
DeleteReachableFile
     return deletedDataFilesCount;
   }
 
+  @Override
+  public long deletedPositionDeleteFilesCount() {
+    return deletedPosDeleteFilesCount;
+  }
+
+  @Override
+  public long deletedEqualityDeleteFilesCount() {
+    return deletedEqDeleteFilesCount;
+  }
+
   @Override
   public long deletedManifestsCount() {
     return deletedManifestsCount;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index acfdeb3264..e9012a6d72 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,15 @@ import static org.apache.spark.sql.functions.lit;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
@@ -37,6 +41,8 @@ import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
@@ -47,6 +53,7 @@ import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.Tasks;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -55,11 +62,12 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 abstract class BaseSparkAction<ThisT> {
 
-  protected static final String CONTENT_FILE = "Content File";
   protected static final String MANIFEST = "Manifest";
   protected static final String MANIFEST_LIST = "Manifest List";
   protected static final String OTHERS = "Others";
@@ -68,7 +76,9 @@ abstract class BaseSparkAction<ThisT> {
   protected static final String FILE_TYPE = "file_type";
   protected static final String LAST_MODIFIED = "last_modified";
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+  private static final int DELETE_NUM_RETRIES = 3;
 
   private final SparkSession spark;
   private final JavaSparkContext sparkContext;
@@ -200,6 +210,113 @@ abstract class BaseSparkAction<ThisT> {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
+  /**
+   * Deletes files and keeps track of how many files were removed for each file type.
+   *
+   * @param executorService an executor service to use for parallel deletes
+   * @param deleteFunc a delete func
+   * @param files an iterator of Spark rows of the structure (path: String, type: String)
+   * @return stats on which files were deleted
+   */
+  protected DeleteSummary deleteFiles(
+      ExecutorService executorService, Consumer<String> deleteFunc, 
Iterator<Row> files) {
+
+    DeleteSummary summary = new DeleteSummary();
+
+    Tasks.foreach(files)
+        .retry(DELETE_NUM_RETRIES)
+        .stopRetryOn(NotFoundException.class)
+        .suppressFailureWhenFinished()
+        .executeWith(executorService)
+        .onFailure(
+            (fileInfo, exc) -> {
+              String path = fileInfo.getString(0);
+              String type = fileInfo.getString(1);
+              LOG.warn("Delete failed for {}: {}", type, path, exc);
+            })
+        .run(
+            fileInfo -> {
+              String path = fileInfo.getString(0);
+              String type = fileInfo.getString(1);
+              deleteFunc.accept(path);
+              summary.deletedFile(path, type);
+            });
+
+    return summary;
+  }
+
+  static class DeleteSummary {
+    private final AtomicLong dataFilesCount = new AtomicLong(0L);
+    private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
+    private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
+    private final AtomicLong manifestsCount = new AtomicLong(0L);
+    private final AtomicLong manifestListsCount = new AtomicLong(0L);
+    private final AtomicLong otherFilesCount = new AtomicLong(0L);
+
+    public void deletedFile(String path, String type) {
+      if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+        dataFilesCount.incrementAndGet();
+        LOG.trace("Deleted data file: {}", path);
+
+      } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+        positionDeleteFilesCount.incrementAndGet();
+        LOG.trace("Deleted positional delete file: {}", path);
+
+      } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+        equalityDeleteFilesCount.incrementAndGet();
+        LOG.trace("Deleted equality delete file: {}", path);
+
+      } else if (MANIFEST.equalsIgnoreCase(type)) {
+        manifestsCount.incrementAndGet();
+        LOG.debug("Deleted manifest: {}", path);
+
+      } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
+        manifestListsCount.incrementAndGet();
+        LOG.debug("Deleted manifest list: {}", path);
+
+      } else if (OTHERS.equalsIgnoreCase(type)) {
+        otherFilesCount.incrementAndGet();
+        LOG.debug("Deleted other metadata file: {}", path);
+
+      } else {
+        throw new ValidationException("Illegal file type: %s", type);
+      }
+    }
+
+    public long dataFilesCount() {
+      return dataFilesCount.get();
+    }
+
+    public long positionDeleteFilesCount() {
+      return positionDeleteFilesCount.get();
+    }
+
+    public long equalityDeleteFilesCount() {
+      return equalityDeleteFilesCount.get();
+    }
+
+    public long manifestsCount() {
+      return manifestsCount.get();
+    }
+
+    public long manifestListsCount() {
+      return manifestListsCount.get();
+    }
+
+    public long otherFilesCount() {
+      return otherFilesCount.get();
+    }
+
+    public long totalFilesCount() {
+      return dataFilesCount()
+          + positionDeleteFilesCount()
+          + equalityDeleteFilesCount()
+          + manifestsCount()
+          + manifestListsCount()
+          + otherFilesCount();
+    }
+  }
+
   private static class ReadManifest
       implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
     private final Broadcast<Table> table;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index a9828d2c78..41127f7c73 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -23,21 +23,18 @@ import static 
org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult;
 import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -126,64 +123,22 @@ public class DeleteReachableFilesSparkAction
 
   private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, io);
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
+    return buildValidContentFileWithTypeDF(staticTable)
         .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
         .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
         .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), 
OTHERS));
   }
 
-  /**
-   * Deletes files passed to it.
-   *
-   * @param deleted an Iterator of Spark Rows of the structure (path: String, type: String)
-   * @return Statistics on which files were deleted
-   */
-  private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> 
deleted) {
-    AtomicLong dataFileCount = new AtomicLong(0L);
-    AtomicLong manifestCount = new AtomicLong(0L);
-    AtomicLong manifestListCount = new AtomicLong(0L);
-    AtomicLong otherFilesCount = new AtomicLong(0L);
-
-    Tasks.foreach(deleted)
-        .retry(3)
-        .stopRetryOn(NotFoundException.class)
-        .suppressFailureWhenFinished()
-        .executeWith(deleteExecutorService)
-        .onFailure(
-            (fileInfo, exc) -> {
-              String file = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
-              LOG.warn("Delete failed for {}: {}", type, file, exc);
-            })
-        .run(
-            fileInfo -> {
-              String file = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
-              deleteFunc.accept(file);
-              switch (type) {
-                case CONTENT_FILE:
-                  dataFileCount.incrementAndGet();
-                  LOG.trace("Deleted Content File: {}", file);
-                  break;
-                case MANIFEST:
-                  manifestCount.incrementAndGet();
-                  LOG.debug("Deleted Manifest: {}", file);
-                  break;
-                case MANIFEST_LIST:
-                  manifestListCount.incrementAndGet();
-                  LOG.debug("Deleted Manifest List: {}", file);
-                  break;
-                case OTHERS:
-                  otherFilesCount.incrementAndGet();
-                  LOG.debug("Others: {}", file);
-                  break;
-              }
-            });
-
-    long filesCount =
-        dataFileCount.get() + manifestCount.get() + manifestListCount.get() + 
otherFilesCount.get();
-    LOG.info("Total files removed: {}", filesCount);
+  private DeleteReachableFiles.Result deleteFiles(Iterator<Row> files) {
+    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, 
files);
+    LOG.info("Deleted {} total files", summary.totalFilesCount());
+
     return new BaseDeleteReachableFilesActionResult(
-        dataFileCount.get(), manifestCount.get(), manifestListCount.get(), 
otherFilesCount.get());
+        summary.dataFilesCount(),
+        summary.positionDeleteFilesCount(),
+        summary.equalityDeleteFilesCount(),
+        summary.manifestsCount(),
+        summary.manifestListsCount(),
+        summary.otherFilesCount());
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index c9e5f7cca7..13402d40d7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -25,16 +25,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import org.apache.iceberg.FileContent;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -42,7 +39,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -229,66 +225,15 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
         .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
 
-  /**
-   * Deletes files passed to it based on their type.
-   *
-   * @param expired an Iterator of Spark Rows of the structure (path: String, type: String)
-   * @return Statistics on which files were deleted
-   */
-  private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
-    AtomicLong dataFileCount = new AtomicLong(0L);
-    AtomicLong posDeleteFileCount = new AtomicLong(0L);
-    AtomicLong eqDeleteFileCount = new AtomicLong(0L);
-    AtomicLong manifestCount = new AtomicLong(0L);
-    AtomicLong manifestListCount = new AtomicLong(0L);
-
-    Tasks.foreach(expired)
-        .retry(3)
-        .stopRetryOn(NotFoundException.class)
-        .suppressFailureWhenFinished()
-        .executeWith(deleteExecutorService)
-        .onFailure(
-            (fileInfo, exc) -> {
-              String file = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
-              LOG.warn("Delete failed for {}: {}", type, file, exc);
-            })
-        .run(
-            fileInfo -> {
-              String file = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
-              deleteFunc.accept(file);
-
-              if (FileContent.DATA.name().equalsIgnoreCase(type)) {
-                dataFileCount.incrementAndGet();
-                LOG.trace("Deleted Data File: {}", file);
-              } else if 
(FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
-                posDeleteFileCount.incrementAndGet();
-                LOG.trace("Deleted Positional Delete File: {}", file);
-              } else if 
(FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
-                eqDeleteFileCount.incrementAndGet();
-                LOG.trace("Deleted Equality Delete File: {}", file);
-              } else if (MANIFEST.equals(type)) {
-                manifestCount.incrementAndGet();
-                LOG.debug("Deleted Manifest: {}", file);
-              } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
-                manifestListCount.incrementAndGet();
-                LOG.debug("Deleted Manifest List: {}", file);
-              } else {
-                throw new ValidationException("Illegal file type: %s", type);
-              }
-            });
-
-    long contentFileCount =
-        dataFileCount.get() + posDeleteFileCount.get() + 
eqDeleteFileCount.get();
-    LOG.info(
-        "Deleted {} total files", contentFileCount + manifestCount.get() + 
manifestListCount.get());
+  private ExpireSnapshots.Result deleteFiles(Iterator<Row> files) {
+    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, 
files);
+    LOG.info("Deleted {} total files", summary.totalFilesCount());
 
     return new BaseExpireSnapshotsActionResult(
-        dataFileCount.get(),
-        posDeleteFileCount.get(),
-        eqDeleteFileCount.get(),
-        manifestCount.get(),
-        manifestListCount.get());
+        summary.dataFilesCount(),
+        summary.positionDeleteFilesCount(),
+        summary.equalityDeleteFilesCount(),
+        summary.manifestsCount(),
+        summary.manifestListsCount());
   }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
index 9090da2fe6..154e940519 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -94,6 +96,22 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
           .withPartition(TestHelpers.Row.of(3))
           .withRecordCount(1)
           .build();
+  static final DeleteFile FILE_A_POS_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartition(TestHelpers.Row.of(0))
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_A_EQ_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-a-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartition(TestHelpers.Row.of(0))
+          .withRecordCount(1)
+          .build();
 
   @Rule public TemporaryFolder temp = new TemporaryFolder();
 
@@ -109,6 +127,8 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
 
   private void checkRemoveFilesResults(
       long expectedDatafiles,
+      long expectedPosDeleteFiles,
+      long expectedEqDeleteFiles,
       long expectedManifestsDeleted,
       long expectedManifestListsDeleted,
       long expectedOtherFilesDeleted,
@@ -121,6 +141,14 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
         "Incorrect number of datafiles deleted",
         expectedDatafiles,
         results.deletedDataFilesCount());
+    Assert.assertEquals(
+        "Incorrect number of position delete files deleted",
+        expectedPosDeleteFiles,
+        results.deletedPositionDeleteFilesCount());
+    Assert.assertEquals(
+        "Incorrect number of equality delete files deleted",
+        expectedEqDeleteFiles,
+        results.deletedEqualityDeleteFilesCount());
     Assert.assertEquals(
         "Incorrect number of manifest lists deleted",
         expectedManifestListsDeleted,
@@ -177,7 +205,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
             file ->
                 Assert.assertTrue(
                     "FILE_A should be deleted", 
deletedFiles.contains(FILE_A.path().toString())));
-    checkRemoveFilesResults(4L, 6L, 4L, 6, result);
+    checkRemoveFilesResults(4L, 0, 0, 6L, 4L, 6, result);
   }
 
   @Test
@@ -195,7 +223,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
     DeleteReachableFiles.Result result =
         
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io()).execute();
 
-    checkRemoveFilesResults(3L, 3L, 3L, 5, result);
+    checkRemoveFilesResults(3L, 0, 0, 3L, 3L, 5, result);
   }
 
   @Test
@@ -203,7 +231,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
     DeleteReachableFiles.Result result =
         
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io()).execute();
 
-    checkRemoveFilesResults(0, 0, 0, 2, result);
+    checkRemoveFilesResults(0, 0, 0, 0, 0, 2, result);
   }
 
   @Test
@@ -223,7 +251,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
         
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
     DeleteReachableFiles.Result result = baseRemoveFilesSparkAction.execute();
 
-    checkRemoveFilesResults(4, 5, 5, 8, result);
+    checkRemoveFilesResults(4, 0, 0, 5, 5, 8, result);
   }
 
   @Test
@@ -234,7 +262,37 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
 
     DeleteReachableFiles baseRemoveFilesSparkAction =
         
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
-    checkRemoveFilesResults(2, 2, 2, 4, baseRemoveFilesSparkAction.execute());
+    checkRemoveFilesResults(2, 0, 0, 2, 2, 4, 
baseRemoveFilesSparkAction.execute());
+  }
+
+  @Test
+  public void testPositionDeleteFiles() {
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newRowDelta().addDeletes(FILE_A_POS_DELETES).commit();
+
+    DeleteReachableFiles baseRemoveFilesSparkAction =
+        
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
+    checkRemoveFilesResults(2, 1, 0, 3, 3, 6, 
baseRemoveFilesSparkAction.execute());
+  }
+
+  @Test
+  public void testEqualityDeleteFiles() {
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newRowDelta().addDeletes(FILE_A_EQ_DELETES).commit();
+
+    DeleteReachableFiles baseRemoveFilesSparkAction =
+        
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
+    checkRemoveFilesResults(2, 0, 1, 3, 3, 6, 
baseRemoveFilesSparkAction.execute());
   }
 
   @Test
@@ -247,7 +305,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
     // IO defaults to HadoopFileIO
     DeleteReachableFiles baseRemoveFilesSparkAction =
         sparkActions().deleteReachableFiles(metadataLocation(table));
-    checkRemoveFilesResults(2, 2, 2, 4, baseRemoveFilesSparkAction.execute());
+    checkRemoveFilesResults(2, 0, 0, 2, 2, 4, 
baseRemoveFilesSparkAction.execute());
   }
 
   @Test
@@ -273,7 +331,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
           int jobsAfter = 
spark.sparkContext().dagScheduler().nextJobId().get();
           int totalJobsRun = jobsAfter - jobsBefore;
 
-          checkRemoveFilesResults(3L, 4L, 3L, 5, results);
+          checkRemoveFilesResults(3L, 0, 0, 4L, 3L, 5, results);
 
           Assert.assertEquals(
               "Expected total jobs to be equal to total number of shuffle 
partitions",
@@ -301,7 +359,7 @@ public class TestDeleteReachableFilesAction extends 
SparkTestBase {
         
sparkActions().deleteReachableFiles(metadataLocation(table)).io(table.io());
     DeleteReachableFiles.Result res = baseRemoveFilesSparkAction.execute();
 
-    checkRemoveFilesResults(1, 1, 1, 4, res);
+    checkRemoveFilesResults(1, 0, 0, 1, 1, 4, res);
   }
 
   @Test

Reply via email to