This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 51782d3747 Core, Spark 3.4: Add filter to Rewrite position deletes
(#7582)
51782d3747 is described below
commit 51782d37476cea05294e0e31aef0d97c2e258578
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Aug 7 13:13:08 2023 -0700
Core, Spark 3.4: Add filter to Rewrite position deletes (#7582)
---
.../org/apache/iceberg/PositionDeletesTable.java | 77 ++++++--
.../org/apache/iceberg/TestMetadataTableScans.java | 205 ++++++++++++++++++++-
.../RewritePositionDeleteFilesSparkAction.java | 9 +-
.../TestRewritePositionDeleteFilesAction.java | 161 +++++++++++++++-
4 files changed, 427 insertions(+), 25 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 39b43cc413..f8cb924e53 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,8 +24,10 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -130,18 +132,28 @@ public class PositionDeletesTable extends
BaseMetadataTable {
public static class PositionDeletesBatchScan
extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>>
implements BatchScan {
+ private Expression baseTableFilter = Expressions.alwaysTrue();
+
protected PositionDeletesBatchScan(Table table, Schema schema) {
super(table, schema, TableScanContext.empty());
}
+ /** @deprecated the API will be removed in v1.5.0 */
+ @Deprecated
protected PositionDeletesBatchScan(Table table, Schema schema,
TableScanContext context) {
super(table, schema, context);
}
+ protected PositionDeletesBatchScan(
+ Table table, Schema schema, TableScanContext context, Expression
baseTableFilter) {
+ super(table, schema, context);
+ this.baseTableFilter = baseTableFilter;
+ }
+
@Override
protected PositionDeletesBatchScan newRefinedScan(
Table newTable, Schema newSchema, TableScanContext newContext) {
- return new PositionDeletesBatchScan(newTable, newSchema, newContext);
+ return new PositionDeletesBatchScan(newTable, newSchema, newContext,
baseTableFilter);
}
@Override
@@ -155,6 +167,32 @@ public class PositionDeletesTable extends
BaseMetadataTable {
return context().returnColumnStats() ? DELETE_SCAN_WITH_STATS_COLUMNS :
DELETE_SCAN_COLUMNS;
}
+ /**
+   * Sets a filter that applies on the base table of this position deletes
table, to use for this
+   * scan.
+ *
+ * <p>Only the partition expressions part of the filter will be applied to
the position deletes
+ * table, as the schema of the base table does not otherwise match the
schema of position
+ * deletes table.
+ *
+ * <ul>
+ * <li>Only the partition expressions of the filter that can be
projected on the base table
+ * partition specs, via {@link
+ *
org.apache.iceberg.expressions.Projections.ProjectionEvaluator#project(Expression)}
+ * will be evaluated. Note, not all partition expressions can be
projected.
+ * <li>Because it cannot apply beyond the partition expression, this
filter will not
+ * contribute to the residuals of tasks returned by this scan. (See
{@link
+ * PositionDeletesScanTask#residual()})
+ * </ul>
+ *
+   * @param expr expression filter that applies on the base table of this
position deletes table
+ * @return this for method chaining
+ */
+ public BatchScan baseTableFilter(Expression expr) {
+ return new PositionDeletesBatchScan(
+ table(), schema(), context(), Expressions.and(baseTableFilter,
expr));
+ }
+
@Override
protected CloseableIterable<ScanTask> doPlanFiles() {
String schemaString = SchemaParser.toJson(tableSchema());
@@ -162,23 +200,27 @@ public class PositionDeletesTable extends
BaseMetadataTable {
// prepare transformed partition specs and caches
Map<Integer, PartitionSpec> transformedSpecs =
transformSpecs(tableSchema(), table().specs());
+ LoadingCache<Integer, String> specStringCache =
+ partitionCacheOf(transformedSpecs, PartitionSpecParser::toJson);
+ LoadingCache<Integer, ManifestEvaluator> deletesTableEvalCache =
+ partitionCacheOf(
+ transformedSpecs,
+ spec -> ManifestEvaluator.forRowFilter(filter(), spec,
isCaseSensitive()));
+ LoadingCache<Integer, ManifestEvaluator> baseTableEvalCache =
+ partitionCacheOf(
+ table().specs(), // evaluate base table filters on base table
specs
+ spec -> ManifestEvaluator.forRowFilter(baseTableFilter, spec,
isCaseSensitive()));
LoadingCache<Integer, ResidualEvaluator> residualCache =
partitionCacheOf(
transformedSpecs,
spec ->
ResidualEvaluator.of(
spec,
+ // there are no applicable filters in the base table's
filter
+ // that we can use to evaluate on the position deletes
table
shouldIgnoreResiduals() ? Expressions.alwaysTrue() :
filter(),
isCaseSensitive()));
- LoadingCache<Integer, String> specStringCache =
- partitionCacheOf(transformedSpecs, PartitionSpecParser::toJson);
-
- LoadingCache<Integer, ManifestEvaluator> evalCache =
- partitionCacheOf(
- transformedSpecs,
- spec -> ManifestEvaluator.forRowFilter(filter(), spec,
isCaseSensitive()));
-
// iterate through delete manifests
List<ManifestFile> manifests = snapshot().deleteManifests(table().io());
@@ -186,8 +228,9 @@ public class PositionDeletesTable extends BaseMetadataTable
{
CloseableIterable.filter(
scanMetrics().skippedDeleteManifests(),
CloseableIterable.withNoopClose(manifests),
- manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest));
-
+ manifest ->
+
baseTableEvalCache.get(manifest.partitionSpecId()).eval(manifest)
+ &&
deletesTableEvalCache.get(manifest.partitionSpecId()).eval(manifest));
matchingManifests =
CloseableIterable.count(scanMetrics().scannedDeleteManifests(),
matchingManifests);
@@ -196,7 +239,12 @@ public class PositionDeletesTable extends
BaseMetadataTable {
matchingManifests,
manifest ->
posDeletesScanTasks(
- manifest, schemaString, transformedSpecs, residualCache,
specStringCache));
+ manifest,
+ table().specs().get(manifest.partitionSpecId()),
+ schemaString,
+ transformedSpecs,
+ residualCache,
+ specStringCache));
if (planExecutor() != null) {
return new ParallelIterable<>(tasks, planExecutor());
@@ -207,6 +255,7 @@ public class PositionDeletesTable extends BaseMetadataTable
{
private CloseableIterable<ScanTask> posDeletesScanTasks(
ManifestFile manifest,
+ PartitionSpec spec,
String schemaString,
Map<Integer, PartitionSpec> transformedSpecs,
LoadingCache<Integer, ResidualEvaluator> residualCache,
@@ -223,12 +272,16 @@ public class PositionDeletesTable extends
BaseMetadataTable {
@Override
public CloseableIterator<ScanTask> iterator() {
+ Expression partitionFilter =
+ Projections.inclusive(spec,
isCaseSensitive()).project(baseTableFilter);
+
// Filter partitions
CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
ManifestFiles.readDeleteManifest(manifest, table().io(),
transformedSpecs)
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterRows(filter())
+ .filterPartitions(partitionFilter)
.scanMetrics(scanMetrics())
.liveEntries();
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 2e34f2e6da..9da1838ad2 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -53,22 +53,50 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
super(formatVersion);
}
- private void preparePartitionedTable() {
- preparePartitionedTableData();
+ private void preparePartitionedTable(boolean transactional) {
+ preparePartitionedTableData(transactional);
if (formatVersion == 2) {
- table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
- table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
- table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
- table.newRowDelta().addDeletes(FILE_D2_DELETES).commit();
+ if (transactional) {
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .addDeletes(FILE_B_DELETES)
+ .addDeletes(FILE_C2_DELETES)
+ .addDeletes(FILE_D2_DELETES)
+ .commit();
+ } else {
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+ table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+ table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+ table.newRowDelta().addDeletes(FILE_D2_DELETES).commit();
+ }
+ }
+ }
+
+ private void preparePartitionedTable() {
+ preparePartitionedTable(false);
+ }
+
+ private void preparePartitionedTableData(boolean transactional) {
+ if (transactional) {
+ table
+ .newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .commit();
+ } else {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ table.newFastAppend().appendFile(FILE_C).commit();
+ table.newFastAppend().appendFile(FILE_D).commit();
+ table.newFastAppend().appendFile(FILE_B).commit();
}
}
private void preparePartitionedTableData() {
- table.newFastAppend().appendFile(FILE_A).commit();
- table.newFastAppend().appendFile(FILE_C).commit();
- table.newFastAppend().appendFile(FILE_D).commit();
- table.newFastAppend().appendFile(FILE_B).commit();
+ preparePartitionedTableData(false);
}
@Test
@@ -1261,6 +1289,163 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
}
+ @Test
+ public void testPositionDeletesBaseTableFilterManifestLevel() {
+ testPositionDeletesBaseTableFilter(false);
+ }
+
+ @Test
+ public void testPositionDeletesBaseTableFilterEntriesLevel() {
+ testPositionDeletesBaseTableFilter(true);
+ }
+
+ private void testPositionDeletesBaseTableFilter(boolean transactional) {
+ Assume.assumeTrue("Position deletes supported only for v2 tables",
formatVersion == 2);
+ preparePartitionedTable(transactional);
+
+ PositionDeletesTable positionDeletesTable = new
PositionDeletesTable(table);
+
+ Expression expression =
+ Expressions.and(
+ Expressions.equal("data", "u"), // hashes to bucket 0
+ Expressions.greaterThan("id", 0));
+ BatchScan scan =
+ ((PositionDeletesTable.PositionDeletesBatchScan)
positionDeletesTable.newBatchScan())
+ .baseTableFilter(expression);
+
assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+ PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+ (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+ List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());
+
+ Assert.assertEquals(
+ "Expected to scan one delete manifest",
+ 1,
+ deleteScan.scanMetrics().scannedDeleteManifests().value());
+ int expectedSkippedManifests = transactional ? 0 : 3;
+ Assert.assertEquals(
+ "Wrong number of manifests skipped",
+ expectedSkippedManifests,
+ deleteScan.scanMetrics().skippedDeleteManifests().value());
+
+ assertThat(tasks).hasSize(1);
+
+ ScanTask task = tasks.get(0);
+ assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
+
+ // base table filter should only be used to evaluate partitions
+ posDeleteTask.residual().isEquivalentTo(Expressions.alwaysTrue());
+
+ int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
+ StructLike taskPartitionStruct =
+ (StructLike)
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
+ int taskPartition = taskPartitionStruct.get(0, Integer.class);
+ Assert.assertEquals("Expected correct partition on task's file", 0,
filePartition);
+ Assert.assertEquals("Expected correct partition on task's column", 0,
taskPartition);
+
+ Assert.assertEquals(
+ "Expected correct partition spec id on task", 0,
posDeleteTask.file().specId());
+ Assert.assertEquals(
+ "Expected correct partition spec id on constant column",
+ 0,
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.SPEC_ID.fieldId()));
+
+ Assert.assertEquals(
+ "Expected correct delete file on task", FILE_A_DELETES.path(),
posDeleteTask.file().path());
+ Assert.assertEquals(
+ "Expected correct delete file on constant column",
+ FILE_A_DELETES.path(),
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
+ }
+
+ @Test
+ public void testPositionDeletesWithBaseTableFilterNot() {
+ Assume.assumeTrue("Position deletes supported only for v2 tables",
formatVersion == 2);
+
+ // use identity rather than bucket partition spec,
+ // as bucket.project does not support projecting notEq
+ table.updateSpec().removeField("data_bucket").addField("id").commit();
+ PartitionSpec spec = table.spec();
+
+ String path0 = "/path/to/data-0-deletes.parquet";
+ PartitionData partitionData0 = new PartitionData(spec.partitionType());
+ partitionData0.set(0, 0);
+ DeleteFile deleteFileA =
+ FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPath(path0)
+ .withFileSizeInBytes(10)
+ .withPartition(partitionData0)
+ .withRecordCount(1)
+ .build();
+
+ String path1 = "/path/to/data-1-deletes.parquet";
+ PartitionData partitionData1 = new PartitionData(spec.partitionType());
+ partitionData1.set(0, 1);
+ DeleteFile deleteFileB =
+ FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPath(path1)
+ .withFileSizeInBytes(10)
+ .withPartition(partitionData1)
+ .withRecordCount(1)
+ .build();
+
table.newRowDelta().addDeletes(deleteFileA).addDeletes(deleteFileB).commit();
+
+ PositionDeletesTable positionDeletesTable = new
PositionDeletesTable(table);
+
+ Expression expression = Expressions.not(Expressions.equal("id", 0));
+ BatchScan scan =
+ ((PositionDeletesTable.PositionDeletesBatchScan)
positionDeletesTable.newBatchScan())
+ .baseTableFilter(expression);
+
assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+ PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+ (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+ List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());
+
+ Assert.assertEquals(
+ "Expected to scan one delete manifest",
+ 1,
+ deleteScan.scanMetrics().scannedDeleteManifests().value());
+ assertThat(tasks).hasSize(1);
+
+ ScanTask task = tasks.get(0);
+ assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
+
+ // base table filter should only be used to evaluate partitions
+ posDeleteTask.residual().isEquivalentTo(Expressions.alwaysTrue());
+
+ int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
+ StructLike taskPartitionStruct =
+ (StructLike)
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
+ int taskPartition =
+ taskPartitionStruct.get(1, Integer.class); // new partition field in
position 1
+ Assert.assertEquals("Expected correct partition on task's file", 1,
filePartition);
+ Assert.assertEquals("Expected correct partition on task's column", 1,
taskPartition);
+
+ Assert.assertEquals(
+ "Expected correct partition spec id on task", 1,
posDeleteTask.file().specId());
+ Assert.assertEquals(
+ "Expected correct partition spec id on constant column",
+ 1,
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.SPEC_ID.fieldId()));
+
+ Assert.assertEquals("Expected correct delete file on task", path1,
posDeleteTask.file().path());
+ Assert.assertEquals(
+ "Expected correct delete file on constant column",
+ path1,
+ constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.FILE_PATH.fieldId()));
+ }
+
@Test
public void testPositionDeletesResiduals() {
Assume.assumeTrue("Position deletes supported only for v2 tables",
formatVersion == 2);
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index bd8610ea73..f3dfd2dcc3 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@ import org.apache.iceberg.actions.RewritePositionDeletesGroup;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -84,6 +86,7 @@ public class RewritePositionDeleteFilesSparkAction
private final Table table;
private final SparkBinPackPositionDeletesRewriter rewriter;
+ private Expression filter = Expressions.alwaysTrue();
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
@@ -103,7 +106,8 @@ public class RewritePositionDeleteFilesSparkAction
@Override
public RewritePositionDeleteFilesSparkAction filter(Expression expression) {
- throw new UnsupportedOperationException("Regular filters not supported
yet.");
+ filter = Expressions.and(filter, expression);
+ return this;
}
@Override
@@ -153,8 +157,9 @@ public class RewritePositionDeleteFilesSparkAction
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
+ PositionDeletesBatchScan scan = (PositionDeletesBatchScan)
deletesTable.newBatchScan();
return CloseableIterable.transform(
- deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+ scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
task -> (PositionDeletesScanTask) task);
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 9267fae85f..59c5d44bda 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +36,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -202,6 +204,51 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
}
+ @Test
+ public void testRewriteFilter() throws Exception {
+ Table table = createTablePartitioned(4, 2, SCALE);
+ table.refresh();
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+ Assert.assertEquals(4, dataFiles.size());
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ Assert.assertEquals(8, deleteFiles.size());
+
+ table.refresh();
+ List<Object[]> expectedRecords = records(table);
+ List<Object[]> expectedDeletes = deleteRecords(table);
+ Assert.assertEquals(12000, expectedRecords.size()); // 16000 data - 4000
delete rows
+ Assert.assertEquals(4000, expectedDeletes.size());
+
+ Expression filter =
+ Expressions.and(
+ Expressions.greaterThan("c3", "0"), // should have no effect
+ Expressions.or(Expressions.equal("c1", 1), Expressions.equal("c1",
2)));
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .filter(filter)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
+ .execute();
+
+ List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
+    Assert.assertEquals("Should have 2 delete files", 2,
newDeleteFiles.size());
+
+ List<DeleteFile> expectedRewrittenFiles =
+ filterFiles(table, deleteFiles, ImmutableList.of(1),
ImmutableList.of(2));
+ assertLocallySorted(newDeleteFiles);
+ checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);
+
+ List<Object[]> actualRecords = records(table);
+ List<Object[]> actualDeletes = deleteRecords(table);
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
+ }
+
@Test
public void testRewriteToSmallerTarget() throws Exception {
Table table = createTablePartitioned(4, 2, SCALE);
@@ -337,6 +384,54 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
}
+ @Test
+ public void testRewriteFilterRemoveDangling() throws Exception {
+ Table table = createTablePartitioned(4, 2, SCALE);
+ table.refresh();
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
+ Assert.assertEquals(4, dataFiles.size());
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ Assert.assertEquals(8, deleteFiles.size());
+
+ table.refresh();
+ List<Object[]> expectedRecords = records(table);
+ List<Object[]> expectedDeletes = deleteRecords(table);
+ Assert.assertEquals(12000, expectedRecords.size()); // 16000 data - 4000
delete rows
+ Assert.assertEquals(4000, expectedDeletes.size());
+
+ SparkActions.get(spark)
+ .rewriteDataFiles(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .execute();
+
+ Expression filter = Expressions.or(Expressions.equal("c1", 0),
Expressions.equal("c1", 1));
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .filter(filter)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
+ .execute();
+
+ List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
+    Assert.assertEquals("Should have 0 new delete files", 0,
newDeleteFiles.size());
+
+ List<DeleteFile> expectedRewrittenFiles =
+ filterFiles(table, deleteFiles, ImmutableList.of(0),
ImmutableList.of(1));
+ checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);
+
+ List<Object[]> actualRecords = records(table);
+ List<Object[]> allDeletes = deleteRecords(table);
+ // Only non-compacted deletes remain
+ List<Object[]> expectedDeletesFiltered =
+ filterDeletes(expectedDeletes, ImmutableList.of(2),
ImmutableList.of(3));
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertEquals("Position deletes must match", expectedDeletesFiltered,
allDeletes);
+ }
+
@Test
public void testPartitionEvolutionAdd() throws Exception {
Table table = createTableUnpartitioned(2, SCALE);
@@ -423,7 +518,6 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
.rewritePositionDeletes(table)
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
-
List<DeleteFile> newDeleteFiles = deleteFiles(table);
Assert.assertEquals("Should have 3 new delete files", 3,
newDeleteFiles.size());
assertNotContains(expectedRewritten, newDeleteFiles);
@@ -737,6 +831,71 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
}
+ private List<Object[]> filterDeletes(List<Object[]> deletes, List<?>...
partitionValues) {
+ Stream<Object[]> matches =
+ deletes.stream()
+ .filter(
+ r -> {
+ Object[] partition = (Object[]) r[3];
+ return Arrays.stream(partitionValues)
+ .map(partitionValue -> match(partition, partitionValue))
+ .reduce((a, b) -> a || b)
+ .get();
+ });
+ return sorted(matches).collect(Collectors.toList());
+ }
+
+ private boolean match(Object[] partition, List<?> expectedPartition) {
+ return IntStream.range(0, expectedPartition.size())
+ .mapToObj(j -> partition[j] == expectedPartition.get(j))
+ .reduce((a, b) -> a && b)
+ .get();
+ }
+
+ private Stream<Object[]> sorted(Stream<Object[]> deletes) {
+ return deletes.sorted(
+ (a, b) -> {
+ String aFilePath = (String) a[0];
+ String bFilePath = (String) b[0];
+ int filePathCompare = aFilePath.compareTo(bFilePath);
+ if (filePathCompare != 0) {
+ return filePathCompare;
+ } else {
+ long aPos = (long) a[1];
+ long bPos = (long) b[1];
+ return Long.compare(aPos, bPos);
+ }
+ });
+ }
+
+ private List<DeleteFile> filterFiles(
+ Table table, List<DeleteFile> files, List<?>... partitionValues) {
+ List<Types.StructType> partitionTypes =
+ table.specs().values().stream()
+ .map(PartitionSpec::partitionType)
+ .collect(Collectors.toList());
+ List<PartitionData> partitionDatas =
+ Arrays.stream(partitionValues)
+ .map(
+ partitionValue -> {
+ Types.StructType thisType =
+ partitionTypes.stream()
+ .filter(f -> f.fields().size() ==
partitionValue.size())
+ .findFirst()
+ .get();
+ PartitionData partition = new PartitionData(thisType);
+ for (int i = 0; i < partitionValue.size(); i++) {
+ partition.set(i, partitionValue.get(i));
+ }
+ return partition;
+ })
+ .collect(Collectors.toList());
+
+ return files.stream()
+ .filter(f -> partitionDatas.stream().anyMatch(data ->
f.partition().equals(data)))
+ .collect(Collectors.toList());
+ }
+
private void checkResult(
Result result,
List<DeleteFile> rewrittenDeletes,