adriangb opened a new pull request, #22156:
URL: https://github.com/apache/datafusion/pull/22156
## Which issue does this PR close?
Relates to the discussion in #22024 about the Parquet opener becoming a mini
planner. Does not close that issue — this is the first in a sequence of
refactors.
## Rationale for this change
The Parquet opener is on a path to becoming a tangled state machine: today
it already has a fixed sequence of pruning passes (file-range, row-group
statistics, bloom filter, limit, page index), and proposed work (#22024
sampling, #21637 fully-matched-row-groups, #22144 adaptive filter pushdown) is
poised to add several more — each one editing the opener inline.
Two cheap moves that buy a lot:
1. **Split `opener.rs` into a module.** The file is ~2,700 LOC on main, and
several pieces in it (`EarlyStoppingStream`, `EncryptionContext`) have nothing
to do with the state machine — they're self-contained types sharing a file with
their caller. Moving them to siblings under `opener/` drops the cognitive cost
of reading the state machine without changing any public API or behavior. This
is pure code motion.
2. **Add a `ParquetAccessPlanOptimizer` trait** that lets external crates
plug additional pruning passes into the opener pipeline. This is the seam
@alamb suggested in #22024: sampling, custom statistics, user-defined Parquet
indexes, etc. can ship outside the parquet-datasource crate without forking the
opener.
These are the minimum changes that:
- isolate code that doesn't belong in `opener.rs`, and
- expose a stable extension point so future pruning strategies don't all
need opener edits.
## What changes are included in this PR?
**Module split:**
- `opener.rs` → `opener/mod.rs`
- New `opener/early_stop.rs` — `EarlyStoppingStream` (~100 LOC,
self-contained, used only by `build_stream`).
- New `opener/encryption.rs` — `EncryptionContext` and the
`ParquetMorselizer::get_encryption_context` helpers. Isolates the
`#[cfg(feature = "parquet_encryption")]` gating that previously bled through
the main file.
**Extension trait** (new `access_plan_optimizer.rs`):
- `ParquetAccessPlanOptimizer` trait — `fn stage()` returns an
`OptimizerStage`; `fn optimize(&self, ctx, plan) -> Result<ParquetAccessPlan>`
refines a `ParquetAccessPlan`.
- `OptimizerStage` enum: `AfterMetadata`, `AfterBloomFilters`,
`BeforeBuildStream`. Each stage runs after the built-in pruning pass at the
corresponding point in the file-open state machine.
- `AccessPlanContext<'a>` — read-only context the optimizer sees: partition
file, predicate, pruning predicates, parquet metadata, schemas, limit, metrics,
current stage.
- `ParquetSource::with_access_plan_optimizer(Arc<dyn
ParquetAccessPlanOptimizer>)` to register an optimizer. Multiple can be
registered; they run in registration order at their declared stage.
**State machine integration:**
- `prune_row_groups` (AfterMetadata) and `prune_bloom_filters`
(AfterBloomFilters) run user optimizers via a `run_user_optimizers_on_rgapf`
helper that round-trips through `ParquetAccessPlan` (no-op when no user
optimizers are registered).
- `build_stream` (BeforeBuildStream) invokes user optimizers directly on the
`ParquetAccessPlan` after limit and page-index pruning.
### Deliberately not in this PR
- **Migrating built-in pruning passes to the trait.** The built-ins share an
`is_fully_matched: Vec<bool>` side-state (written by `prune_by_statistics`,
read by `prune_by_limit`) that the trait surface doesn't model. Migrating them
now would either require plumbing that state through the trait or losing the
limit-via-fully-matched optimization. Tracking as a follow-up.
- **Bloom filters in the public context.** `BloomFilterStatistics` is
currently `pub(crate)`. The context exposes everything else for
`AfterBloomFilters`-stage optimizers but not the bloom-filter slice itself.
Easy to add if a concrete external use case appears.
- **PR #22024 sampling**, **PR #21637 fully-matched**, **PR #22144 adaptive
pushdown** — those land on this seam separately.
## Are these changes tested?
Yes:
- All 100 existing `datafusion-datasource-parquet` unit tests pass.
- New `test_user_access_plan_optimizer_runs_before_build_stream` test:
writes a 100-row parquet file as two row groups of 50 rows, registers an
optimizer that skips row group 0, asserts the scan returns 50 rows (vs. 100
without the optimizer). Validates that the registration → state-machine →
reader path actually honors the user-supplied plan.
- `cargo fmt --all`, `./dev/rust_lint.sh`, and `cargo clippy -p
datafusion-datasource-parquet --all-targets --all-features -- -D warnings` all
pass.
- Downstream `datafusion` core builds clean.
## Are there any user-facing changes?
**New public Rust API:**
- `datafusion_datasource_parquet::access_plan_optimizer` module
(re-exported at the crate root):
  - `ParquetAccessPlanOptimizer` trait
  - `AccessPlanContext` struct
  - `OptimizerStage` enum
- `ParquetSource::with_access_plan_optimizer(Arc<dyn
ParquetAccessPlanOptimizer>) -> Self`
- `ParquetSource::access_plan_optimizers() -> &[Arc<dyn
ParquetAccessPlanOptimizer>]`
No existing API changed. No breaking changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]