rdblue commented on a change in pull request #2926:
URL: https://github.com/apache/iceberg/pull/2926#discussion_r682759938
##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
this.name = name;
}
+
Review comment:
Nit: unnecessary newline.
##########
File path: core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
##########
@@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
this.name = name;
}
+
+ /**
+ * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
+ * expression against the partitions table.
+ * <p>
+ * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When
+ * this spec is used to project an expression for the partitions table, the projection will remove predicates for
+ * non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
+ *
+ * @param partitionTableSchema schema of the partition table
Review comment:
Looks like this is still assuming the partition table. May want to
update it to `tableSchema`.
While we're thinking about this, it may also make sense to allow passing a
different prefix. The prefix for the entries table, for example, would be
`data_file.partition.`
##########
File path: core/src/main/java/org/apache/iceberg/DataFilesTable.java
##########
@@ -109,11 +112,20 @@ public long targetSplitSize() {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+ // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+ Expression partitionFilter = Projections
+ .inclusive(transformSpec(fileSchema, table().spec()), caseSensitive)
+ .project(rowFilter);
+
+ ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(partitionFilter, table().spec(),
+ caseSensitive);
Review comment:
Nit: I'd probably wrap so that all the arguments are on the same line.
That seems more readable to me.
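For example, this is the wrapping I had in mind (same call, just reflowed):
```java
ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
    partitionFilter, table().spec(), caseSensitive);
```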
##########
File path: core/src/main/java/org/apache/iceberg/DataFilesTable.java
##########
@@ -142,5 +154,10 @@ public long targetSplitSize() {
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}
+
+ @VisibleForTesting
+ ManifestFile getManifest() {
Review comment:
I realize this is only for testing, but I would still suggest using a
method name that we don't have to change later if we want to use the method
elsewhere. That means removing `get` because `get` doesn't add anything.
Typically, `get` signals that the method name can be simpler (e.g., `manifest`
to return the manifest) or should have a more descriptive verb like `find`,
`fetch`, `create`, etc. that tell the caller more about what is happening. The
only time we use `get` is when the object needs to be a Java bean.
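For example, a minimal sketch of the rename (assuming the task already keeps the manifest in a `manifest` field; the field name is just illustrative):
```java
@VisibleForTesting
ManifestFile manifest() {
  return this.manifest; // assumed field holding the manifest for this task
}
```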
##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
validateIncludesPartitionScan(tasksUnary, 3);
}
+ @Test
+ public void testFilesTableScanNoFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(102, "partition", Types.StructType.of(
+ optional(1000, "data_bucket", Types.IntegerType.get())),
+ "Partition data tuple, schema based on the partition
spec")).asStruct();
+
+ TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
Review comment:
I'm not sure you need to add the schema check since this is trying to
validate pushdown, but I'm fine keeping it.
##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
validateIncludesPartitionScan(tasksUnary, 3);
}
+ @Test
+ public void testFilesTableScanNoFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(102, "partition", Types.StructType.of(
+ optional(1000, "data_bucket", Types.IntegerType.get())),
+ "Partition data tuple, schema based on the partition
spec")).asStruct();
+
+ TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+ CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();
Review comment:
I think this would be a bit simpler if you used `planFiles` instead of
`planTasks`. That should produce only `FileScanTask` rather than
`CombinedScanTask` so you wouldn't need to `flatMap` tasks because there is
only one data file per task.
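For example, something roughly like this (untested sketch, keeping the current `getManifest()` name):
```java
CloseableIterable<FileScanTask> tasksNoFilter = scanNoFilter.planFiles();
List<ManifestFile> manifests = StreamSupport.stream(tasksNoFilter.spliterator(), false)
    .map(task -> ((DataFilesTable.ManifestReadTask) task).getManifest())
    .collect(Collectors.toList());
```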
##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
validateIncludesPartitionScan(tasksUnary, 3);
}
+ @Test
+ public void testFilesTableScanNoFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(102, "partition", Types.StructType.of(
+ optional(1000, "data_bucket", Types.IntegerType.get())),
+ "Partition data tuple, schema based on the partition
spec")).asStruct();
+
+ TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+ CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksNoFilter.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(4, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ validateIncludesManifestFile(manifests, 1);
+ validateIncludesManifestFile(manifests, 2);
+ validateIncludesManifestFile(manifests, 3);
+ }
+
+ @Test
+ public void testFilesTableScanAndFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression andEquals = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+ CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksAndEq.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ }
+
+ @Test
+ public void testFilesTableScanLtFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression ltAnd = Expressions.and(
+ Expressions.lessThan("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
Review comment:
Is the `record_count` filter needed or is it just there for good measure?
##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -350,6 +354,224 @@ public void testPartitionsTableScanNotNullFilter() {
validateIncludesPartitionScan(tasksUnary, 3);
}
+ @Test
+ public void testFilesTableScanNoFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(102, "partition", Types.StructType.of(
+ optional(1000, "data_bucket", Types.IntegerType.get())),
+ "Partition data tuple, schema based on the partition
spec")).asStruct();
+
+ TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+ CloseableIterable<CombinedScanTask> tasksNoFilter = scanNoFilter.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksNoFilter.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(4, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ validateIncludesManifestFile(manifests, 1);
+ validateIncludesManifestFile(manifests, 2);
+ validateIncludesManifestFile(manifests, 3);
+ }
+
+ @Test
+ public void testFilesTableScanAndFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression andEquals = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+ CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksAndEq.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ }
+
+ @Test
+ public void testFilesTableScanLtFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression ltAnd = Expressions.and(
+ Expressions.lessThan("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = dataFilesTable.newScan()
+ .filter(ltAnd);
+ CloseableIterable<CombinedScanTask> tasksLt = scan.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksLt.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(2, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ validateIncludesManifestFile(manifests, 1);
+ }
+
+ @Test
+ public void testFilesTableScanOrFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression or = Expressions.or(
+ Expressions.equal("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = dataFilesTable.newScan()
+ .filter(or);
+ CloseableIterable<CombinedScanTask> tasksOr = scan.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksOr.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(4, manifests.size());
+ validateIncludesManifestFile(manifests, 0);
+ validateIncludesManifestFile(manifests, 1);
+ validateIncludesManifestFile(manifests, 2);
+ validateIncludesManifestFile(manifests, 3);
+ }
+
+ @Test
+ public void testFilesScanNotFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+ Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+ Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+ TableScan scan = dataFilesTable.newScan()
+ .filter(not);
+ CloseableIterable<CombinedScanTask> tasksNot = scan.planTasks();
+ List<ManifestFile> manifests = StreamSupport.stream(tasksNot.spliterator(), false)
+ .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).getManifest()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(2, manifests.size());
+ validateIncludesManifestFile(manifests, 2);
+ validateIncludesManifestFile(manifests, 3);
+ }
+
+
+ @Test
+ public void testFilesTableScanInFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
Review comment:
You might consider a helper method to prepare the table for these tests.
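Something like the sketch below would remove a lot of the repetition; the helper name is just a suggestion:
```java
// hypothetical helper; the appends are the same ones each test repeats today
private void preparePartitionedTable() {
  table.newFastAppend().appendFile(FILE_PARTITION_0).commit();
  table.newFastAppend().appendFile(FILE_PARTITION_1).commit();
  table.newFastAppend().appendFile(FILE_PARTITION_2).commit();
  table.newFastAppend().appendFile(FILE_PARTITION_3).commit();
}
```
Each test could then start with a single `preparePartitionedTable();` call.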
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]