adriangb opened a new pull request, #22160:
URL: https://github.com/apache/datafusion/pull/22160

   ## Which issue does this PR close?
   
   Relates to the discussion in #22024 about how to extend the Parquet opener 
with additional pruning strategies (sampling, custom indexes, etc.) without 
forking it. Does not close that issue — this is the smallest standalone PR that 
delivers an extension point.
   
   ## Rationale for this change
   
   DataFusion's Parquet opener runs a fixed pipeline of built-in pruning passes 
(file range, row-group statistics, bloom filter, limit, page index). Proposed 
work in #22024 (sampling), #21637 (fully-matched row groups), and #22144 
(adaptive filter pushdown) all want to *add* passes — and today each one 
requires editing the opener inline.
   
   This PR adds two extension points that let those passes (and external ones — 
user-defined Parquet indexes, custom statistics) live outside the opener:
   
   - `PostMetadataAccessPlanHook`: runs after the built-in file-range and 
row-group-statistics passes have refined the plan. Bloom filters have not been 
loaded yet.
   - `PreBuildStreamAccessPlanHook`: runs after all built-in pruning passes, 
just before the reader stream is constructed.
   
   A hook may need to do *multiple* steps of CPU and I/O work — for example, 
fetch an external index (I/O), then apply pruning using the fetched data (CPU). 
To preserve the opener's CPU/I/O routing, each hook is itself a small state 
machine driven by the opener.
   
   ## What changes are included in this PR?
   
   ### New trait shape
   
   ```rust
   pub trait PostMetadataAccessPlanHook: Debug + Send + Sync {
       fn begin(&self) -> Box<dyn PostMetadataHookInstance>;
   }
   
   pub trait PostMetadataHookInstance: Debug + Send {
       fn step(self: Box<Self>, ctx: Box<PostMetadataContext>)
           -> Result<PostMetadataHookStep>;
   }
   
   pub enum PostMetadataHookStep {
       /// Hook needs I/O. Caller awaits the future on the I/O pool;
       /// future returns the updated context + next instance.
       Yield(PostMetadataHookYieldFuture),
       /// Hook is finished.
       Done(Box<PostMetadataContext>),
   }
   ```
   
   (Equivalent shapes for the `PreBuildStream` stage.)
   
   Each "state" of a hook is its own type implementing the instance trait. 
`step` consumes the current instance; when it yields, the future resolves to 
the updated context plus the next instance, so loaded data (e.g. an external 
index) becomes a field on the next instance type, with no scratchpad on the 
context.
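
To make the "one type per state" idea concrete, here is a minimal std-only sketch. It is not the PR's code: `Context`, `YieldJob`, `HookStep`, `HookInstance`, `NeedIndex`, `HaveIndex`, and `drive` are hypothetical stand-ins, the error type is a plain `String`, and the yield future is modelled as a synchronous closure so the example runs without an async runtime.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for `PostMetadataContext`: the row groups still selected.
#[derive(Debug)]
pub struct Context {
    pub selected_row_groups: Vec<usize>,
}

/// Stand-in for the yield future (assumption: a closure instead of a real future).
/// It hands back the context together with the next instance.
pub type YieldJob = Box<dyn FnOnce() -> (Box<Context>, Box<dyn HookInstance>) + Send>;

pub enum HookStep {
    Yield(YieldJob),
    Done(Box<Context>),
}

pub trait HookInstance: Debug + Send {
    fn step(self: Box<Self>, ctx: Box<Context>) -> Result<HookStep, String>;
}

/// State 1: nothing loaded yet, so the hook asks for I/O.
#[derive(Debug)]
pub struct NeedIndex;

/// State 2: the fetched "index" lives in a field of the state itself.
#[derive(Debug)]
pub struct HaveIndex {
    keep: Vec<usize>,
}

impl HookInstance for NeedIndex {
    fn step(self: Box<Self>, ctx: Box<Context>) -> Result<HookStep, String> {
        Ok(HookStep::Yield(Box::new(move || {
            // Pretend an external index was fetched here; it says "keep row group 1".
            let next: Box<dyn HookInstance> = Box::new(HaveIndex { keep: vec![1] });
            (ctx, next)
        })))
    }
}

impl HookInstance for HaveIndex {
    fn step(self: Box<Self>, mut ctx: Box<Context>) -> Result<HookStep, String> {
        // CPU-only work: narrow the selection using the data carried by this state.
        ctx.selected_row_groups.retain(|rg| self.keep.contains(rg));
        Ok(HookStep::Done(ctx))
    }
}

/// Toy driver: run `step` until the hook reports `Done`.
pub fn drive(
    mut instance: Box<dyn HookInstance>,
    mut ctx: Box<Context>,
) -> Result<Box<Context>, String> {
    loop {
        match instance.step(ctx)? {
            HookStep::Yield(job) => {
                let (returned_ctx, next) = job();
                ctx = returned_ctx;
                instance = next;
            }
            HookStep::Done(done) => return Ok(done),
        }
    }
}

fn main() {
    let ctx = Box::new(Context { selected_row_groups: vec![0, 1] });
    let out = drive(Box::new(NeedIndex), ctx).expect("hook failed");
    println!("{:?}", out.selected_row_groups);
}
```

Because `HaveIndex` can only be constructed by the I/O step, the pruning code cannot run before the index exists, which is the property the type-per-state design is after.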
   
   ### State machine integration
   
   The opener's `ParquetOpenState` gains four new variants (`Cpu` and 
`Io` per stage) that route hook steps to the appropriate pool:
   
   ```text
   ... → PruneWithStatistics
       → RunPostMetadataHooksCpu  ⇄  RunPostMetadataHooksIo
       → LoadBloomFilters → PruneWithBloomFilters
       → FinalizeAccessPlan  (built-in limit + page-index pruning)
       → RunPreBuildStreamHooksCpu  ⇄  RunPreBuildStreamHooksIo
       → BuildStream
   ```
   
   `build_stream`'s prior inline limit + page-index pruning moves to a 
dedicated `FinalizeAccessPlan` CPU step so the `PreBuildStream` hooks run 
*after* built-in pruning. When no hooks are registered for a stage, the opener 
skips the new states entirely, so the no-hook case pays zero allocation overhead.
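
The routing described above can be pictured with a small trace generator. This is only an illustrative sketch: the variant names mirror the diagram, but the `State` enum, `run_stage`, and `trace` are hypothetical, and the "number of I/O yields" parameter is an assumption used to drive the toy model rather than anything the opener exposes.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    PruneWithStatistics,
    RunPostMetadataHooksCpu,
    RunPostMetadataHooksIo,
    LoadBloomFilters,
    PruneWithBloomFilters,
    FinalizeAccessPlan,
    RunPreBuildStreamHooksCpu,
    RunPreBuildStreamHooksIo,
    BuildStream,
}

/// Record the states visited by one hook stage.
/// `yields` is `None` when no hooks are registered (the stage is skipped),
/// otherwise the number of times the hooks ask for I/O before finishing.
fn run_stage(trace: &mut Vec<State>, cpu: State, io: State, yields: Option<usize>) {
    if let Some(n) = yields {
        trace.push(cpu);
        for _ in 0..n {
            // Each yield bounces to the I/O state and back to the CPU state.
            trace.push(io);
            trace.push(cpu);
        }
    }
}

/// List the states visited from `PruneWithStatistics` through `BuildStream`.
pub fn trace(post_metadata: Option<usize>, pre_build_stream: Option<usize>) -> Vec<State> {
    use State::*;
    let mut visited = vec![PruneWithStatistics];
    run_stage(&mut visited, RunPostMetadataHooksCpu, RunPostMetadataHooksIo, post_metadata);
    visited.extend([LoadBloomFilters, PruneWithBloomFilters, FinalizeAccessPlan]);
    run_stage(&mut visited, RunPreBuildStreamHooksCpu, RunPreBuildStreamHooksIo, pre_build_stream);
    visited.push(BuildStream);
    visited
}

fn main() {
    // No hooks registered: none of the four new states appear.
    println!("{:?}", trace(None, None));
    // One post-metadata hook that yields once for I/O.
    println!("{:?}", trace(Some(1), None));
}
```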
   
   ### `SharedAsyncFileReader`
   
   `AsyncFileReader::get_bytes` takes `&mut self`, so a raw 
`Box<dyn AsyncFileReader>` can't be shared between consumers. A new 
`SharedAsyncFileReader` wraps the boxed reader in 
`Arc<tokio::sync::Mutex<...>>` and reimplements `AsyncFileReader`. Cloning 
the wrapper bumps the `Arc` refcount; hook I/O futures get warm state (footer 
cache, byte-coalescing buffers in custom readers) for free instead of paying 
for `factory.create_reader(...)` per hook. The `Mutex` never contends in 
practice because reads are sequential.
   
   The wrapper replaces `Box<dyn AsyncFileReader>` everywhere it was 
previously used in `PreparedParquetOpen` and `PushDecoderStreamState`.
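
The wrapper pattern can be sketched without the real crates. The following is an assumption-laden model, not the PR's implementation: `FileReader` is a hypothetical synchronous stand-in for `AsyncFileReader`, `std::sync::Mutex` replaces `tokio::sync::Mutex`, and `CountingReader`, `SharedReader`, and `read_via_two_handles` are invented for illustration.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous stand-in for `AsyncFileReader`: reads need `&mut self`.
pub trait FileReader: Send {
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8>;
}

/// In-memory reader that counts how often the underlying reader is hit.
pub struct CountingReader {
    data: Vec<u8>,
    reads: Arc<AtomicUsize>,
}

impl FileReader for CountingReader {
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8> {
        self.reads.fetch_add(1, Ordering::SeqCst);
        self.data[range].to_vec()
    }
}

/// Cloneable handle: every clone points at the same boxed reader.
#[derive(Clone)]
pub struct SharedReader {
    inner: Arc<Mutex<Box<dyn FileReader>>>,
}

impl SharedReader {
    pub fn new(reader: Box<dyn FileReader>) -> Self {
        Self { inner: Arc::new(Mutex::new(reader)) }
    }
}

impl FileReader for SharedReader {
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8> {
        // Lock, then forward to the single underlying reader.
        let mut guard = self.inner.lock().expect("reader mutex poisoned");
        guard.get_bytes(range)
    }
}

/// Read the two halves of `data` through two clones of one shared reader.
/// Returns both halves plus the number of reads seen by the underlying reader.
pub fn read_via_two_handles(data: Vec<u8>) -> (Vec<u8>, Vec<u8>, usize) {
    let reads = Arc::new(AtomicUsize::new(0));
    let (len, mid) = (data.len(), data.len() / 2);
    let mut first = SharedReader::new(Box::new(CountingReader {
        data,
        reads: Arc::clone(&reads),
    }));
    let mut second = first.clone(); // bumps the refcount, no new reader is created
    let head = first.get_bytes(0..mid);
    let tail = second.get_bytes(mid..len);
    (head, tail, reads.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", read_via_two_handles(vec![1, 2, 3, 4]));
}
```

Both handles hit the same `CountingReader`, so any state it accumulates (a cache, in the real case) is visible to every clone.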
   
   ### Registration surface
   
   ```rust
   ParquetSource::new(schema)
       .with_post_metadata_access_plan_hook(Arc::new(MyExternalIndexHook { ... }))
       .with_pre_build_stream_access_plan_hook(Arc::new(MySampler { ... }))
   ```
   
   Multiple hooks per stage are supported; they run in registration order.
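
As a rough illustration of "registration order", a builder that appends each hook to a `Vec` gives that ordering for free. This sketch does not use the real `ParquetSource`; `AccessPlanHook`, `NamedHook`, `Source`, and their methods are hypothetical names chosen for the example.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical, simplified hook trait; the real traits expose `begin`, not `name`.
pub trait AccessPlanHook: Debug + Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Debug)]
pub struct NamedHook(pub &'static str);

impl AccessPlanHook for NamedHook {
    fn name(&self) -> &str {
        self.0
    }
}

/// Hypothetical stand-in for the source that owns the registered hooks.
#[derive(Debug, Default, Clone)]
pub struct Source {
    post_metadata_hooks: Vec<Arc<dyn AccessPlanHook>>,
}

impl Source {
    /// Builder-style registration: each call appends to the end of the list.
    pub fn with_post_metadata_hook(mut self, hook: Arc<dyn AccessPlanHook>) -> Self {
        self.post_metadata_hooks.push(hook);
        self
    }

    /// Order in which the hooks would run: simply the order of registration.
    pub fn run_order(&self) -> Vec<&str> {
        self.post_metadata_hooks.iter().map(|h| h.name()).collect()
    }
}

fn main() {
    let source = Source::default()
        .with_post_metadata_hook(Arc::new(NamedHook("external_index")))
        .with_post_metadata_hook(Arc::new(NamedHook("sampler")));
    println!("{:?}", source.run_order());
}
```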
   
   ### Deliberately not in this PR
   
   - **Migrating built-in passes to hooks.** Bloom-filter loading + pruning, 
page-index pruning, and limit pruning all stay in the opener for now. These 
migrations are mechanical follow-ups that can be reviewed individually for 
semantics preservation.
   - **Per-conjunct pruning rates / selectivity tracker integration.** That's a 
separate API question and not needed for the extension point itself.
   
   ## Are these changes tested?
   
   Yes:
   - All 100 existing `datafusion-datasource-parquet` unit tests pass.
   - New `test_post_metadata_hook_multi_step` exercises a multi-step user 
hook end-to-end:
     - CPU step inspects context and yields a future.
     - I/O step calls `ctx.async_file_reader.clone().get_bytes(..)` (proving 
the shared-reader plumbing works against a real Parquet file in object store).
     - CPU step narrows the access plan.
     - Test asserts the reader skips the corresponding row group (50 rows out 
of 100 across 2 row groups).
   - `cargo fmt --all`, `./dev/rust_lint.sh`, and 
`cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings` 
all pass.
   - Downstream `datafusion` core builds clean.
   
   ## Are there any user-facing changes?
   
   **New public Rust API**, all in `datafusion_datasource_parquet` 
(re-exported at the crate root):
   
   - `PostMetadataAccessPlanHook` + `PostMetadataHookInstance` + 
`PostMetadataHookStep` + `PostMetadataHookYieldFuture` + 
`PostMetadataContext`
   - `PreBuildStreamAccessPlanHook` + `PreBuildStreamHookInstance` + 
`PreBuildStreamHookStep` + `PreBuildStreamHookYieldFuture` + 
`PreBuildStreamContext`
   - `SharedAsyncFileReader` (in the `reader` module)
   - `ParquetSource::with_post_metadata_access_plan_hook`, 
`with_pre_build_stream_access_plan_hook`
   - `ParquetSource::post_metadata_access_plan_hooks`, 
`pre_build_stream_access_plan_hooks`
   
   No existing API changed. No breaking changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

