jverhoeks opened a new issue, #2376:
URL: https://github.com/apache/iceberg-rust/issues/2376
### Is your feature request related to a problem or challenge?
`IcebergTableScan` (the DataFusion `ExecutionPlan` in
`crates/integrations/datafusion/src/physical_plan/scan.rs`) does not
participate in DataFusion's runtime / dynamic filter pushdown. Filters
generated by `HashJoinExec` build sides at execution time never reach the
Iceberg scan, so `lineitem`-style probe-side scans cannot prune row groups
using build-side join key sets. This leaves a real performance gap on TPC-H
joins.
#### Concrete observation
In TPC-H q14 (`lineitem JOIN part ON l_partkey = p_partkey`) at SF10, with
every relevant DataFusion option enabled (`enable_dynamic_filter_pushdown = true`,
`parquet.bloom_filter_on_read = true`, `parquet.pushdown_filters = true`,
`parquet.reorder_filters = true`), `IcebergTableScan` still reads the full
`lineitem` table without pruning. The post-scan `FilterExec` evaluates the
dynamic filter row by row on already-decoded batches, so it cannot skip any
Parquet row groups.
For comparison, the same query on Trino's Iceberg connector finishes ~5x
faster on the same data because Trino's connector receives a `DynamicFilter`,
samples it once at split open, and uses the result to prune with Parquet
row-group statistics and bloom filters before any data is read.
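For intuition, the row-group half of this can be sketched in a few lines. The
following is a minimal, self-contained illustration (not Trino's or
iceberg-rust's code; `RowGroupStats`, `can_skip`, and `selected_row_groups` are
hypothetical names) of how a sampled build-side key set skips row groups using
only per-row-group min/max statistics on the join key. Real readers also
consult bloom filters and page indexes.

```rust
/// Hypothetical per-row-group statistics for the join key column.
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// A row group can be skipped when no build-side key falls inside its
/// [min, max] range. `sorted_keys` must be sorted ascending.
fn can_skip(stats: RowGroupStats, sorted_keys: &[i64]) -> bool {
    // Position of the first key that is >= stats.min.
    let idx = sorted_keys.partition_point(|&k| k < stats.min);
    match sorted_keys.get(idx) {
        // Smallest candidate key already exceeds max: nothing can match.
        Some(&k) => k > stats.max,
        // Every key is below min: nothing can match.
        None => true,
    }
}

/// Return the indices of row groups that must still be read.
fn selected_row_groups(groups: &[RowGroupStats], build_keys: &[i64]) -> Vec<usize> {
    let mut keys = build_keys.to_vec();
    keys.sort_unstable();
    keys.dedup();
    groups
        .iter()
        .enumerate()
        .filter(|(_, g)| !can_skip(**g, &keys))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let groups = [
        RowGroupStats { min: 1, max: 100 },
        RowGroupStats { min: 101, max: 200 },
        RowGroupStats { min: 201, max: 300 },
    ];
    // The build side produced only two distinct part keys.
    let build_keys = [150, 160];
    println!("{:?}", selected_row_groups(&groups, &build_keys)); // prints [1]
}
```

With build-side keys `{150, 160}`, only the middle row group survives. The
check is conservative: a row group is kept whenever some key falls inside its
range, even if that key is not actually present, which is the gap bloom
filters help close.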
#### Why the existing API can't carry this
`IcebergTableScan::new` takes `filters: &[Expr]` once and converts them via
`convert_filters_to_predicate`. `TableScanBuilder::with_filter` accepts a
single `Predicate`. Both fire at scan construction time. There is no surface
for a predicate that only becomes useful mid-execution (when the join build
side completes).
Internally the reader is already capable of applying the predicate at all
three pruning levels (see `crates/iceberg/src/arrow/reader.rs`):
```rust
// reader.rs:398-444 — when final_predicate is Some:
// 1. with_row_filter(row_filter) // post-decode RowFilter
// 2. get_selected_row_group_indices(...) // row-group min/max skip
// 3. get_row_selection_for_filter_predicate // page-index row selection
```
So the missing piece is purely the entry point — there is no way to feed the
latest dynamic predicate to `process_file_scan_task` once the scan is in flight.
### Describe the solution you'd like
A Trino-style "sample once per file scan task" hook. Sketch:
```rust
use std::sync::Arc;

use crate::expr::Predicate;

/// User-supplied factory consulted right before each FileScanTask
/// opens its Parquet file. Returns None when the dynamic source is
/// not yet useful (e.g. join build side has not completed).
pub trait DynamicPredicate: Send + Sync {
    fn current(&self) -> Option<Predicate>;
}

impl TableScanBuilder {
    /// Proposed: store `dp` so the reader can sample it once per
    /// FileScanTask. Body elided in this sketch.
    pub fn with_dynamic_predicate(self, dp: Arc<dyn DynamicPredicate>) -> Self {
        todo!()
    }
}
```
In `ArrowReader::process_file_scan_task`, just before the existing
`final_predicate` block, AND the static predicate with
`dynamic_predicate.current()` when the latter returns `Some`. The combined
predicate then flows into the three pruning paths the reader already
implements; no other reader-side changes are needed.
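A minimal sketch of that intersection step, assuming a toy `Predicate` enum in
place of `iceberg::expr::Predicate` so the example compiles on its own. The
helper name `effective_predicate` is hypothetical, not an existing
iceberg-rust function; `None` on either side means "no constraint".

```rust
/// Toy stand-in for iceberg's `Predicate` (assumption); only the shape matters.
#[derive(Debug, Clone, PartialEq)]
enum Predicate {
    /// `column IN (values)`
    In(String, Vec<i64>),
    /// `column < value`
    Lt(String, i64),
    /// Conjunction of two predicates.
    And(Box<Predicate>, Box<Predicate>),
}

impl Predicate {
    fn and(self, other: Predicate) -> Predicate {
        Predicate::And(Box::new(self), Box::new(other))
    }
}

/// Combine the scan's static predicate with the value sampled from the
/// dynamic source when this file scan task opens. If one side is `None`,
/// the other passes through unchanged.
fn effective_predicate(
    static_pred: Option<Predicate>,
    dynamic_pred: Option<Predicate>,
) -> Option<Predicate> {
    match (static_pred, dynamic_pred) {
        (Some(s), Some(d)) => Some(s.and(d)),
        (Some(s), None) => Some(s),
        (None, d) => d,
    }
}

fn main() {
    let static_pred = Some(Predicate::Lt("l_quantity".into(), 24));
    let dynamic_pred = Some(Predicate::In("l_partkey".into(), vec![150, 160]));
    println!("{:?}", effective_predicate(static_pred, dynamic_pred));
}
```

Treating a `None` sample as "no extra constraint" keeps the change safe: before
the build side completes, each task behaves exactly as it does today.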
`IcebergTableScan` (or any DataFusion `ExecutionPlan` that owns DataFusion
runtime filters) would wrap its `Vec<Arc<dyn PhysicalExpr>>` in a
`DynamicPredicate` implementation that translates each filter to a `Predicate`
via the existing `expr_to_predicate` paths and returns `None` while no filter
is selective yet.
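One possible shape for such an adapter, sketched under assumptions: it uses a
toy `Predicate`, and a shared `RwLock` slot (the hypothetical `BuildSideKeys`)
stands in for DataFusion's dynamic filter and the `expr_to_predicate`
translation. The loop models four file scan tasks that each sample `current()`
once when they open, with the build side finishing partway through.

```rust
use std::sync::{Arc, RwLock};

/// Toy stand-in for `iceberg::expr::Predicate` (assumption, not the real type).
#[derive(Debug, Clone, PartialEq)]
enum Predicate {
    /// `column IN (values)`
    In(String, Vec<i64>),
}

/// Same shape as the `DynamicPredicate` trait proposed in this issue.
trait DynamicPredicate: Send + Sync {
    fn current(&self) -> Option<Predicate>;
}

/// Hypothetical adapter: the join build side publishes its key set into a
/// shared slot when it finishes; until then `current()` returns `None`.
struct BuildSideKeys {
    column: String,
    slot: RwLock<Option<Vec<i64>>>,
}

impl BuildSideKeys {
    fn new(column: &str) -> Self {
        Self {
            column: column.to_string(),
            slot: RwLock::new(None),
        }
    }

    /// Called by the build side once its hash table is complete.
    fn publish(&self, keys: Vec<i64>) {
        *self.slot.write().unwrap() = Some(keys);
    }
}

impl DynamicPredicate for BuildSideKeys {
    fn current(&self) -> Option<Predicate> {
        // Snapshot the slot; an unpublished slot yields `None`.
        let keys = (*self.slot.read().unwrap()).clone();
        keys.map(|k| Predicate::In(self.column.clone(), k))
    }
}

fn main() {
    let keys = Arc::new(BuildSideKeys::new("l_partkey"));
    let dp: Arc<dyn DynamicPredicate> = keys.clone();

    // Four file scan tasks open in order; each samples `current()` once.
    for task in 0..4 {
        if task == 2 {
            // The build side completes while the probe scan is in flight.
            keys.publish(vec![150, 160]);
        }
        let sampled = dp.current();
        println!("task {task}: {sampled:?}");
    }
}
```

Tasks 0 and 1 open before the build side publishes and see `None`; tasks 2 and
3 see the `l_partkey IN (150, 160)` predicate. Tasks opened after the build
side completes get the pruning, and earlier tasks are no worse off than today.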
### Reference implementations elsewhere
| Implementation | Sample point | What it prunes |
|---|---|---|
| **Trino** (`IcebergPageSourceProvider#createPageSource`) | Once per split, via `DynamicFilter.getCurrentPredicate()` | File level, row groups, Parquet bloom filters |
| **Spark** (`SupportsRuntimeV2Filtering#filter(Predicate[])`) | Once per partition task; can re-plan | Partition tasks |
| **iceberg-rust** today | n/a | Static predicates only |
The Trino model fits iceberg-rust's reader cleanly because the reader
already has the static-predicate plumbing.
### Willingness to contribute
Yes. I have a working prototype on a downstream fork that overrides
`gather_filters_for_pushdown` / `handle_child_pushdown_result` on
`IcebergTableScan` and applies runtime filters per batch (post-decode). It
gives a measurable win on TPC-H (-21% at SF1, -9.4% at SF10), but it cannot
reach scan-time pruning because iceberg-rust's `TableScan` API is single-shot.
The natural follow-up is the `DynamicPredicate` API above; I'm happy to draft
a PR if there's interest.
Related issue: #2363 (static `Inexact` -> `Exact` pushdown) — addresses a
different bug but cleans up the same pushdown surface.
--
This is an automated message from the Apache Git Service.