adriangb opened a new pull request, #22160:
URL: https://github.com/apache/datafusion/pull/22160
## Which issue does this PR close?
Relates to the discussion in #22024 about how to extend the Parquet opener
with additional pruning strategies (sampling, custom indexes, etc.) without
forking it. Does not close that issue — this is the smallest standalone PR that
delivers an extension point.
## Rationale for this change
DataFusion's Parquet opener runs a fixed pipeline of built-in pruning passes
(file range, row-group statistics, bloom filter, limit, page index). Proposed
work in #22024 (sampling), #21637 (fully-matched row groups), and #22144
(adaptive filter pushdown) all want to *add* passes — and today each one
requires editing the opener inline.
This PR adds two extension points that let those passes (and external ones —
user-defined Parquet indexes, custom statistics) live outside the opener:
- `PostMetadataAccessPlanHook`: runs after the built-in file-range and
row-group-statistics passes have refined the plan; bloom filters have not been
loaded yet.
- `PreBuildStreamAccessPlanHook`: runs after all built-in pruning passes,
just before the reader stream is constructed.
A hook may need to do *multiple* steps of CPU and I/O work — for example,
fetch an external index (I/O), then apply pruning using the fetched data (CPU).
To preserve the opener's CPU/I/O routing, each hook is itself a small state
machine driven by the opener.
## What changes are included in this PR?
### New trait shape
```rust
pub trait PostMetadataAccessPlanHook: Debug + Send + Sync {
    fn begin(&self) -> Box<dyn PostMetadataHookInstance>;
}

pub trait PostMetadataHookInstance: Debug + Send {
    fn step(self: Box<Self>, ctx: Box<PostMetadataContext>)
        -> Result<PostMetadataHookStep>;
}

pub enum PostMetadataHookStep {
    /// Hook needs I/O. Caller awaits the future on the I/O pool;
    /// future returns the updated context + next instance.
    Yield(PostMetadataHookYieldFuture),
    /// Hook is finished.
    Done(Box<PostMetadataContext>),
}
```
(Equivalent shapes exist for the `PreBuildStream` stage.)
Each \"state\" of a hook is its own type implementing the instance trait.
\`step\` consumes the current instance and returns the next one — so loaded
data (e.g. an external index) becomes a field on the next instance type, with
no scratchpad on the context.
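To make the pattern concrete, here is a hedged sketch of a two-state hook built
against the traits above. The helper functions (`fetch_index_bytes`,
`apply_index_pruning`), the `ctx.access_plan` field, and the assumption that
`PostMetadataHookYieldFuture` is a boxed future resolving to an updated
`(context, next instance)` pair are illustrative, not part of this PR's API:
```rust
/// Hypothetical hook: fetch an external index (I/O), then prune with it (CPU).
#[derive(Debug)]
struct ExternalIndexHook {
    index_location: String,
}

impl PostMetadataAccessPlanHook for ExternalIndexHook {
    fn begin(&self) -> Box<dyn PostMetadataHookInstance> {
        // State 1: the index still needs to be fetched.
        Box::new(FetchIndex { index_location: self.index_location.clone() })
    }
}

/// State 1: needs I/O to load the external index.
#[derive(Debug)]
struct FetchIndex {
    index_location: String,
}

impl PostMetadataHookInstance for FetchIndex {
    fn step(self: Box<Self>, ctx: Box<PostMetadataContext>) -> Result<PostMetadataHookStep> {
        // Yield a future; the opener awaits it on the I/O pool.
        Ok(PostMetadataHookStep::Yield(Box::pin(async move {
            let index = fetch_index_bytes(&self.index_location).await?; // hypothetical I/O
            // The loaded index becomes a field on the *next* state type.
            let next: Box<dyn PostMetadataHookInstance> = Box::new(PruneWithIndex { index });
            Ok((ctx, next))
        })))
    }
}

/// State 2: pure CPU work using the fetched index.
#[derive(Debug)]
struct PruneWithIndex {
    index: bytes::Bytes,
}

impl PostMetadataHookInstance for PruneWithIndex {
    fn step(self: Box<Self>, mut ctx: Box<PostMetadataContext>) -> Result<PostMetadataHookStep> {
        // Narrow the access plan using the index (field name assumed).
        apply_index_pruning(&self.index, &mut ctx.access_plan)?; // hypothetical CPU pruning
        Ok(PostMetadataHookStep::Done(ctx))
    }
}
```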
### State machine integration
The opener's `ParquetOpenState` gains four new variants (`Cpu` and
`Io` per stage) that route hook steps to the appropriate pool:
```text
... → PruneWithStatistics
  → RunPostMetadataHooksCpu ⇄ RunPostMetadataHooksIo
  → LoadBloomFilters → PruneWithBloomFilters
  → FinalizeAccessPlan (built-in limit + page-index pruning)
  → RunPreBuildStreamHooksCpu ⇄ RunPreBuildStreamHooksIo
  → BuildStream
```
`build_stream`'s prior inline limit + page-index pruning moves to a
dedicated `FinalizeAccessPlan` CPU step so the `PreBuildStream` hooks run
*after* built-in pruning. When no hooks are registered for a stage, the opener
skips the new states entirely, so the no-hook case has zero allocation overhead.
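As a rough illustration of what the Cpu ⇄ Io routing does for one hook (the
real opener drives this through the new `ParquetOpenState` variants and the
I/O pool rather than a plain `await`; the yield future's payload is assumed
here to be a `(context, next instance)` pair):
```rust
/// Simplified sketch: drive one post-metadata hook to completion, alternating
/// CPU steps with awaited I/O futures.
async fn drive_post_metadata_hook(
    hook: &dyn PostMetadataAccessPlanHook,
    mut ctx: Box<PostMetadataContext>,
) -> Result<Box<PostMetadataContext>> {
    let mut instance = hook.begin();
    loop {
        match instance.step(ctx)? {
            // "RunPostMetadataHooksIo": await the hook's I/O, then loop back to
            // the CPU state with the updated context and next instance.
            PostMetadataHookStep::Yield(fut) => {
                let (next_ctx, next_instance) = fut.await?;
                ctx = next_ctx;
                instance = next_instance;
            }
            // Hook finished; hand back the (possibly narrowed) context.
            PostMetadataHookStep::Done(done_ctx) => return Ok(done_ctx),
        }
    }
}
```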
### `SharedAsyncFileReader`
`AsyncFileReader::get_bytes` takes `&mut self`, so a raw
`Box<dyn AsyncFileReader>` can't be shared between consumers. A new
`SharedAsyncFileReader` wraps the boxed reader in
`Arc<tokio::sync::Mutex<...>>` and reimplements `AsyncFileReader`. Cloning
the wrapper bumps the `Arc` refcount; hook I/O futures get warm state (footer
cache, byte-coalescing buffers in custom readers) for free instead of paying
for `factory.create_reader(...)` per hook. The mutex never contends in
practice because reads are sequential.

The wrapper replaces `Box<dyn AsyncFileReader>` everywhere it was
previously used in `PreparedParquetOpen` and `PushDecoderStreamState`.
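The sharing pattern itself is straightforward. A minimal, self-contained sketch
using a cut-down stand-in trait (rather than parquet's real `AsyncFileReader`,
whose full set of methods is omitted here):
```rust
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use tokio::sync::Mutex;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// Stand-in for the reader trait: note the `&mut self` receiver that prevents
/// sharing a plain `Box<dyn ...>` between consumers.
trait FileReader: Send {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}

/// Cheaply cloneable wrapper: clones share one underlying reader (and its warm
/// state) behind an async mutex, so the opener and hook I/O futures can both
/// issue reads without creating a new reader per consumer.
#[derive(Clone)]
struct SharedReader {
    inner: Arc<Mutex<Box<dyn FileReader>>>,
}

impl SharedReader {
    fn new(reader: Box<dyn FileReader>) -> Self {
        Self { inner: Arc::new(Mutex::new(reader)) }
    }

    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        // Serialize access through the mutex; reads are issued sequentially in
        // the opener, so contention is not expected.
        let mut guard = self.inner.lock().await;
        guard.get_bytes(range).await
    }
}
```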
### Registration surface
```rust
ParquetSource::new(schema)
    .with_post_metadata_access_plan_hook(Arc::new(MyExternalIndexHook { ... }))
    .with_pre_build_stream_access_plan_hook(Arc::new(MySampler { ... }))
```
Multiple hooks per stage are supported; they run in registration order.
### Deliberately not in this PR
- **Migrating built-in passes to hooks.** Bloom-filter loading + pruning,
page-index pruning, and limit pruning all stay in the opener for now. These
migrations are mechanical follow-ups that can be reviewed individually for
semantics preservation.
- **Per-conjunct pruning rates / selectivity tracker integration.** That's a
separate API question and not needed for the extension point itself.
## Are these changes tested?
Yes:
- All 100 existing `datafusion-datasource-parquet` unit tests pass.
- New `test_post_metadata_hook_multi_step` exercises a multi-step user
hook end-to-end:
  - CPU step inspects context and yields a future.
  - I/O step calls `ctx.async_file_reader.clone().get_bytes(..)` (proving
    the shared-reader plumbing works against a real parquet file in object store).
  - CPU step narrows the access plan.
  - Test asserts the reader skips the corresponding row group (50 rows out
    of 100 across 2 row groups).
- `cargo fmt --all`, `./dev/rust_lint.sh`, and `cargo clippy -p
datafusion-datasource-parquet --all-targets --all-features -- -D warnings` all
pass.
- Downstream `datafusion` core builds clean.
## Are there any user-facing changes?
**New public Rust API**, all in `datafusion_datasource_parquet`
(re-exported at the crate root):
- `PostMetadataAccessPlanHook` + `PostMetadataHookInstance` +
  `PostMetadataHookStep` + `PostMetadataHookYieldFuture` +
  `PostMetadataContext`
- `PreBuildStreamAccessPlanHook` + `PreBuildStreamHookInstance` +
  `PreBuildStreamHookStep` + `PreBuildStreamHookYieldFuture` +
  `PreBuildStreamContext`
- `SharedAsyncFileReader` (in the `reader` module)
- `ParquetSource::with_post_metadata_access_plan_hook`,
  `with_pre_build_stream_access_plan_hook`
- `ParquetSource::post_metadata_access_plan_hooks`,
  `pre_build_stream_access_plan_hooks`
No existing API changed. No breaking changes.