This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5733aecd0d Core: Pushdown data_file.content filter in entries metadata
table (#10203)
5733aecd0d is described below
commit 5733aecd0d010b17b9bd0f10aaccb601b1f13c90
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Jun 24 10:39:26 2024 -0700
Core: Pushdown data_file.content filter in entries metadata table (#10203)
---
.../java/org/apache/iceberg/BaseEntriesTable.java | 185 ++++++++++++++++++++-
.../apache/iceberg/MetadataTableScanTestBase.java | 5 +-
.../org/apache/iceberg/TestMetadataTableScans.java | 148 +++++++++++++++--
3 files changed, 322 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
index 43d8a71f87..4e485d516f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
@@ -23,8 +23,12 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
@@ -68,6 +72,7 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
Expression rowFilter = context.rowFilter();
boolean caseSensitive = context.caseSensitive();
boolean ignoreResiduals = context.ignoreResiduals();
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
@@ -77,14 +82,18 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
PartitionSpec transformedSpec =
BaseFilesTable.transformSpec(tableSchema, spec);
return ManifestEvaluator.forRowFilter(rowFilter,
transformedSpec, caseSensitive);
});
+ ManifestContentEvaluator manifestContentEvaluator =
+ new ManifestContentEvaluator(filter, tableSchema.asStruct(),
caseSensitive);
CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
- manifests, manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+ manifests,
+ manifest ->
+ evalCache.get(manifest.partitionSpecId()).eval(manifest)
+ && manifestContentEvaluator.eval(manifest));
String schemaString = SchemaParser.toJson(projectedSchema);
String specString =
PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
return CloseableIterable.transform(
@@ -94,6 +103,178 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
table, manifest, projectedSchema, schemaString, specString,
residuals));
}
+ /**
+ * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether
a given data or
+ * delete manifest shall be included in the scan.
+ */
+ private static class ManifestContentEvaluator {
+
+ private final Expression boundExpr;
+
+ private ManifestContentEvaluator(
+ Expression expr, Types.StructType structType, boolean caseSensitive) {
+ Expression rewritten = Expressions.rewriteNot(expr);
+ this.boundExpr = Binder.bind(structType, rewritten, caseSensitive);
+ }
+
+ private boolean eval(ManifestFile manifest) {
+ return new ManifestEvalVisitor().eval(manifest);
+ }
+
+ private class ManifestEvalVisitor extends
ExpressionVisitors.BoundExpressionVisitor<Boolean> {
+
+ private int manifestContentId;
+
+ private static final boolean ROWS_MIGHT_MATCH = true;
+ private static final boolean ROWS_CANNOT_MATCH = false;
+
+ private boolean eval(ManifestFile manifestFile) {
+ this.manifestContentId = manifestFile.content().id();
+ return ExpressionVisitors.visitEvaluator(boundExpr, this);
+ }
+
+ @Override
+ public Boolean alwaysTrue() {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public Boolean alwaysFalse() {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ @Override
+ public Boolean not(Boolean result) {
+ return !result;
+ }
+
+ @Override
+ public Boolean and(Boolean leftResult, Boolean rightResult) {
+ return leftResult && rightResult;
+ }
+
+ @Override
+ public Boolean or(Boolean leftResult, Boolean rightResult) {
+ return leftResult || rightResult;
+ }
+
+ @Override
+ public <T> Boolean isNull(BoundReference<T> ref) {
+ if (fileContent(ref)) {
+ return ROWS_CANNOT_MATCH; // data_file.content should not be null
+ } else {
+ return ROWS_MIGHT_MATCH;
+ }
+ }
+
+ @Override
+ public <T> Boolean notNull(BoundReference<T> ref) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean isNaN(BoundReference<T> ref) {
+ if (fileContent(ref)) {
+ return ROWS_CANNOT_MATCH; // data_file.content should not be NaN
+ } else {
+ return ROWS_MIGHT_MATCH;
+ }
+ }
+
+ @Override
+ public <T> Boolean notNaN(BoundReference<T> ref) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+ if (fileContent(ref)) {
+ Literal<Integer> intLit = lit.to(Types.IntegerType.get());
+ if (!contentMatch(intLit.value())) {
+ return ROWS_CANNOT_MATCH;
+ }
+ }
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+ if (fileContent(ref)) {
+ Literal<Integer> intLit = lit.to(Types.IntegerType.get());
+ if (contentMatch(intLit.value())) {
+ return ROWS_CANNOT_MATCH;
+ }
+ }
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+ if (fileContent(ref)) {
+ if (literalSet.stream().noneMatch(lit -> contentMatch((Integer)
lit))) {
+ return ROWS_CANNOT_MATCH;
+ }
+ }
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+ if (fileContent(ref)) {
+ if (literalSet.stream().anyMatch(lit -> contentMatch((Integer)
lit))) {
+ return ROWS_CANNOT_MATCH;
+ }
+ }
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH;
+ }
+
+ private <T> boolean fileContent(BoundReference<T> ref) {
+ return ref.fieldId() == DataFile.CONTENT.fieldId();
+ }
+
+ private <T> boolean contentMatch(Integer fileContentId) {
+ if (FileContent.DATA.id() == fileContentId) {
+ return ManifestContent.DATA.id() == manifestContentId;
+ } else if (FileContent.EQUALITY_DELETES.id() == fileContentId
+ || FileContent.POSITION_DELETES.id() == fileContentId) {
+ return ManifestContent.DELETES.id() == manifestContentId;
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema projection;
private final Schema fileProjection;
diff --git
a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index 9c732e843c..a4e964b017 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -43,9 +43,8 @@ public abstract class MetadataTableScanTestBase extends
TestBase {
return Arrays.asList(1, 2);
}
- protected Set<String> actualManifestListPaths(TableScan
allManifestsTableScan) {
- return
StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
- .map(t -> (AllManifestsTable.ManifestListReadTask) t)
+ protected Set<String> scannedPaths(TableScan scan) {
+ return StreamSupport.stream(scan.planFiles().spliterator(), false)
.map(t -> t.file().path().toString())
.collect(Collectors.toSet());
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index df314f6a80..0a3040939c 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -227,6 +228,131 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
}
}
+ @TestTemplate
+ public void testEntriesTableDataFileContentEq() {
+ preparePartitionedTable();
+
+ Table entriesTable = new ManifestEntriesTable(table);
+
+ Expression dataOnly = Expressions.equal("data_file.content", 0);
+ TableScan entriesTableScan = entriesTable.newScan().filter(dataOnly);
+ Set<String> expected =
+ table.currentSnapshot().dataManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+
+ assertThat(scannedPaths(entriesTableScan))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expected);
+
+ assertThat(
+
scannedPaths(entriesTable.newScan().filter(Expressions.equal("data_file.content",
3))))
+ .as("Expected manifest filter by data file content does not match")
+ .isEmpty();
+ }
+
+ @TestTemplate
+ public void testEntriesTableDateFileContentNotEq() {
+ preparePartitionedTable();
+
+ Table entriesTable = new ManifestEntriesTable(table);
+
+ Expression notData = Expressions.notEqual("data_file.content", 0);
+ TableScan entriesTableScan = entriesTable.newScan().filter(notData);
+ Set<String> expected =
+ table.currentSnapshot().deleteManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+
+ assertThat(scannedPaths(entriesTableScan))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expected);
+
+ Set<String> allManifests =
+ table.currentSnapshot().allManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ assertThat(
+ scannedPaths(
+
entriesTable.newScan().filter(Expressions.notEqual("data_file.content", 3))))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(allManifests);
+ }
+
+ @TestTemplate
+ public void testEntriesTableDataFileContentIn() {
+ preparePartitionedTable();
+ Table entriesTable = new ManifestEntriesTable(table);
+
+ Expression in0 = Expressions.in("data_file.content", 0);
+ TableScan scan1 = entriesTable.newScan().filter(in0);
+ Set<String> expectedDataManifestPath =
+ table.currentSnapshot().dataManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ assertThat(scannedPaths(scan1))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expectedDataManifestPath);
+
+ Expression in12 = Expressions.in("data_file.content", 1, 2);
+ TableScan scan2 = entriesTable.newScan().filter(in12);
+ Set<String> expectedDeleteManifestPath =
+ table.currentSnapshot().deleteManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ assertThat(scannedPaths(scan2))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expectedDeleteManifestPath);
+
+ Expression inAll = Expressions.in("data_file.content", 0, 1, 2);
+ Set<String> allManifests = Sets.union(expectedDataManifestPath,
expectedDeleteManifestPath);
+ assertThat(scannedPaths(entriesTable.newScan().filter(inAll)))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(allManifests);
+
+ Expression inNeither = Expressions.in("data_file.content", 3, 4);
+ assertThat(scannedPaths(entriesTable.newScan().filter(inNeither)))
+ .as("Expected manifest filter by data file content does not match")
+ .isEmpty();
+ }
+
+ @TestTemplate
+ public void testEntriesTableDataFileContentNotIn() {
+ preparePartitionedTable();
+ Table entriesTable = new ManifestEntriesTable(table);
+
+ Expression notIn0 = Expressions.notIn("data_file.content", 0);
+ TableScan scan1 = entriesTable.newScan().filter(notIn0);
+ Set<String> expectedDeleteManifestPath =
+ table.currentSnapshot().deleteManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ assertThat(scannedPaths(scan1))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expectedDeleteManifestPath);
+
+ Expression notIn12 = Expressions.notIn("data_file.content", 1, 2);
+ TableScan scan2 = entriesTable.newScan().filter(notIn12);
+ Set<String> expectedDataManifestPath =
+ table.currentSnapshot().dataManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ assertThat(scannedPaths(scan2))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(expectedDataManifestPath);
+
+ Expression notInNeither = Expressions.notIn("data_file.content", 3);
+ Set<String> allManifests = Sets.union(expectedDataManifestPath,
expectedDeleteManifestPath);
+ assertThat(scannedPaths(entriesTable.newScan().filter(notInNeither)))
+ .as("Expected manifest filter by data file content does not match")
+ .isEqualTo(allManifests);
+
+ Expression notInAll = Expressions.notIn("data_file.content", 0, 1, 2);
+ assertThat(scannedPaths(entriesTable.newScan().filter(notInAll)))
+ .as("Expected manifest filter by data file content does not match")
+ .isEmpty();
+ }
+
@TestTemplate
public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
@@ -1081,7 +1207,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.greaterThan("reference_snapshot_id",
2));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L));
}
@@ -1095,7 +1221,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.greaterThanOrEqual("reference_snapshot_id",
3));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L));
}
@@ -1109,7 +1235,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.lessThan("reference_snapshot_id",
3));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L));
}
@@ -1123,7 +1249,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.lessThanOrEqual("reference_snapshot_id",
2));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L));
}
@@ -1137,7 +1263,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.equal("reference_snapshot_id", 2));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 2L));
}
@@ -1151,7 +1277,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.notEqual("reference_snapshot_id",
2));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L));
}
@@ -1165,7 +1291,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.in("reference_snapshot_id", 1, 3));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L));
}
@@ -1179,7 +1305,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan manifestsTableScan =
manifestsTable.newScan().filter(Expressions.notIn("reference_snapshot_id", 1,
3));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L));
}
@@ -1198,7 +1324,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.and(
Expressions.equal("reference_snapshot_id", 2),
Expressions.greaterThan("length", 0)));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 2L));
}
@@ -1217,7 +1343,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.or(
Expressions.equal("reference_snapshot_id", 2),
Expressions.equal("reference_snapshot_id", 4)));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L));
}
@@ -1233,7 +1359,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
.newScan()
.filter(Expressions.not(Expressions.equal("reference_snapshot_id",
2)));
- assertThat(actualManifestListPaths(manifestsTableScan))
+ assertThat(scannedPaths(manifestsTableScan))
.as("Expected snapshots do not match")
.isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L));
}