adriangb opened a new pull request, #22384:
URL: https://github.com/apache/datafusion/pull/22384
## Which issue does this PR close?
- Closes #.
## Rationale for this change
The Parquet scan today gives the predicate to the source for row-group / page
/ bloom pruning, but only applies it row-level via `RowFilter` when
`pushdown_filters=true`. With pushdown off, a `FilterExec` is left above the
scan to do row-level filtering. Moving that row-level filtering into the scan
is the substrate the adaptive-filter work (#22237 / #22144) builds on, but the
current split also has a real **correctness bug on `main`** that's worth
fixing on its own.
`build_row_filter` (`row_filter.rs:994-1083`, see its own doc comment at
`1009-1014`) silently drops conjuncts that `FilterCandidateBuilder::build`
returns `Ok(None)` for, and `RowFilterGenerator::build` swallows whole-build
errors. By the time `build_row_filter` runs,
`ParquetSource::try_pushdown_filters`
has already accepted the filter and the parent `FilterExec` has been removed
— so those dropped conjuncts are never applied anywhere and the query
returns wrong results. The most reproducible trigger is the per-file expr
adapter rewriting a predicate that was pushable at *table schema* time into
something `PushdownChecker` rejects at *physical file schema* time (schema
evolution / coercion, whole-struct refs introduced by the rewrite, etc.).
This PR makes the Parquet scan *always* own its pushable filters and
guarantees every accepted conjunct is applied — either by the parquet
`RowFilter` or by a new in-scan **post-scan filter** evaluated on decoded
batches (the in-scan equivalent of a `FilterExec`). Nothing is silently
dropped.
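As a rough illustration of that guarantee, here is a minimal sketch of the placement invariant: every accepted conjunct lands in exactly one of two lists. `Conjunct`, `Placement`, and `place` are hypothetical stand-ins for illustration, not DataFusion types, and the `row_filterable` flag is an assumption standing in for whatever the per-file check decides.

```rust
/// Hypothetical stand-in for one conjunct of a predicate (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
pub struct Conjunct {
    pub expr: &'static str,
    /// Assumption: whether the per-file check would accept it for a row filter.
    pub row_filterable: bool,
}

/// Where each accepted conjunct ends up inside the scan.
#[derive(Debug, Default, PartialEq)]
pub struct Placement {
    pub row_filter: Vec<Conjunct>,
    pub post_scan: Vec<Conjunct>,
}

/// Place every conjunct in exactly one of the two lists.
/// `build_ok = false` models a whole-build error: nothing goes to the row
/// filter, and every conjunct falls back to the post-scan list.
pub fn place(conjuncts: Vec<Conjunct>, build_ok: bool) -> Placement {
    if !build_ok {
        return Placement {
            row_filter: Vec::new(),
            post_scan: conjuncts,
        };
    }
    let (row_filter, post_scan): (Vec<Conjunct>, Vec<Conjunct>) =
        conjuncts.into_iter().partition(|c| c.row_filterable);
    Placement { row_filter, post_scan }
}
```

The property worth checking is that `row_filter.len() + post_scan.len()` always equals the number of input conjuncts, whether or not the build succeeds.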
## What changes are included in this PR?
- **`row_filter.rs` — never drop conjuncts.** `build_row_filter` now returns
`Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>` — the second
element is the conjuncts it could not place. `RowFilterGenerator` exposes
them via `rejected_conjuncts()`; on whole-file build errors it routes
every conjunct through that list (no silent error swallowing).
- **`post_scan_filter.rs` (new module)** — encapsulates the projection
widening + rebasing + filter evaluation behind a small API:
- `PostScanFilter` — evaluates a predicate on decoded batches; SQL `WHERE`
semantics (`NULL` drops the row); records rows-pruned / matched / time.
- `DecoderProjection::build(projection, post_scan_conjuncts, schemas, …)`
— widens the decoder projection over (user projection ∪ post-scan
conjunct columns), rebases the projection and conjuncts onto the
decoder's stream schema, and returns the `ProjectionMask`, `Projector`,
`replace_schema` flag, and the rebased `PostScanFilter`. An empty conjunct
list reproduces the prior projection-only behaviour, so the opener routes
every file through this one call.
- **`ParquetSource::try_pushdown_filters`** — always returns the per-filter
`Yes`/`No` discriminant based on `can_expr_be_pushed_down_with_schemas`,
regardless of the `pushdown_filters` config. The flag still records whether
the `RowFilter` (vs. post-scan) path is used downstream.
- **`opener/mod.rs::build_stream`** — orchestrates: builds the
`RowFilterGenerator` only when `pushdown_filters=true`; computes
`post_scan_conjuncts` (rejected conjuncts when pushdown is on, full
split-conjunction of the predicate when off); calls
`DecoderProjection::build`;
routes the `LIMIT` to `remaining_limit` instead of a decoder limit whenever
the post-scan filter is present (a decoder-local limit combined with a
post-scan filter is unsafe: the decoder would stop after emitting `LIMIT`
rows, before the post-scan filter had accepted that many).
The prior inline `build_projection_read_plan` / `reassign_expr_columns` /
`make_projector` block is replaced by the single `DecoderProjection::build`
call — net simplification.
- **`push_decoder.rs`** — `PushDecoderStreamState` carries an
`Option<PostScanFilter>`; in the `DecodeResult::Data` arm it applies the
filter, skips empty batches, then enforces `remaining_limit` and projects.
`DecoderBuilderConfig` is fed `projection_mask: &ProjectionMask` directly
(no longer the full `ParquetReadPlan`).
- **`metrics.rs`** — new `post_scan_rows_pruned` / `post_scan_rows_matched`
counters and `post_scan_filter_eval_time` `Time`, mirroring the existing
`pushdown_rows_*` / `row_pushdown_eval_time` so `EXPLAIN ANALYZE` keeps
surfacing filter cost once the `FilterExec` is gone.
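The widening, rebasing, filtering, and limit ordering described above can be sketched on a toy model. This is not the code in `post_scan_filter.rs`: `Row`, `widen_projection`, `rebase`, and `post_scan` are hypothetical names, rows are plain vectors of nullable integers rather than Arrow batches, and placing user columns first in the widened projection is an assumption made so the final projection is a simple truncation.

```rust
/// A decoded row in this toy model: one nullable i64 per projected column.
pub type Row = Vec<Option<i64>>;

/// Union of the user projection and the filter's columns, user columns first,
/// without duplicates. Indices refer to the file schema.
pub fn widen_projection(user_cols: &[usize], filter_cols: &[usize]) -> Vec<usize> {
    let mut widened = user_cols.to_vec();
    for &c in filter_cols {
        if !widened.contains(&c) {
            widened.push(c);
        }
    }
    widened
}

/// Rebase a file-schema column index onto its position in the widened
/// (decoder stream) schema. Returns `None` if the column is not projected.
pub fn rebase(widened: &[usize], file_col: usize) -> Option<usize> {
    widened.iter().position(|&c| c == file_col)
}

/// Filter with SQL `WHERE` semantics (a NULL/`None` result drops the row),
/// then enforce the limit, then keep only the first `n_user_cols` columns.
pub fn post_scan<F>(
    rows: Vec<Row>,
    predicate: F,
    limit: Option<usize>,
    n_user_cols: usize,
) -> Vec<Row>
where
    F: Fn(&Row) -> Option<bool>,
{
    rows.into_iter()
        .filter(|r| predicate(r) == Some(true))
        .take(limit.unwrap_or(usize::MAX))
        .map(|mut r| {
            r.truncate(n_user_cols);
            r
        })
        .collect()
}
```

Applying the limit after the filter is the point of the ordering: if the input were truncated to `limit` rows first, any row the filter then rejects would leave the output short even though later rows match.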
The adaptive-filter machinery from #22237 / #22144 (`SelectivityTracker`,
`FilterId` tagging, per-conjunct pruning stats, `StrategySwap` mid-stream
swaps, `OptionalFilterPhysicalExpr`, custom `arrow-rs` branch, the three
`filter_pushdown_*` config knobs) is intentionally **not** included — this
PR is the standalone substrate they would build on.
## Are these changes tested?
Yes.
- Two new regression tests for the drop-on-floor bug:
- `build_row_filter_surfaces_rejected_struct_conjunct` (`row_filter.rs`)
asserts the new API contract directly — `build_row_filter` no longer
drops the rejected conjunct.
- `rejected_struct_conjunct_runs_post_scan_not_dropped` (`opener/mod.rs`)
is an end-to-end test: with `pushdown_filters=true` and a `s IS NOT NULL`
predicate over a struct column where row 1 is NULL, `main` returns 3
rows (conjunct silently dropped, predicate relaxed) and this PR returns
the correct 2.
- Existing parquet integration tests, opener unit tests, physical-optimizer
filter-pushdown tests, and sqllogictest all pass.
- A handful of in-tree tests that asserted the *old* "scan only does stats
pruning" behaviour (e.g. `a = 1` over data `[1, 2, 3]` should still
return 3 rows because the row group wasn't stats-pruned) are updated to
reflect the new behaviour — the scan now applies the predicate row-level
via the post-scan filter, so they return only the matching row.
- Parquet-related explain `.slt` files are regenerated (clickbench,
push_down_filter_parquet, projection_pushdown, parquet*, etc.) — the
`FilterExec` above parquet scans is gone from those plans. Spurious
whitespace-only churn from `--complete` was reverted.
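For readers who want the 3-versus-2 scenario in miniature, the sketch below models it without any Parquet machinery. It is a toy under stated assumptions: the struct column `s` is modeled as `Option<i32>`, and `Pred`, `scan_dropping_rejected`, `scan_with_post_scan`, and `s_is_not_null` are hypothetical helpers, not the functions exercised by the real tests.

```rust
/// A conjunct in this toy model: a plain function over the nullable `s` value.
pub type Pred = fn(&Option<i32>) -> bool;

/// Models the behaviour described for `main`: rejected conjuncts are dropped,
/// so only the row-filter conjuncts are ever evaluated.
pub fn scan_dropping_rejected(
    rows: &[Option<i32>],
    row_filter: &[Pred],
    _rejected: &[Pred],
) -> Vec<Option<i32>> {
    rows.iter()
        .copied()
        .filter(|r| row_filter.iter().all(|p| p(r)))
        .collect()
}

/// Models the fixed behaviour: rejected conjuncts run as a post-scan filter
/// on the rows that survived the row filter.
pub fn scan_with_post_scan(
    rows: &[Option<i32>],
    row_filter: &[Pred],
    rejected: &[Pred],
) -> Vec<Option<i32>> {
    rows.iter()
        .copied()
        .filter(|r| row_filter.iter().all(|p| p(r)))
        .filter(|r| rejected.iter().all(|p| p(r)))
        .collect()
}

/// Toy version of `s IS NOT NULL`.
pub fn s_is_not_null(row: &Option<i32>) -> bool {
    row.is_some()
}
```

With three rows where the middle one is NULL and `s IS NOT NULL` as the only (rejected) conjunct, the first function keeps all three rows and the second keeps two.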
## Are there any user-facing changes?
- **Bug fix**: predicates with non-row-filterable conjuncts (e.g.
whole-struct
references, certain schema-evolution edge cases) now return correct
results with `pushdown_filters=true`. Before this PR they were silently
relaxed.
- **Plan shape**: `FilterExec` no longer appears above a `DataSourceExec` for
pushable filters on a parquet source. The predicate appears as
`predicate=…`
on the `DataSourceExec`. Query results are unchanged.
- **Metrics**: three new metrics on `ParquetFileMetrics` —
`post_scan_rows_pruned`, `post_scan_rows_matched`,
`post_scan_filter_eval_time`
— appear in `EXPLAIN ANALYZE` output for parquet scans.
- **Public API**: `build_row_filter` return type changes from
`Result<Option<RowFilter>>` to
`Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>`; callers must
apply the rejected conjuncts (or they'll have the same drop-on-floor bug
this PR fixes).
---
Draft. Happy to split into a stack (refactor → row_filter fix → opener
orchestration + tests) if reviewers prefer.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]