This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 51782d3747 Core, Spark 3.4: Add filter to Rewrite position deletes 
(#7582)
51782d3747 is described below

commit 51782d37476cea05294e0e31aef0d97c2e258578
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Aug 7 13:13:08 2023 -0700

    Core, Spark 3.4: Add filter to Rewrite position deletes (#7582)
---
 .../org/apache/iceberg/PositionDeletesTable.java   |  77 ++++++--
 .../org/apache/iceberg/TestMetadataTableScans.java | 205 ++++++++++++++++++++-
 .../RewritePositionDeleteFilesSparkAction.java     |   9 +-
 .../TestRewritePositionDeleteFilesAction.java      | 161 +++++++++++++++-
 4 files changed, 427 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java 
b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 39b43cc413..f8cb924e53 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,8 +24,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
@@ -130,18 +132,28 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
   public static class PositionDeletesBatchScan
       extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> 
implements BatchScan {
 
+    private Expression baseTableFilter = Expressions.alwaysTrue();
+
     protected PositionDeletesBatchScan(Table table, Schema schema) {
       super(table, schema, TableScanContext.empty());
     }
 
+    /** @deprecated the API will be removed in v1.5.0 */
+    @Deprecated
     protected PositionDeletesBatchScan(Table table, Schema schema, 
TableScanContext context) {
       super(table, schema, context);
     }
 
+    protected PositionDeletesBatchScan(
+        Table table, Schema schema, TableScanContext context, Expression 
baseTableFilter) {
+      super(table, schema, context);
+      this.baseTableFilter = baseTableFilter;
+    }
+
     @Override
     protected PositionDeletesBatchScan newRefinedScan(
         Table newTable, Schema newSchema, TableScanContext newContext) {
-      return new PositionDeletesBatchScan(newTable, newSchema, newContext);
+      return new PositionDeletesBatchScan(newTable, newSchema, newContext, 
baseTableFilter);
     }
 
     @Override
@@ -155,6 +167,32 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
       return context().returnColumnStats() ? DELETE_SCAN_WITH_STATS_COLUMNS : 
DELETE_SCAN_COLUMNS;
     }
 
+    /**
+     * Sets a filter that applies on base table of this position deletes 
table, to use for this
+     * scan.
+     *
+     * <p>Only the partition expressions part of the filter will be applied to 
the position deletes
+     * table, as the schema of the base table does not otherwise match the 
schema of position
+     * deletes table.
+     *
+     * <ul>
+     *   <li>Only the partition expressions of the filter that can be 
projected on the base table
+     *       partition specs, via {@link
+     *       
org.apache.iceberg.expressions.Projections.ProjectionEvaluator#project(Expression)}
+     *       will be evaluated. Note, not all partition expressions can be 
projected.
+     *   <li>Because it cannot apply beyond the partition expression, this 
filter will not
+     *       contribute to the residuals of tasks returned by this scan. (See 
{@link
+     *       PositionDeletesScanTask#residual()})
+     * </ul>
+     *
+     * @param expr expression filter that applies on the base table of this 
position deletes table
+     * @return this for method chaining
+     */
+    public BatchScan baseTableFilter(Expression expr) {
+      return new PositionDeletesBatchScan(
+          table(), schema(), context(), Expressions.and(baseTableFilter, 
expr));
+    }
+
     @Override
     protected CloseableIterable<ScanTask> doPlanFiles() {
       String schemaString = SchemaParser.toJson(tableSchema());
@@ -162,23 +200,27 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
       // prepare transformed partition specs and caches
       Map<Integer, PartitionSpec> transformedSpecs = 
transformSpecs(tableSchema(), table().specs());
 
+      LoadingCache<Integer, String> specStringCache =
+          partitionCacheOf(transformedSpecs, PartitionSpecParser::toJson);
+      LoadingCache<Integer, ManifestEvaluator> deletesTableEvalCache =
+          partitionCacheOf(
+              transformedSpecs,
+              spec -> ManifestEvaluator.forRowFilter(filter(), spec, 
isCaseSensitive()));
+      LoadingCache<Integer, ManifestEvaluator> baseTableEvalCache =
+          partitionCacheOf(
+              table().specs(), // evaluate base table filters on base table 
specs
+              spec -> ManifestEvaluator.forRowFilter(baseTableFilter, spec, 
isCaseSensitive()));
       LoadingCache<Integer, ResidualEvaluator> residualCache =
           partitionCacheOf(
               transformedSpecs,
               spec ->
                   ResidualEvaluator.of(
                       spec,
+                      // there are no applicable filters in the base table's 
filter
+                      // that we can use to evaluate on the position deletes 
table
                       shouldIgnoreResiduals() ? Expressions.alwaysTrue() : 
filter(),
                       isCaseSensitive()));
 
-      LoadingCache<Integer, String> specStringCache =
-          partitionCacheOf(transformedSpecs, PartitionSpecParser::toJson);
-
-      LoadingCache<Integer, ManifestEvaluator> evalCache =
-          partitionCacheOf(
-              transformedSpecs,
-              spec -> ManifestEvaluator.forRowFilter(filter(), spec, 
isCaseSensitive()));
-
       // iterate through delete manifests
       List<ManifestFile> manifests = snapshot().deleteManifests(table().io());
 
@@ -186,8 +228,9 @@ public class PositionDeletesTable extends BaseMetadataTable 
{
           CloseableIterable.filter(
               scanMetrics().skippedDeleteManifests(),
               CloseableIterable.withNoopClose(manifests),
-              manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
-
+              manifest ->
+                  
baseTableEvalCache.get(manifest.partitionSpecId()).eval(manifest)
+                      && 
deletesTableEvalCache.get(manifest.partitionSpecId()).eval(manifest));
       matchingManifests =
           CloseableIterable.count(scanMetrics().scannedDeleteManifests(), 
matchingManifests);
 
@@ -196,7 +239,12 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
               matchingManifests,
               manifest ->
                   posDeletesScanTasks(
-                      manifest, schemaString, transformedSpecs, residualCache, 
specStringCache));
+                      manifest,
+                      table().specs().get(manifest.partitionSpecId()),
+                      schemaString,
+                      transformedSpecs,
+                      residualCache,
+                      specStringCache));
 
       if (planExecutor() != null) {
         return new ParallelIterable<>(tasks, planExecutor());
@@ -207,6 +255,7 @@ public class PositionDeletesTable extends BaseMetadataTable 
{
 
     private CloseableIterable<ScanTask> posDeletesScanTasks(
         ManifestFile manifest,
+        PartitionSpec spec,
         String schemaString,
         Map<Integer, PartitionSpec> transformedSpecs,
         LoadingCache<Integer, ResidualEvaluator> residualCache,
@@ -223,12 +272,16 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
 
         @Override
         public CloseableIterator<ScanTask> iterator() {
+          Expression partitionFilter =
+              Projections.inclusive(spec, 
isCaseSensitive()).project(baseTableFilter);
+
           // Filter partitions
           CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
               ManifestFiles.readDeleteManifest(manifest, table().io(), 
transformedSpecs)
                   .caseSensitive(isCaseSensitive())
                   .select(scanColumns())
                   .filterRows(filter())
+                  .filterPartitions(partitionFilter)
                   .scanMetrics(scanMetrics())
                   .liveEntries();
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 2e34f2e6da..9da1838ad2 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -53,22 +53,50 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     super(formatVersion);
   }
 
-  private void preparePartitionedTable() {
-    preparePartitionedTableData();
+  private void preparePartitionedTable(boolean transactional) {
+    preparePartitionedTableData(transactional);
 
     if (formatVersion == 2) {
-      table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
-      table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
-      table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
-      table.newRowDelta().addDeletes(FILE_D2_DELETES).commit();
+      if (transactional) {
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .addDeletes(FILE_B_DELETES)
+            .addDeletes(FILE_C2_DELETES)
+            .addDeletes(FILE_D2_DELETES)
+            .commit();
+      } else {
+        table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+        table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+        table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+        table.newRowDelta().addDeletes(FILE_D2_DELETES).commit();
+      }
+    }
+  }
+
+  private void preparePartitionedTable() {
+    preparePartitionedTable(false);
+  }
+
+  private void preparePartitionedTableData(boolean transactional) {
+    if (transactional) {
+      table
+          .newFastAppend()
+          .appendFile(FILE_A)
+          .appendFile(FILE_B)
+          .appendFile(FILE_C)
+          .appendFile(FILE_D)
+          .commit();
+    } else {
+      table.newFastAppend().appendFile(FILE_A).commit();
+      table.newFastAppend().appendFile(FILE_C).commit();
+      table.newFastAppend().appendFile(FILE_D).commit();
+      table.newFastAppend().appendFile(FILE_B).commit();
     }
   }
 
   private void preparePartitionedTableData() {
-    table.newFastAppend().appendFile(FILE_A).commit();
-    table.newFastAppend().appendFile(FILE_C).commit();
-    table.newFastAppend().appendFile(FILE_D).commit();
-    table.newFastAppend().appendFile(FILE_B).commit();
+    preparePartitionedTableData(false);
   }
 
   @Test
@@ -1261,6 +1289,163 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
         constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
   }
 
+  @Test
+  public void testPositionDeletesBaseTableFilterManifestLevel() {
+    testPositionDeletesBaseTableFilter(false);
+  }
+
+  @Test
+  public void testPositionDeletesBaseTableFilterEntriesLevel() {
+    testPositionDeletesBaseTableFilter(true);
+  }
+
+  private void testPositionDeletesBaseTableFilter(boolean transactional) {
+    Assume.assumeTrue("Position deletes supported only for v2 tables", 
formatVersion == 2);
+    preparePartitionedTable(transactional);
+
+    PositionDeletesTable positionDeletesTable = new 
PositionDeletesTable(table);
+
+    Expression expression =
+        Expressions.and(
+            Expressions.equal("data", "u"), // hashes to bucket 0
+            Expressions.greaterThan("id", 0));
+    BatchScan scan =
+        ((PositionDeletesTable.PositionDeletesBatchScan) 
positionDeletesTable.newBatchScan())
+            .baseTableFilter(expression);
+    
assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+    PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+        (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+    List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());
+
+    Assert.assertEquals(
+        "Expected to scan one delete manifest",
+        1,
+        deleteScan.scanMetrics().scannedDeleteManifests().value());
+    int expectedSkippedManifests = transactional ? 0 : 3;
+    Assert.assertEquals(
+        "Wrong number of manifests skipped",
+        expectedSkippedManifests,
+        deleteScan.scanMetrics().skippedDeleteManifests().value());
+
+    assertThat(tasks).hasSize(1);
+
+    ScanTask task = tasks.get(0);
+    assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
+
+    // base table filter should only be used to evaluate partitions
+    posDeleteTask.residual().isEquivalentTo(Expressions.alwaysTrue());
+
+    int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
+    StructLike taskPartitionStruct =
+        (StructLike)
+            constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
+    int taskPartition = taskPartitionStruct.get(0, Integer.class);
+    Assert.assertEquals("Expected correct partition on task's file", 0, 
filePartition);
+    Assert.assertEquals("Expected correct partition on task's column", 0, 
taskPartition);
+
+    Assert.assertEquals(
+        "Expected correct partition spec id on task", 0, 
posDeleteTask.file().specId());
+    Assert.assertEquals(
+        "Expected correct partition spec id on constant column",
+        0,
+        constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.SPEC_ID.fieldId()));
+
+    Assert.assertEquals(
+        "Expected correct delete file on task", FILE_A_DELETES.path(), 
posDeleteTask.file().path());
+    Assert.assertEquals(
+        "Expected correct delete file on constant column",
+        FILE_A_DELETES.path(),
+        constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
+  }
+
+  @Test
+  public void testPositionDeletesWithBaseTableFilterNot() {
+    Assume.assumeTrue("Position deletes supported only for v2 tables", 
formatVersion == 2);
+
+    // use identity rather than bucket partition spec,
+    // as bucket.project does not support projecting notEq
+    table.updateSpec().removeField("data_bucket").addField("id").commit();
+    PartitionSpec spec = table.spec();
+
+    String path0 = "/path/to/data-0-deletes.parquet";
+    PartitionData partitionData0 = new PartitionData(spec.partitionType());
+    partitionData0.set(0, 0);
+    DeleteFile deleteFileA =
+        FileMetadata.deleteFileBuilder(spec)
+            .ofPositionDeletes()
+            .withPath(path0)
+            .withFileSizeInBytes(10)
+            .withPartition(partitionData0)
+            .withRecordCount(1)
+            .build();
+
+    String path1 = "/path/to/data-1-deletes.parquet";
+    PartitionData partitionData1 = new PartitionData(spec.partitionType());
+    partitionData1.set(0, 1);
+    DeleteFile deleteFileB =
+        FileMetadata.deleteFileBuilder(spec)
+            .ofPositionDeletes()
+            .withPath(path1)
+            .withFileSizeInBytes(10)
+            .withPartition(partitionData1)
+            .withRecordCount(1)
+            .build();
+    
table.newRowDelta().addDeletes(deleteFileA).addDeletes(deleteFileB).commit();
+
+    PositionDeletesTable positionDeletesTable = new 
PositionDeletesTable(table);
+
+    Expression expression = Expressions.not(Expressions.equal("id", 0));
+    BatchScan scan =
+        ((PositionDeletesTable.PositionDeletesBatchScan) 
positionDeletesTable.newBatchScan())
+            .baseTableFilter(expression);
+    
assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+    PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+        (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+    List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());
+
+    Assert.assertEquals(
+        "Expected to scan one delete manifest",
+        1,
+        deleteScan.scanMetrics().scannedDeleteManifests().value());
+    assertThat(tasks).hasSize(1);
+
+    ScanTask task = tasks.get(0);
+    assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
+
+    // base table filter should only be used to evaluate partitions
+    posDeleteTask.residual().isEquivalentTo(Expressions.alwaysTrue());
+
+    int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
+    StructLike taskPartitionStruct =
+        (StructLike)
+            constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
+    int taskPartition =
+        taskPartitionStruct.get(1, Integer.class); // new partition field in 
position 1
+    Assert.assertEquals("Expected correct partition on task's file", 1, 
filePartition);
+    Assert.assertEquals("Expected correct partition on task's column", 1, 
taskPartition);
+
+    Assert.assertEquals(
+        "Expected correct partition spec id on task", 1, 
posDeleteTask.file().specId());
+    Assert.assertEquals(
+        "Expected correct partition spec id on constant column",
+        1,
+        constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.SPEC_ID.fieldId()));
+
+    Assert.assertEquals("Expected correct delete file on task", path1, 
posDeleteTask.file().path());
+    Assert.assertEquals(
+        "Expected correct delete file on constant column",
+        path1,
+        constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
+  }
+
   @Test
   public void testPositionDeletesResiduals() {
     Assume.assumeTrue("Position deletes supported only for v2 tables", 
formatVersion == 2);
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index bd8610ea73..f3dfd2dcc3 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@ import org.apache.iceberg.actions.RewritePositionDeletesGroup;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -84,6 +86,7 @@ public class RewritePositionDeleteFilesSparkAction
 
   private final Table table;
   private final SparkBinPackPositionDeletesRewriter rewriter;
+  private Expression filter = Expressions.alwaysTrue();
 
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
@@ -103,7 +106,8 @@ public class RewritePositionDeleteFilesSparkAction
 
   @Override
   public RewritePositionDeleteFilesSparkAction filter(Expression expression) {
-    throw new UnsupportedOperationException("Regular filters not supported 
yet.");
+    filter = Expressions.and(filter, expression);
+    return this;
   }
 
   @Override
@@ -153,8 +157,9 @@ public class RewritePositionDeleteFilesSparkAction
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
 
+    PositionDeletesBatchScan scan = (PositionDeletesBatchScan) 
deletesTable.newBatchScan();
     return CloseableIterable.transform(
-        deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+        scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
         task -> (PositionDeletesScanTask) task);
   }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 9267fae85f..59c5d44bda 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.PositionDeletesScanTask;
@@ -202,6 +204,51 @@ public class TestRewritePositionDeleteFilesAction extends 
SparkCatalogTestBase {
     assertEquals("Position deletes must match", expectedDeletes, 
actualDeletes);
   }
 
+  @Test
+  public void testRewriteFilter() throws Exception {
+    Table table = createTablePartitioned(4, 2, SCALE);
+    table.refresh();
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+    Assert.assertEquals(4, dataFiles.size());
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    Assert.assertEquals(8, deleteFiles.size());
+
+    table.refresh();
+    List<Object[]> expectedRecords = records(table);
+    List<Object[]> expectedDeletes = deleteRecords(table);
+    Assert.assertEquals(12000, expectedRecords.size()); // 16000 data - 4000 
delete rows
+    Assert.assertEquals(4000, expectedDeletes.size());
+
+    Expression filter =
+        Expressions.and(
+            Expressions.greaterThan("c3", "0"), // should have no effect
+            Expressions.or(Expressions.equal("c1", 1), Expressions.equal("c1", 
2)));
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .filter(filter)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, 
Long.toString(Long.MAX_VALUE - 1))
+            .execute();
+
+    List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
+    Assert.assertEquals("Should have 2 delete files", 2, 
newDeleteFiles.size());
+
+    List<DeleteFile> expectedRewrittenFiles =
+        filterFiles(table, deleteFiles, ImmutableList.of(1), 
ImmutableList.of(2));
+    assertLocallySorted(newDeleteFiles);
+    checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);
+
+    List<Object[]> actualRecords = records(table);
+    List<Object[]> actualDeletes = deleteRecords(table);
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertEquals("Position deletes must match", expectedDeletes, 
actualDeletes);
+  }
+
   @Test
   public void testRewriteToSmallerTarget() throws Exception {
     Table table = createTablePartitioned(4, 2, SCALE);
@@ -337,6 +384,54 @@ public class TestRewritePositionDeleteFilesAction extends 
SparkCatalogTestBase {
     assertEquals("Position deletes must match", expectedDeletes, 
actualDeletes);
   }
 
+  @Test
+  public void testRewriteFilterRemoveDangling() throws Exception {
+    Table table = createTablePartitioned(4, 2, SCALE);
+    table.refresh();
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
+    Assert.assertEquals(4, dataFiles.size());
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    Assert.assertEquals(8, deleteFiles.size());
+
+    table.refresh();
+    List<Object[]> expectedRecords = records(table);
+    List<Object[]> expectedDeletes = deleteRecords(table);
+    Assert.assertEquals(12000, expectedRecords.size()); // 16000 data - 4000 
delete rows
+    Assert.assertEquals(4000, expectedDeletes.size());
+
+    SparkActions.get(spark)
+        .rewriteDataFiles(table)
+        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .execute();
+
+    Expression filter = Expressions.or(Expressions.equal("c1", 0), 
Expressions.equal("c1", 1));
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .filter(filter)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, 
Long.toString(Long.MAX_VALUE - 1))
+            .execute();
+
+    List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
+    Assert.assertEquals("Should have 0 new delete files", 0, 
newDeleteFiles.size());
+
+    List<DeleteFile> expectedRewrittenFiles =
+        filterFiles(table, deleteFiles, ImmutableList.of(0), 
ImmutableList.of(1));
+    checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);
+
+    List<Object[]> actualRecords = records(table);
+    List<Object[]> allDeletes = deleteRecords(table);
+    // Only non-compacted deletes remain
+    List<Object[]> expectedDeletesFiltered =
+        filterDeletes(expectedDeletes, ImmutableList.of(2), 
ImmutableList.of(3));
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertEquals("Position deletes must match", expectedDeletesFiltered, 
allDeletes);
+  }
+
   @Test
   public void testPartitionEvolutionAdd() throws Exception {
     Table table = createTableUnpartitioned(2, SCALE);
@@ -423,7 +518,6 @@ public class TestRewritePositionDeleteFilesAction extends 
SparkCatalogTestBase {
             .rewritePositionDeletes(table)
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
             .execute();
-
     List<DeleteFile> newDeleteFiles = deleteFiles(table);
     Assert.assertEquals("Should have 3 new delete files", 3, 
newDeleteFiles.size());
     assertNotContains(expectedRewritten, newDeleteFiles);
@@ -737,6 +831,71 @@ public class TestRewritePositionDeleteFilesAction extends 
SparkCatalogTestBase {
     return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
   }
 
+  private List<Object[]> filterDeletes(List<Object[]> deletes, List<?>... 
partitionValues) {
+    Stream<Object[]> matches =
+        deletes.stream()
+            .filter(
+                r -> {
+                  Object[] partition = (Object[]) r[3];
+                  return Arrays.stream(partitionValues)
+                      .map(partitionValue -> match(partition, partitionValue))
+                      .reduce((a, b) -> a || b)
+                      .get();
+                });
+    return sorted(matches).collect(Collectors.toList());
+  }
+
+  private boolean match(Object[] partition, List<?> expectedPartition) {
+    return IntStream.range(0, expectedPartition.size())
+        .mapToObj(j -> partition[j] == expectedPartition.get(j))
+        .reduce((a, b) -> a && b)
+        .get();
+  }
+
+  private Stream<Object[]> sorted(Stream<Object[]> deletes) {
+    return deletes.sorted(
+        (a, b) -> {
+          String aFilePath = (String) a[0];
+          String bFilePath = (String) b[0];
+          int filePathCompare = aFilePath.compareTo(bFilePath);
+          if (filePathCompare != 0) {
+            return filePathCompare;
+          } else {
+            long aPos = (long) a[1];
+            long bPos = (long) b[1];
+            return Long.compare(aPos, bPos);
+          }
+        });
+  }
+
+  private List<DeleteFile> filterFiles(
+      Table table, List<DeleteFile> files, List<?>... partitionValues) {
+    List<Types.StructType> partitionTypes =
+        table.specs().values().stream()
+            .map(PartitionSpec::partitionType)
+            .collect(Collectors.toList());
+    List<PartitionData> partitionDatas =
+        Arrays.stream(partitionValues)
+            .map(
+                partitionValue -> {
+                  Types.StructType thisType =
+                      partitionTypes.stream()
+                          .filter(f -> f.fields().size() == 
partitionValue.size())
+                          .findFirst()
+                          .get();
+                  PartitionData partition = new PartitionData(thisType);
+                  for (int i = 0; i < partitionValue.size(); i++) {
+                    partition.set(i, partitionValue.get(i));
+                  }
+                  return partition;
+                })
+            .collect(Collectors.toList());
+
+    return files.stream()
+        .filter(f -> partitionDatas.stream().anyMatch(data -> 
f.partition().equals(data)))
+        .collect(Collectors.toList());
+  }
+
   private void checkResult(
       Result result,
       List<DeleteFile> rewrittenDeletes,

Reply via email to