rdblue commented on a change in pull request #2926:
URL: https://github.com/apache/iceberg/pull/2926#discussion_r682759938



##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
     this.name = name;
   }
 
+

Review comment:
       Nit: unnecessary newline.

##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
     this.name = name;
   }
 
+
+  /**
+   * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
+   * expression against the partitions table.
+   * <p>
+   * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When
+   * this spec is used to project an expression for the partitions table, the projection will remove predicates for
+   * non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
+   *
+   * @param partitionTableSchema schema of the partition table

Review comment:
       Looks like this is still assuming the partition table. May want to update it to `tableSchema`.
   
       While we're thinking about this, it may also make sense to allow passing a different prefix. The prefix for the entries table, for example, would be `data_file.partition.`
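The prefix idea can be illustrated with a small self-contained sketch. The helper name below is invented and a plain string method stands in for the real projection logic in `BaseMetadataTable#transformSpec`; the point is only that the prefix becomes a parameter rather than a hard-coded `"partition."`:

```java
// Hypothetical sketch of prefix handling: a field reference only maps to a
// partition field when it carries the expected prefix; otherwise it is not a
// partition predicate and the projection would drop it.
public class PrefixSketch {
  // Returns the field name with the prefix removed, or null when the field
  // does not start with the prefix (i.e., it is a non-partition field).
  public static String stripPartitionPrefix(String fieldName, String prefix) {
    return fieldName.startsWith(prefix) ? fieldName.substring(prefix.length()) : null;
  }

  public static void main(String[] args) {
    // the partitions/files tables would use "partition."
    System.out.println(stripPartitionPrefix("partition.data_bucket", "partition."));
    // the entries table would use "data_file.partition."
    System.out.println(stripPartitionPrefix("data_file.partition.data_bucket", "data_file.partition."));
    // non-partition fields don't match and fall out of the projected expression
    System.out.println(stripPartitionPrefix("record_count", "partition."));
  }
}
```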

##########
File path: core/src/main/java/org/apache/iceberg/DataFilesTable.java
##########
@@ -109,11 +112,20 @@ public long targetSplitSize() {
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
+      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+      Expression partitionFilter = Projections
+          .inclusive(transformSpec(fileSchema, table().spec()), caseSensitive)
+          .project(rowFilter);
+
+      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(partitionFilter, table().spec(),
+          caseSensitive);

Review comment:
       Nit: I'd probably wrap so that all the arguments are on the same line. That seems more readable to me.

##########
File path: core/src/main/java/org/apache/iceberg/DataFilesTable.java
##########
@@ -142,5 +154,10 @@ public long targetSplitSize() {
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
+
+    @VisibleForTesting
+    ManifestFile getManifest() {

Review comment:
       I realize this is only for testing, but I would still suggest using a method name that we don't have to change later if we want to use the method elsewhere. That means removing `get` because `get` doesn't add anything. Typically, `get` signals that the method name can be simpler (e.g., `manifest` to return the manifest) or should use a more descriptive verb like `find`, `fetch`, `create`, etc. that tells the caller more about what is happening. The only time we use `get` is when the object needs to be a Java bean.
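A minimal sketch of the naming convention being suggested, with a string standing in for `ManifestFile` (this is an illustration, not the Iceberg class):

```java
// Sketch of the suggested rename: a simple accessor drops the "get" prefix,
// so the name doesn't need to change if the method later becomes public API.
public class ManifestReadTaskSketch {
  private final String manifest;  // stands in for ManifestFile

  public ManifestReadTaskSketch(String manifest) {
    this.manifest = manifest;
  }

  // preferred: plain accessor named after what it returns, no "get" prefix
  public String manifest() {
    return manifest;
  }
}
```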

##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
     validateIncludesPartitionScan(tasksUnary, 3);
   }
 
+  @Test
+  public void testFilesTableScanNoFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());

Review comment:
       I'm not sure you need to add the schema check since this is trying to validate pushdown, but I'm fine keeping it.

##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
     validateIncludesPartitionScan(tasksUnary, 3);
   }
 
+  @Test
+  public void testFilesTableScanNoFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();

Review comment:
       I think this would be a bit simpler if you used `planFiles` instead of `planTasks`. That should produce only `FileScanTask` rather than `CombinedScanTask`, so you wouldn't need to `flatMap` tasks because there is only one data file per task.
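The simplification can be shown with a self-contained sketch. Plain nested and flat lists stand in for `CombinedScanTask` and `FileScanTask` here; these are not the Iceberg types:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the suggested simplification: grouped lists stand in for
// CombinedScanTask (planTasks) and a flat list stands in for FileScanTask
// (planFiles). None of these are the real Iceberg classes.
public class PlanFilesSketch {
  // planTasks-style: tasks arrive grouped, so pulling out each file's
  // manifest needs a flatMap over the inner collections
  public static List<String> manifestsFromGrouped(List<List<String>> combinedTasks) {
    return combinedTasks.stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }

  // planFiles-style: tasks are already one per data file, so a plain
  // stream over the tasks suffices
  public static List<String> manifestsFromFlat(List<String> fileTasks) {
    return fileTasks.stream()
        .collect(Collectors.toList());
  }
}
```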

##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
     validateIncludesPartitionScan(tasksUnary, 3);
   }
 
+  @Test
+  public void testFilesTableScanNoFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksNoFilter.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(4, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+    validateIncludesManifestFile(manifests, 1);
+    validateIncludesManifestFile(manifests, 2);
+    validateIncludesManifestFile(manifests, 3);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+    CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksAndEq.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+  }
+
+  @Test
+  public void testFilesTableScanLtFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression ltAnd = Expressions.and(
+        Expressions.lessThan("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));

Review comment:
       Is the `record_count` filter needed or is it just there for good measure?

##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
     validateIncludesPartitionScan(tasksUnary, 3);
   }
 
+  @Test
+  public void testFilesTableScanNoFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksNoFilter.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(4, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+    validateIncludesManifestFile(manifests, 1);
+    validateIncludesManifestFile(manifests, 2);
+    validateIncludesManifestFile(manifests, 3);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+    CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksAndEq.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+  }
+
+  @Test
+  public void testFilesTableScanLtFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression ltAnd = Expressions.and(
+        Expressions.lessThan("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = dataFilesTable.newScan()
+        .filter(ltAnd);
+    CloseableIterable<CombinedScanTask> tasksLt = scan.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksLt.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(2, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+    validateIncludesManifestFile(manifests, 1);
+  }
+
+  @Test
+  public void testFilesTableScanOrFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = dataFilesTable.newScan()
+        .filter(or);
+    CloseableIterable<CombinedScanTask> tasksOr = scan.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksOr.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(4, manifests.size());
+    validateIncludesManifestFile(manifests, 0);
+    validateIncludesManifestFile(manifests, 1);
+    validateIncludesManifestFile(manifests, 2);
+    validateIncludesManifestFile(manifests, 3);
+  }
+
+  @Test
+  public void testFilesScanNotFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scan = dataFilesTable.newScan()
+        .filter(not);
+    CloseableIterable<CombinedScanTask> tasksNot = scan.planTasks();
+    List<ManifestFile> manifests = StreamSupport.stream(tasksNot.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(2, manifests.size());
+    validateIncludesManifestFile(manifests, 2);
+    validateIncludesManifestFile(manifests, 3);
+  }
+
+
+  @Test
+  public void testFilesTableScanInFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();

Review comment:
       You might consider a helper method to prepare the table for these tests.
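A minimal sketch of that refactoring, with a plain list standing in for the table. The `commitFile` and `preparePartitionedTable` names are invented; the real helper would call `table.newFastAppend().appendFile(file).commit()` for each fixture:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested helper: the four repeated append/commit blocks in
// each test collapse into one varargs call. A list stands in for the table.
public class TablePrepSketch {
  final List<String> committedFiles = new ArrayList<>();

  // stand-in for table.newFastAppend().appendFile(file).commit()
  void commitFile(String file) {
    committedFiles.add(file);
  }

  // each test would call this once instead of repeating the append blocks
  void preparePartitionedTable(String... files) {
    for (String file : files) {
      commitFile(file);
    }
  }
}
```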




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


