rdblue commented on a change in pull request #3400:
URL: https://github.com/apache/iceberg/pull/3400#discussion_r739887806



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
##########
@@ -74,60 +101,158 @@
       throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
     }
 
-    // look for split behavior overrides in options
-    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null);
-    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null);
-    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null);
+    this.splitSize = table instanceof BaseMetadataTable ? readConf.metadataSplitSize() : readConf.splitSize();
+    this.splitLookback = readConf.splitLookback();
+    this.splitOpenFileCost = readConf.splitOpenFileCost();
+    this.runtimeFilterExpressions = Lists.newArrayList();
+  }
+
+  private Set<Integer> specIds() {
+    if (specIds == null) {
+      Set<Integer> specIdSet = Sets.newHashSet();
+      for (FileScanTask file : files()) {
+        specIdSet.add(file.spec().specId());
+      }
+      this.specIds = specIdSet;
+    }
+
+    return specIds;
+  }
+
+  private List<FileScanTask> files() {
+    if (files == null) {
+      this.files = planFiles();
+    }
+
+    return files;
+  }
+
+  private List<FileScanTask> planFiles() {
+    TableScan scan = table()
+        .newScan()
+        .option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize))
+        .option(TableProperties.SPLIT_LOOKBACK, String.valueOf(splitLookback))
+        .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost))
+        .caseSensitive(caseSensitive())
+        .project(expectedSchema());
+
+    if (snapshotId != null) {
+      scan = scan.useSnapshot(snapshotId);
+    }
+
+    if (asOfTimestamp != null) {
+      scan = scan.asOfTime(asOfTimestamp);
+    }
+
+    if (startSnapshotId != null) {
+      if (endSnapshotId != null) {
+        scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+      } else {
+        scan = scan.appendsAfter(startSnapshotId);
+      }
+    }
+
+    for (Expression filter : filterExpressions()) {
+      scan = scan.filter(filter);
+    }
+
+    try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
+      return Lists.newArrayList(filesIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table scan: " + scan, e);
+    }
   }
 
   @Override
   protected List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      TableScan scan = table()
-          .newScan()
-          .caseSensitive(caseSensitive())
-          .project(expectedSchema());
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files()),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      tasks = Lists.newArrayList(scanTasks);
+    }
 
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
+    return tasks;
+  }
 
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
+  @Override
+  public NamedReference[] filterAttributes() {
+    Set<Integer> partitionFieldSourceIds = Sets.newHashSet();
 
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
+    for (Integer specId : specIds()) {
+      PartitionSpec spec = table().specs().get(specId);
+      for (PartitionField field : spec.fields()) {
+        partitionFieldSourceIds.add(field.sourceId());
       }
+    }
 
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
+    Map<Integer, String> quotedNameById = TypeUtil.indexQuotedNameById(
+        expectedSchema().asStruct(),
+        name -> String.format("`%s`", name.replace("`", "``")));
 
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
-      }
+    return partitionFieldSourceIds.stream()
+        .filter(fieldId -> expectedSchema().findField(fieldId) != null)
+          .map(fieldId -> Spark3Util.toNamedReference(quotedNameById.get(fieldId)))
+        .toArray(NamedReference[]::new);
+  }
 
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+  @Override
+  public void filter(Filter[] filters) {
+    Expression runtimeFilterExpr = convertRuntimeFilters(filters);
+
+    if (runtimeFilterExpr != Expressions.alwaysTrue()) {
+      Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap();
+
+      for (Integer specId : specIds()) {
+        PartitionSpec spec = table().specs().get(specId);
+        Expression inclusiveExpr = Projections.inclusive(spec).project(runtimeFilterExpr);
+        Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+        evaluatorsBySpecId.put(specId, inclusive);
       }
 
-      for (Expression filter : filterExpressions()) {
-        scan = scan.filter(filter);
+      LOG.info("Trying to filter {} files using runtime filter {}", 
files().size(), runtimeFilterExpr);
+
+      List<FileScanTask> filteredFiles = files().stream()
+          .filter(file -> {
+            Evaluator evaluator = evaluatorsBySpecId.get(file.spec().specId());
+            return evaluator.eval(file.file().partition());
+          })
+          .collect(Collectors.toList());
+
+      LOG.info("{}/{} files matched runtime filter {}", filteredFiles.size(), 
files().size(), runtimeFilterExpr);
+
+      // don't invalidate tasks if the runtime filter had no effect to avoid planning splits again
+      if (filteredFiles.size() < files().size()) {
+        this.specIds = null;
+        this.files = filteredFiles;
+        this.tasks = null;
       }
 
-      try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
-        this.tasks = Lists.newArrayList(tasksIterable);
-      }  catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+      runtimeFilterExpressions.add(runtimeFilterExpr);
+    }
+  }
+
+  private Expression convertRuntimeFilters(Filter[] filters) {

Review comment:
       Side question: do we need to do any analysis of the incoming filters? For example, if there is one ID field, will the filter be a series of ANDed equality predicates that we can convert to an IN?
   
   We also don't currently support an IN predicate of StructLike values, but we 
could implement that if it is something we expect. Maybe we should document 
what the expected filters look like in a comment?
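
    For illustration only (not part of this PR), a minimal sketch of the kind of filter analysis being asked about: folding a tree of ORed equality predicates on a single column into one In filter before translating it to an Iceberg expression. The helper names (foldOredEqualsToIn, collectEquals) are hypothetical; the sketch assumes the org.apache.spark.sql.sources classes EqualTo, Or, and In plus the Lists helper already imported in this file.

      // Hypothetical sketch, not from this PR: fold ORed equality predicates on one
      // column into a single In filter so it can become an Iceberg IN expression.
      private static Filter foldOredEqualsToIn(Filter filter) {
        List<EqualTo> equalFilters = Lists.newArrayList();
        if (!collectEquals(filter, equalFilters) || equalFilters.isEmpty()) {
          return filter; // not a pure OR-of-equals tree, leave it unchanged
        }

        String attribute = equalFilters.get(0).attribute();
        if (!equalFilters.stream().allMatch(eq -> eq.attribute().equals(attribute))) {
          return filter; // predicates reference different columns
        }

        Object[] values = equalFilters.stream().map(EqualTo::value).toArray();
        return new In(attribute, values);
      }

      private static boolean collectEquals(Filter filter, List<EqualTo> equalFilters) {
        if (filter instanceof EqualTo) {
          equalFilters.add((EqualTo) filter);
          return true;
        } else if (filter instanceof Or) {
          Or or = (Or) filter;
          return collectEquals(or.left(), equalFilters) && collectEquals(or.right(), equalFilters);
        }
        return false;
      }

    Whether Spark's runtime filters actually arrive in this shape is exactly the open question above, so this is only meant to show what such an analysis could look like.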



