This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8d5c498 Core: Add predicate push down for partitions metadata table
(#2358)
8d5c498 is described below
commit 8d5c4986e9a93bd77d8d3ef851ea9aed825e85b2
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Jun 22 13:16:45 2021 -0700
Core: Add predicate push down for partitions metadata table (#2358)
Co-authored-by: Szehon Ho <[email protected]>
---
.../java/org/apache/iceberg/PartitionsTable.java | 69 ++++++--
.../java/org/apache/iceberg/TableTestBase.java | 24 +++
.../org/apache/iceberg/TestMetadataTableScans.java | 197 +++++++++++++++++++++
.../spark/source/TestIcebergSourceTablesBase.java | 19 ++
4 files changed, 299 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index c083bb5..e190ca4 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -20,9 +20,14 @@
package org.apache.iceberg;
import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.ThreadPools;
/**
* A {@link Table} implementation that exposes a table's partitions as rows.
@@ -30,6 +35,9 @@ import org.apache.iceberg.util.StructLikeWrapper;
public class PartitionsTable extends BaseMetadataTable {
private final Schema schema;
+ static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+ SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED,
true);
+ private static final String PARTITION_FIELD_PREFIX = "partition.";
PartitionsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".partitions");
@@ -63,9 +71,9 @@ public class PartitionsTable extends BaseMetadataTable {
return MetadataTableType.PARTITIONS;
}
- private DataTask task(TableScan scan) {
+ private DataTask task(StaticTableScan scan) {
TableOperations ops = operations();
- Iterable<Partition> partitions = partitions(table(),
scan.snapshot().snapshotId());
+ Iterable<Partition> partitions = partitions(scan);
if (table().spec().fields().size() < 1) {
// the table is unpartitioned, partitions contains only the root
partition
return
StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()),
partitions,
@@ -80,19 +88,60 @@ public class PartitionsTable extends BaseMetadataTable {
return StaticDataTask.Row.of(partition.key, partition.recordCount,
partition.fileCount);
}
- private static Iterable<Partition> partitions(Table table, Long snapshotId) {
- PartitionMap partitions = new PartitionMap(table.spec().partitionType());
- TableScan scan = table.newScan();
+ private static Iterable<Partition> partitions(StaticTableScan scan) {
+ CloseableIterable<FileScanTask> tasks = planFiles(scan);
- if (snapshotId != null) {
- scan = scan.useSnapshot(snapshotId);
+ PartitionMap partitions = new
PartitionMap(scan.table().spec().partitionType());
+ for (FileScanTask task : tasks) {
+ partitions.get(task.file().partition()).update(task.file());
}
+ return partitions.all();
+ }
- for (FileScanTask task : scan.planFiles()) {
- partitions.get(task.file().partition()).update(task.file());
+ @VisibleForTesting
+ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
+ Table table = scan.table();
+ Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
+ boolean caseSensitive = scan.isCaseSensitive();
+
+ // use an inclusive projection to remove the partition name prefix and
filter out any non-partition expressions
+ Expression partitionFilter = Projections
+ .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
+ .project(scan.filter());
+
+ ManifestGroup manifestGroup = new ManifestGroup(table.io(),
snapshot.dataManifests(), snapshot.deleteManifests())
+ .caseSensitive(caseSensitive)
+ .filterPartitions(partitionFilter)
+ .specsById(scan.table().specs())
+ .ignoreDeleted();
+
+ if (scan.shouldIgnoreResiduals()) {
+ manifestGroup = manifestGroup.ignoreResiduals();
}
- return partitions.all();
+ if (PLAN_SCANS_WITH_WORKER_POOL && scan.snapshot().dataManifests().size()
> 1) {
+ manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
+ }
+
+ return manifestGroup.planFiles();
+ }
+
+ /**
+ * This method transforms the table's partition spec to a spec that is used
to rewrite the user-provided filter
+ * expression against the partitions table.
+ * <p>
+ * The resulting partition spec maps partition.X fields to partition X using
an identity partition transform. When
+ * this spec is used to project an expression for the partitions table, the
projection will remove predicates for
+ * non-partition fields (not in the spec) and will remove the "partition."
prefix from fields.
+ *
+ * @param partitionTableSchema schema of the partition table
+ * @param spec spec on which the partition table schema is based
+ * @return a spec used to rewrite partition table filters to partition
filters using an inclusive projection
+ */
+ private static PartitionSpec transformSpec(Schema partitionTableSchema,
PartitionSpec spec) {
+ PartitionSpec.Builder identitySpecBuilder =
PartitionSpec.builderFor(partitionTableSchema);
+ spec.fields().forEach(pf ->
identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name()));
+ return identitySpecBuilder.build();
}
private class PartitionsScan extends StaticTableScan {
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 4bb6f66..1e25922 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -106,6 +106,30 @@ public class TableTestBase {
.withPartitionPath("data_bucket=3") // easy way to set partition data
for now
.withRecordCount(1)
.build();
+ static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-0.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(0))
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_PARTITION_1 = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-1.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(1))
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_PARTITION_2 = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-2.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(2))
+ .withRecordCount(1)
+ .build();
+ static final DataFile FILE_PARTITION_3 = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-3.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(TestHelpers.Row.of(3))
+ .withRecordCount(1)
+ .build();
static final FileIO FILE_IO = new TestTables.LocalFileIO();
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 711aaa7..a6a411e 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -20,9 +20,12 @@
package org.apache.iceberg;
import java.io.IOException;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -160,6 +163,194 @@ public class TestMetadataTableScans extends TableTestBase
{
}
@Test
+ public void testPartitionsTableScanNoFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(1, "partition", Types.StructType.of(
+ optional(1000, "data_bucket",
Types.IntegerType.get())))).asStruct();
+
+ TableScan scanNoFilter =
partitionsTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+ CloseableIterable<FileScanTask> tasksNoFilter =
PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
+ Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
+ validateIncludesPartitionScan(tasksNoFilter, 0);
+ validateIncludesPartitionScan(tasksNoFilter, 1);
+ validateIncludesPartitionScan(tasksNoFilter, 2);
+ validateIncludesPartitionScan(tasksNoFilter, 3);
+ }
+
+ @Test
+ public void testPartitionsTableScanAndFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression andEquals = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
+ CloseableIterable<FileScanTask> tasksAndEq =
PartitionsTable.planFiles((StaticTableScan) scanAndEq);
+ Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
+ validateIncludesPartitionScan(tasksAndEq, 0);
+ }
+
+ @Test
+ public void testPartitionsTableScanLtFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression ltAnd = Expressions.and(
+ Expressions.lessThan("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
+ CloseableIterable<FileScanTask> tasksLtAnd =
PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
+ Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
+ validateIncludesPartitionScan(tasksLtAnd, 0);
+ validateIncludesPartitionScan(tasksLtAnd, 1);
+ }
+
+ @Test
+ public void testPartitionsTableScanOrFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression or = Expressions.or(
+ Expressions.equal("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scanOr = partitionsTable.newScan().filter(or);
+ CloseableIterable<FileScanTask> tasksOr =
PartitionsTable.planFiles((StaticTableScan) scanOr);
+ Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
+ validateIncludesPartitionScan(tasksOr, 0);
+ validateIncludesPartitionScan(tasksOr, 1);
+ validateIncludesPartitionScan(tasksOr, 2);
+ validateIncludesPartitionScan(tasksOr, 3);
+ }
+
+ @Test
+ public void testPartitionsScanNotFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression not =
Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+ TableScan scanNot = partitionsTable.newScan().filter(not);
+ CloseableIterable<FileScanTask> tasksNot =
PartitionsTable.planFiles((StaticTableScan) scanNot);
+ Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
+ validateIncludesPartitionScan(tasksNot, 2);
+ validateIncludesPartitionScan(tasksNot, 3);
+ }
+
+ @Test
+ public void testPartitionsTableScanInFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression set = Expressions.in("partition.data_bucket", 2, 3);
+ TableScan scanSet = partitionsTable.newScan().filter(set);
+ CloseableIterable<FileScanTask> tasksSet =
PartitionsTable.planFiles((StaticTableScan) scanSet);
+ Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
+ validateIncludesPartitionScan(tasksSet, 2);
+ validateIncludesPartitionScan(tasksSet, 3);
+ }
+
+ @Test
+ public void testPartitionsTableScanNotNullFilter() {
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_0)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_1)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_2)
+ .commit();
+ table.newFastAppend()
+ .appendFile(FILE_PARTITION_3)
+ .commit();
+
+ Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+ Expression unary = Expressions.notNull("partition.data_bucket");
+ TableScan scanUnary = partitionsTable.newScan().filter(unary);
+ CloseableIterable<FileScanTask> tasksUnary =
PartitionsTable.planFiles((StaticTableScan) scanUnary);
+ Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
+ validateIncludesPartitionScan(tasksUnary, 0);
+ validateIncludesPartitionScan(tasksUnary, 1);
+ validateIncludesPartitionScan(tasksUnary, 2);
+ validateIncludesPartitionScan(tasksUnary, 3);
+ }
+
+ @Test
public void testDataFilesTableSelection() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
@@ -194,4 +385,10 @@ public class TestMetadataTableScans extends TableTestBase {
}
}
}
+
+ private void validateIncludesPartitionScan(CloseableIterable<FileScanTask>
tasks, int partValue) {
+ Assert.assertTrue("File scan tasks do not include correct file",
+ StreamSupport.stream(tasks.spliterator(), false).anyMatch(
+ a -> a.file().partition().get(0, Object.class).equals(partValue)));
+ }
}
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index b3dcd1e..4244137 100644
---
a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -880,6 +880,25 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
Assert.assertEquals("Actual results should have one row", 1,
actualAfterFirstCommit.size());
TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(),
expected.get(0), actualAfterFirstCommit.get(0));
+
+ // check predicate push down
+ List<Row> filtered = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have one row", 1,
filtered.size());
+ TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(),
expected.get(0), filtered.get(0));
+
+ List<Row> nonFiltered = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2 or record_count=1")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have two rows", 2,
nonFiltered.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(),
expected.get(i), nonFiltered.get(i));
+ }
}
@Test