adriangb opened a new pull request, #22384:
URL: https://github.com/apache/datafusion/pull/22384

   ## Which issue does this PR close?
   
   - Closes #.
   
   ## Rationale for this change
   
   The Parquet scan today gives the predicate to the source for row-group / page
   / bloom pruning, but only applies it row-level via `RowFilter` when
   `pushdown_filters=true`. With pushdown off, a `FilterExec` is left above the
   scan to do row-level filtering. This is the substrate the adaptive-filter
   work (#22237 / #22144) builds on, but it also has a real **correctness bug
   on `main`** that's worth fixing on its own.
   
   `build_row_filter` (`row_filter.rs:994-1083`, see its own doc comment at
   `1009-1014`) silently drops conjuncts that `FilterCandidateBuilder::build`
   returns `Ok(None)` for, and `RowFilterGenerator::build` swallows whole-build
   errors. By the time `build_row_filter` runs, 
`ParquetSource::try_pushdown_filters`
   has already accepted the filter and the parent `FilterExec` has been removed
   — so those dropped conjuncts are never applied anywhere and the query
   returns wrong results. The most reproducible trigger is the per-file expr
   adapter rewriting a predicate that was pushable at *table schema* time into
   something `PushdownChecker` rejects at *physical file schema* time (schema
   evolution / coercion, whole-struct refs introduced by the rewrite, etc.).
   
   This PR makes the Parquet scan *always* own its pushable filters and
   guarantees every accepted conjunct is applied — either by the parquet
   `RowFilter` or by a new in-scan **post-scan filter** evaluated on decoded
   batches (the in-scan equivalent of a `FilterExec`). Nothing is silently
   dropped.
   
   ## What changes are included in this PR?
   
   - **`row_filter.rs` — never drop conjuncts.** `build_row_filter` now returns
     `Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>` — the second
     element is the conjuncts it could not place. `RowFilterGenerator` exposes
     them via `rejected_conjuncts()`; on whole-file build errors it routes
     every conjunct through that list (no silent error swallowing).
   - **`post_scan_filter.rs` (new module)** — encapsulates the projection
     widening + rebasing + filter evaluation behind a small API:
     - `PostScanFilter` — evaluates a predicate on decoded batches; SQL `WHERE`
       semantics (`NULL` drops the row); records rows-pruned / matched / time.
     - `DecoderProjection::build(projection, post_scan_conjuncts, schemas, …)`
       — widens the decoder projection over (user projection ∪ post-scan
       conjunct columns), rebases the projection and conjuncts onto the
       decoder's stream schema, and returns the `ProjectionMask`, `Projector`,
       `replace_schema` flag, and the rebased `PostScanFilter`. Empty conjuncts
       list = the prior projection-only behaviour, so the opener routes every
       file through this one call.
   - **`ParquetSource::try_pushdown_filters`** — always returns the per-filter
     `Yes`/`No` discriminant based on `can_expr_be_pushed_down_with_schemas`,
     regardless of the `pushdown_filters` config. The flag still records whether
     the `RowFilter` (vs. post-scan) path is used downstream.
   - **`opener/mod.rs::build_stream`** — orchestrates: builds the
     `RowFilterGenerator` only when `pushdown_filters=true`; computes
     `post_scan_conjuncts` (rejected conjuncts when pushdown is on, full
     split-conjunction of the predicate when off); calls 
`DecoderProjection::build`;
     routes the `LIMIT` to `remaining_limit` instead of a decoder limit whenever
     the post-scan filter is present (decoder-local limit + post-scan filter is
     unsafe — the decoder would stop before the post-scan rejected enough rows).
     The prior inline `build_projection_read_plan` / `reassign_expr_columns` /
     `make_projector` block is replaced by the single `DecoderProjection::build`
     call — net simplification.
   - **`push_decoder.rs`** — `PushDecoderStreamState` carries an
     `Option<PostScanFilter>`; in the `DecodeResult::Data` arm it applies the
     filter, skips empty batches, then enforces `remaining_limit` and projects.
     `DecoderBuilderConfig` is fed `projection_mask: &ProjectionMask` directly
     (no longer the full `ParquetReadPlan`).
   - **`metrics.rs`** — new `post_scan_rows_pruned` / `post_scan_rows_matched`
     counters and `post_scan_filter_eval_time` `Time`, mirroring the existing
     `pushdown_rows_*` / `row_pushdown_eval_time` so `EXPLAIN ANALYZE` keeps
     surfacing filter cost once the `FilterExec` is gone.
   
   The adaptive-filter machinery from #22237 / #22144 (`SelectivityTracker`,
   `FilterId` tagging, per-conjunct pruning stats, `StrategySwap` mid-stream
   swaps, `OptionalFilterPhysicalExpr`, custom `arrow-rs` branch, the three
   `filter_pushdown_*` config knobs) is intentionally **not** included — this
   PR is the standalone substrate they would build on.
   
   ## Are these changes tested?
   
   Yes.
   
   - Two new regression tests for the drop-on-floor bug:
     - `build_row_filter_surfaces_rejected_struct_conjunct` (`row_filter.rs`)
       asserts the new API contract directly — `build_row_filter` no longer
       drops the rejected conjunct.
     - `rejected_struct_conjunct_runs_post_scan_not_dropped` (`opener/mod.rs`)
       is an end-to-end test: with `pushdown_filters=true` and a `s IS NOT NULL`
       predicate over a struct column where row 1 is NULL, `main` returns 3
       rows (conjunct silently dropped, predicate relaxed) and this PR returns
       the correct 2.
   - Existing parquet integration tests, opener unit tests, physical-optimizer
     filter-pushdown tests, and sqllogictest all pass.
   - A handful of in-tree tests that asserted the *old* "scan only does stats
     pruning" behaviour (e.g. \`a = 1\` over data \`[1, 2, 3]\` should still
     return 3 rows because the row group wasn't stats-pruned) are updated to
     reflect the new behaviour — the scan now applies the predicate row-level
     via the post-scan filter, so they return only the matching row.
   - Parquet-related explain `.slt` files are regenerated (clickbench,
     push_down_filter_parquet, projection_pushdown, parquet*, etc.) — the
     `FilterExec` above parquet scans is gone from those plans. Spurious
     whitespace-only churn from `--complete` was reverted.
   
   ## Are there any user-facing changes?
   
   - **Bug fix**: predicates with non-row-filterable conjuncts (e.g. 
whole-struct
     references, certain schema-evolution edge cases) now return correct
     results with `pushdown_filters=true`. Before this PR they were silently
     relaxed.
   - **Plan shape**: `FilterExec` no longer appears above a `DataSourceExec` for
     pushable filters on a parquet source. The predicate appears as 
`predicate=…`
     on the `DataSourceExec`. Query results are unchanged.
   - **Metrics**: three new metrics on `ParquetFileMetrics` —
     `post_scan_rows_pruned`, `post_scan_rows_matched`, 
`post_scan_filter_eval_time`
     — appear in `EXPLAIN ANALYZE` output for parquet scans.
   - **Public API**: `build_row_filter` return type changes from
     `Result<Option<RowFilter>>` to
     `Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>`; callers must
     apply the rejected conjuncts (or they'll have the same drop-on-floor bug
     this PR fixes).
   
   ---
   
   Draft. Happy to split into a stack (refactor → row_filter fix → opener
   orchestration + tests) if reviewers prefer.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to