This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d5c498  Core: Add predicate push down for partitions metadata table 
(#2358)
8d5c498 is described below

commit 8d5c4986e9a93bd77d8d3ef851ea9aed825e85b2
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Jun 22 13:16:45 2021 -0700

    Core: Add predicate push down for partitions metadata table (#2358)
    
    Co-authored-by: Szehon Ho <[email protected]>
---
 .../java/org/apache/iceberg/PartitionsTable.java   |  69 ++++++--
 .../java/org/apache/iceberg/TableTestBase.java     |  24 +++
 .../org/apache/iceberg/TestMetadataTableScans.java | 197 +++++++++++++++++++++
 .../spark/source/TestIcebergSourceTablesBase.java  |  19 ++
 4 files changed, 299 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java 
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index c083bb5..e190ca4 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -20,9 +20,14 @@
 package org.apache.iceberg;
 
 import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.ThreadPools;
 
 /**
  * A {@link Table} implementation that exposes a table's partitions as rows.
@@ -30,6 +35,9 @@ import org.apache.iceberg.util.StructLikeWrapper;
 public class PartitionsTable extends BaseMetadataTable {
 
   private final Schema schema;
+  static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, 
true);
+  private static final String PARTITION_FIELD_PREFIX = "partition.";
 
   PartitionsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".partitions");
@@ -63,9 +71,9 @@ public class PartitionsTable extends BaseMetadataTable {
     return MetadataTableType.PARTITIONS;
   }
 
-  private DataTask task(TableScan scan) {
+  private DataTask task(StaticTableScan scan) {
     TableOperations ops = operations();
-    Iterable<Partition> partitions = partitions(table(), 
scan.snapshot().snapshotId());
+    Iterable<Partition> partitions = partitions(scan);
     if (table().spec().fields().size() < 1) {
       // the table is unpartitioned, partitions contains only the root 
partition
       return 
StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), 
partitions,
@@ -80,19 +88,60 @@ public class PartitionsTable extends BaseMetadataTable {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, 
partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private static Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planFiles(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new 
PartitionMap(scan.table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
+    boolean caseSensitive = scan.isCaseSensitive();
+
+    // use an inclusive projection to remove the partition name prefix and 
filter out any non-partition expressions
+    Expression partitionFilter = Projections
+        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
+        .project(scan.filter());
+
+    ManifestGroup manifestGroup = new ManifestGroup(table.io(), 
snapshot.dataManifests(), snapshot.deleteManifests())
+        .caseSensitive(caseSensitive)
+        .filterPartitions(partitionFilter)
+        .specsById(scan.table().specs())
+        .ignoreDeleted();
+
+    if (scan.shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    return partitions.all();
+    if (PLAN_SCANS_WITH_WORKER_POOL && scan.snapshot().dataManifests().size() 
> 1) {
+      manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
+    }
+
+    return manifestGroup.planFiles();
+  }
+
+  /**
+   * This method transforms the table's partition spec to a spec that is used 
to rewrite the user-provided filter
+   * expression against the partitions table.
+   * <p>
+   * The resulting partition spec maps partition.X fields to partition X using 
an identity partition transform. When
+   * this spec is used to project an expression for the partitions table, the 
projection will remove predicates for
+   * non-partition fields (not in the spec) and will remove the "partition." 
prefix from fields.
+   *
+   * @param partitionTableSchema schema of the partition table
+   * @param spec spec on which the partition table schema is based
+   * @return a spec used to rewrite partition table filters to partition 
filters using an inclusive projection
+   */
+  private static PartitionSpec transformSpec(Schema partitionTableSchema, 
PartitionSpec spec) {
+    PartitionSpec.Builder identitySpecBuilder = 
PartitionSpec.builderFor(partitionTableSchema);
+    spec.fields().forEach(pf -> 
identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name()));
+    return identitySpecBuilder.build();
   }
 
   private class PartitionsScan extends StaticTableScan {
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java 
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 4bb6f66..1e25922 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -106,6 +106,30 @@ public class TableTestBase {
       .withPartitionPath("data_bucket=3") // easy way to set partition data 
for now
       .withRecordCount(1)
       .build();
+  static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-0.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(TestHelpers.Row.of(0))
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_PARTITION_1 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-1.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(TestHelpers.Row.of(1))
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_PARTITION_2 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-2.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(TestHelpers.Row.of(2))
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_PARTITION_3 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-3.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(TestHelpers.Row.of(3))
+      .withRecordCount(1)
+      .build();
 
   static final FileIO FILE_IO = new TestTables.LocalFileIO();
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 711aaa7..a6a411e 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -20,9 +20,12 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -160,6 +163,194 @@ public class TestMetadataTableScans extends TableTestBase 
{
   }
 
   @Test
+  public void testPartitionsTableScanNoFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(1, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", 
Types.IntegerType.get())))).asStruct();
+
+    TableScan scanNoFilter = 
partitionsTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasksNoFilter = 
PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
+    validateIncludesPartitionScan(tasksNoFilter, 0);
+    validateIncludesPartitionScan(tasksNoFilter, 1);
+    validateIncludesPartitionScan(tasksNoFilter, 2);
+    validateIncludesPartitionScan(tasksNoFilter, 3);
+  }
+
+  @Test
+  public void testPartitionsTableScanAndFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
+    CloseableIterable<FileScanTask> tasksAndEq = 
PartitionsTable.planFiles((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
+    validateIncludesPartitionScan(tasksAndEq, 0);
+  }
+
+  @Test
+  public void testPartitionsTableScanLtFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression ltAnd = Expressions.and(
+        Expressions.lessThan("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
+    CloseableIterable<FileScanTask> tasksLtAnd = 
PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
+    Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
+    validateIncludesPartitionScan(tasksLtAnd, 0);
+    validateIncludesPartitionScan(tasksLtAnd, 1);
+  }
+
+  @Test
+  public void testPartitionsTableScanOrFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanOr = partitionsTable.newScan().filter(or);
+    CloseableIterable<FileScanTask> tasksOr = 
PartitionsTable.planFiles((StaticTableScan) scanOr);
+    Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
+    validateIncludesPartitionScan(tasksOr, 0);
+    validateIncludesPartitionScan(tasksOr, 1);
+    validateIncludesPartitionScan(tasksOr, 2);
+    validateIncludesPartitionScan(tasksOr, 3);
+  }
+
+  @Test
+  public void testPartitionsScanNotFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression not = 
Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scanNot = partitionsTable.newScan().filter(not);
+    CloseableIterable<FileScanTask> tasksNot = 
PartitionsTable.planFiles((StaticTableScan) scanNot);
+    Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
+    validateIncludesPartitionScan(tasksNot, 2);
+    validateIncludesPartitionScan(tasksNot, 3);
+  }
+
+  @Test
+  public void testPartitionsTableScanInFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scanSet = partitionsTable.newScan().filter(set);
+    CloseableIterable<FileScanTask> tasksSet = 
PartitionsTable.planFiles((StaticTableScan) scanSet);
+    Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
+    validateIncludesPartitionScan(tasksSet, 2);
+    validateIncludesPartitionScan(tasksSet, 3);
+  }
+
+  @Test
+  public void testPartitionsTableScanNotNullFilter() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+
+    Expression unary = Expressions.notNull("partition.data_bucket");
+    TableScan scanUnary = partitionsTable.newScan().filter(unary);
+    CloseableIterable<FileScanTask> tasksUnary = 
PartitionsTable.planFiles((StaticTableScan) scanUnary);
+    Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
+    validateIncludesPartitionScan(tasksUnary, 0);
+    validateIncludesPartitionScan(tasksUnary, 1);
+    validateIncludesPartitionScan(tasksUnary, 2);
+    validateIncludesPartitionScan(tasksUnary, 3);
+  }
+
+  @Test
   public void testDataFilesTableSelection() throws IOException {
     table.newFastAppend()
         .appendFile(FILE_A)
@@ -194,4 +385,10 @@ public class TestMetadataTableScans extends TableTestBase {
       }
     }
   }
+
+  private void validateIncludesPartitionScan(CloseableIterable<FileScanTask> 
tasks, int partValue) {
+    Assert.assertTrue("File scan tasks do not include correct file",
+        StreamSupport.stream(tasks.spliterator(), false).anyMatch(
+            a -> a.file().partition().get(0, Object.class).equals(partValue)));
+  }
 }
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index b3dcd1e..4244137 100644
--- 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -880,6 +880,25 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     Assert.assertEquals("Actual results should have one row", 1, 
actualAfterFirstCommit.size());
     TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), 
expected.get(0), actualAfterFirstCommit.get(0));
+
+    // check predicate push down
+    List<Row> filtered = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "partitions"))
+        .filter("partition.id < 2")
+        .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 1, 
filtered.size());
+    TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), 
expected.get(0), filtered.get(0));
+
+    List<Row> nonFiltered = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "partitions"))
+        .filter("partition.id < 2 or record_count=1")
+        .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 2, 
nonFiltered.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), 
expected.get(i), actual.get(i));
+    }
   }
 
   @Test

Reply via email to