This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 5733aecd0d Core: Pushdown data_file.content filter in entries metadata 
table (#10203)
5733aecd0d is described below

commit 5733aecd0d010b17b9bd0f10aaccb601b1f13c90
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Jun 24 10:39:26 2024 -0700

    Core: Pushdown data_file.content filter in entries metadata table (#10203)
---
 .../java/org/apache/iceberg/BaseEntriesTable.java  | 185 ++++++++++++++++++++-
 .../apache/iceberg/MetadataTableScanTestBase.java  |   5 +-
 .../org/apache/iceberg/TestMetadataTableScans.java | 148 +++++++++++++++--
 3 files changed, 322 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java 
b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
index 43d8a71f87..4e485d516f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
@@ -23,8 +23,12 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.BoundReference;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
@@ -68,6 +72,7 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
     Expression rowFilter = context.rowFilter();
     boolean caseSensitive = context.caseSensitive();
     boolean ignoreResiduals = context.ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
 
     LoadingCache<Integer, ManifestEvaluator> evalCache =
         Caffeine.newBuilder()
@@ -77,14 +82,18 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
                   PartitionSpec transformedSpec = 
BaseFilesTable.transformSpec(tableSchema, spec);
                   return ManifestEvaluator.forRowFilter(rowFilter, 
transformedSpec, caseSensitive);
                 });
+    ManifestContentEvaluator manifestContentEvaluator =
+        new ManifestContentEvaluator(filter, tableSchema.asStruct(), 
caseSensitive);
 
     CloseableIterable<ManifestFile> filteredManifests =
         CloseableIterable.filter(
-            manifests, manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+            manifests,
+            manifest ->
+                evalCache.get(manifest.partitionSpecId()).eval(manifest)
+                    && manifestContentEvaluator.eval(manifest));
 
     String schemaString = SchemaParser.toJson(projectedSchema);
     String specString = 
PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
     return CloseableIterable.transform(
@@ -94,6 +103,178 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
                 table, manifest, projectedSchema, schemaString, specString, 
residuals));
   }
 
+  /**
+   * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether a given data or
+   * delete manifest shall be included in the scan.
+   *
+   * <p>Only predicates on {@code data_file.content} are used for pruning; every other predicate is
+   * treated as possibly matching. A data manifest holds only {@link FileContent#DATA} entries,
+   * while a delete manifest may hold both {@link FileContent#POSITION_DELETES} and {@link
+   * FileContent#EQUALITY_DELETES} entries, so a negated predicate ({@code !=}, {@code NOT IN}) may
+   * prune a manifest only when it excludes every content type that manifest can hold.
+   */
+  private static class ManifestContentEvaluator {
+
+    private final Expression boundExpr;
+
+    private ManifestContentEvaluator(
+        Expression expr, Types.StructType structType, boolean caseSensitive) {
+      // NOT is pushed into the leaf predicates up front, so the visitor never has to negate an
+      // inclusive "rows might match" result
+      Expression rewritten = Expressions.rewriteNot(expr);
+      this.boundExpr = Binder.bind(structType, rewritten, caseSensitive);
+    }
+
+    private boolean eval(ManifestFile manifest) {
+      return new ManifestEvalVisitor().eval(manifest);
+    }
+
+    private class ManifestEvalVisitor extends ExpressionVisitors.BoundExpressionVisitor<Boolean> {
+
+      private int manifestContentId;
+
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(ManifestFile manifestFile) {
+        this.manifestContentId = manifestFile.content().id();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        if (fileContent(ref)) {
+          return ROWS_CANNOT_MATCH; // data_file.content should not be null
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (fileContent(ref)) {
+          return ROWS_CANNOT_MATCH; // data_file.content should not be NaN
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        if (fileContent(ref)) {
+          Literal<Integer> intLit = lit.to(Types.IntegerType.get());
+          if (!contentMatch(intLit.value())) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        if (fileContent(ref)) {
+          Literal<Integer> intLit = lit.to(Types.IntegerType.get());
+          // prune only when the manifest cannot hold any content type other than the literal;
+          // a delete manifest may still hold the other delete type
+          if (onlyContent(intLit.value())) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (fileContent(ref)) {
+          if (literalSet.stream().noneMatch(lit -> contentMatch((Integer) lit))) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (fileContent(ref)) {
+          // prune only when every content type the manifest can hold is in the excluded set
+          if (onlyContentIn(literalSet)) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /** Returns whether the reference points at {@code data_file.content}. */
+      private <T> boolean fileContent(BoundReference<T> ref) {
+        return ref.fieldId() == DataFile.CONTENT.fieldId();
+      }
+
+      /** Returns whether the current manifest may hold files with the given content id. */
+      private boolean contentMatch(Integer fileContentId) {
+        if (FileContent.DATA.id() == fileContentId) {
+          return ManifestContent.DATA.id() == manifestContentId;
+        } else if (FileContent.EQUALITY_DELETES.id() == fileContentId
+            || FileContent.POSITION_DELETES.id() == fileContentId) {
+          return ManifestContent.DELETES.id() == manifestContentId;
+        } else {
+          return false;
+        }
+      }
+
+      /** Returns whether the current manifest can hold only files with the given content id. */
+      private boolean onlyContent(Integer fileContentId) {
+        // a delete manifest may mix position and equality deletes, so only a data manifest is
+        // restricted to a single content type
+        return ManifestContent.DATA.id() == manifestContentId
+            && FileContent.DATA.id() == fileContentId;
+      }
+
+      /** Returns whether every content id the current manifest can hold is in the given set. */
+      private boolean onlyContentIn(Set<?> fileContentIds) {
+        if (ManifestContent.DATA.id() == manifestContentId) {
+          return fileContentIds.contains(FileContent.DATA.id());
+        } else if (ManifestContent.DELETES.id() == manifestContentId) {
+          return fileContentIds.contains(FileContent.POSITION_DELETES.id())
+              && fileContentIds.contains(FileContent.EQUALITY_DELETES.id());
+        } else {
+          // unknown manifest content: stay conservative and keep the manifest
+          return false;
+        }
+      }
+    }
+  }
+
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final Schema projection;
     private final Schema fileProjection;
diff --git 
a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java 
b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index 9c732e843c..a4e964b017 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -43,9 +43,8 @@ public abstract class MetadataTableScanTestBase extends 
TestBase {
     return Arrays.asList(1, 2);
   }
 
-  protected Set<String> actualManifestListPaths(TableScan 
allManifestsTableScan) {
-    return 
StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
-        .map(t -> (AllManifestsTable.ManifestListReadTask) t)
+  protected Set<String> scannedPaths(TableScan scan) {
+    return StreamSupport.stream(scan.planFiles().spliterator(), false)
         .map(t -> t.file().path().toString())
         .collect(Collectors.toSet());
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index df314f6a80..0a3040939c 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -227,6 +228,131 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     }
   }
 
+  @TestTemplate
+  public void testEntriesTableDataFileContentEq() {
+    preparePartitionedTable();
+    Table entries = new ManifestEntriesTable(table);
+
+    // content == 0 keeps exactly the data manifests of the current snapshot
+    Set<String> dataManifestPaths =
+        table.currentSnapshot().dataManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+    TableScan dataOnlyScan = entries.newScan().filter(Expressions.equal("data_file.content", 0));
+    assertThat(scannedPaths(dataOnlyScan))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(dataManifestPaths);
+
+    // an unknown content id cannot match any manifest
+    TableScan unknownContentScan =
+        entries.newScan().filter(Expressions.equal("data_file.content", 3));
+    assertThat(scannedPaths(unknownContentScan))
+        .as("Expected manifest filter by data file content does not match")
+        .isEmpty();
+  }
+
+  // data_file.content != 0 keeps only delete manifests; excluding an unknown id keeps everything
+  @TestTemplate
+  public void testEntriesTableDataFileContentNotEq() {
+    preparePartitionedTable();
+
+    Table entriesTable = new ManifestEntriesTable(table);
+
+    Expression notData = Expressions.notEqual("data_file.content", 0);
+    TableScan entriesTableScan = entriesTable.newScan().filter(notData);
+    Set<String> expected =
+        table.currentSnapshot().deleteManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+
+    assertThat(scannedPaths(entriesTableScan))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(expected);
+
+    Set<String> allManifests =
+        table.currentSnapshot().allManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+    TableScan unknownContentScan =
+        entriesTable.newScan().filter(Expressions.notEqual("data_file.content", 3));
+    assertThat(scannedPaths(unknownContentScan))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(allManifests);
+  }
+
+  @TestTemplate
+  public void testEntriesTableDataFileContentIn() {
+    preparePartitionedTable();
+    Table entries = new ManifestEntriesTable(table);
+    Snapshot current = table.currentSnapshot();
+
+    // expected manifest paths, split by manifest content
+    Set<String> dataPaths =
+        current.dataManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+    Set<String> deletePaths =
+        current.deleteManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+
+    assertThat(scannedPaths(entries.newScan().filter(Expressions.in("data_file.content", 0))))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(dataPaths);
+
+    assertThat(scannedPaths(entries.newScan().filter(Expressions.in("data_file.content", 1, 2))))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(deletePaths);
+
+    Expression everyContent = Expressions.in("data_file.content", 0, 1, 2);
+    assertThat(scannedPaths(entries.newScan().filter(everyContent)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(Sets.union(dataPaths, deletePaths));
+
+    Expression noKnownContent = Expressions.in("data_file.content", 3, 4);
+    assertThat(scannedPaths(entries.newScan().filter(noKnownContent)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEmpty();
+  }
+
+  @TestTemplate
+  public void testEntriesTableDataFileContentNotIn() {
+    preparePartitionedTable();
+    Table entries = new ManifestEntriesTable(table);
+    Snapshot current = table.currentSnapshot();
+
+    // expected manifest paths, split by manifest content
+    Set<String> deletePaths =
+        current.deleteManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+    Set<String> dataPaths =
+        current.dataManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toSet());
+
+    Expression excludeData = Expressions.notIn("data_file.content", 0);
+    assertThat(scannedPaths(entries.newScan().filter(excludeData)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(deletePaths);
+
+    Expression excludeDeletes = Expressions.notIn("data_file.content", 1, 2);
+    assertThat(scannedPaths(entries.newScan().filter(excludeDeletes)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(dataPaths);
+
+    Expression excludeUnknown = Expressions.notIn("data_file.content", 3);
+    assertThat(scannedPaths(entries.newScan().filter(excludeUnknown)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEqualTo(Sets.union(dataPaths, deletePaths));
+
+    Expression excludeEverything = Expressions.notIn("data_file.content", 0, 1, 2);
+    assertThat(scannedPaths(entries.newScan().filter(excludeEverything)))
+        .as("Expected manifest filter by data file content does not match")
+        .isEmpty();
+  }
+
   @TestTemplate
   public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
     table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
@@ -1081,7 +1207,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.greaterThan("reference_snapshot_id",
 2));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L));
   }
@@ -1095,7 +1221,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.greaterThanOrEqual("reference_snapshot_id",
 3));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L));
   }
@@ -1109,7 +1235,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.lessThan("reference_snapshot_id", 
3));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L));
   }
@@ -1123,7 +1249,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.lessThanOrEqual("reference_snapshot_id",
 2));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L));
   }
@@ -1137,7 +1263,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.equal("reference_snapshot_id", 2));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L));
   }
@@ -1151,7 +1277,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.notEqual("reference_snapshot_id", 
2));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L));
   }
@@ -1165,7 +1291,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
 
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.in("reference_snapshot_id", 1, 3));
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L));
   }
@@ -1179,7 +1305,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan manifestsTableScan =
         
manifestsTable.newScan().filter(Expressions.notIn("reference_snapshot_id", 1, 
3));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L));
   }
@@ -1198,7 +1324,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
                 Expressions.and(
                     Expressions.equal("reference_snapshot_id", 2),
                     Expressions.greaterThan("length", 0)));
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L));
   }
@@ -1217,7 +1343,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
                 Expressions.or(
                     Expressions.equal("reference_snapshot_id", 2),
                     Expressions.equal("reference_snapshot_id", 4)));
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L));
   }
@@ -1233,7 +1359,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             .newScan()
             .filter(Expressions.not(Expressions.equal("reference_snapshot_id", 
2)));
 
-    assertThat(actualManifestListPaths(manifestsTableScan))
+    assertThat(scannedPaths(manifestsTableScan))
         .as("Expected snapshots do not match")
         .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L));
   }

Reply via email to