adriangb commented on code in PR #21327:
URL: https://github.com/apache/datafusion/pull/21327#discussion_r3042547795
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
}
}
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+ planner: Box<dyn MorselPlanner>,
+ pending_io: Option<BoxFuture<'static, Result<()>>>,
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+ fn new(
+ morselizer: &ParquetMorselizer,
+ partitioned_file: PartitionedFile,
+ ) -> Result<Self> {
+ Ok(Self {
+ planner: morselizer.plan_file(partitioned_file)?,
+ pending_io: None,
+ ready_morsels: VecDeque::new(),
+ })
+ }
+}
+
impl Future for ParquetOpenFuture {
type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
- let state = mem::replace(&mut self.state, ParquetOpenState::Done);
- let mut state = state.transition()?;
+ // If waiting on IO, poll
+ if let Some(io_future) = self.pending_io.as_mut() {
+ ready!(io_future.poll_unpin(cx))?;
+ self.pending_io = None;
+ }
+
+ // have a morsel ready to go, return that
+ if let Some(morsel) = self.ready_morsels.pop_front() {
+ return Poll::Ready(Ok(morsel.into_stream()));
+ }
+
+ // Planner did not produce any stream (for example, it pruned the entire file)
+ let Some(mut plan) = self.planner.plan()? else {
+ return Poll::Ready(Ok(futures::stream::empty().boxed()));
+ };
+
+ let child_planners = plan.take_planners();
+ if !child_planners.is_empty() {
+ return Poll::Ready(internal_err!(
+ "Parquet FileOpener adapter does not support child morsel planners"
+ ));
+ }
+
+ self.ready_morsels = plan.take_morsels().into();
+
+ if let Some(io_future) = plan.take_io_future() {
+ self.pending_io = Some(io_future);
+ }
+ }
+ }
+}
+
+/// Implements the Morsel API
+struct ParquetStreamMorsel {
+ stream: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl ParquetStreamMorsel {
+ fn new(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
+ Self { stream }
+ }
+}
+
+impl fmt::Debug for ParquetStreamMorsel {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetStreamMorsel")
+ .finish_non_exhaustive()
+ }
+}
+
+impl Morsel for ParquetStreamMorsel {
+ fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> {
+ self.stream
+ }
+}
+
+/// Stateful planner for opening a single parquet file via the morsel APIs.
+enum ParquetMorselPlanner {
+ /// Ready to perform CPU-only planning work.
+ Ready(ParquetOpenState),
+ /// Waiting for an I/O future to produce the next planner state.
+ ///
+ /// Callers must not call [`MorselPlanner::plan`] again until the
+ /// corresponding I/O future has completed and its result is ready to
+ /// receive from the channel.
+ ///
+ /// Doing so is a protocol violation and transitions the planner to
+ /// [`ParquetMorselPlanner::Errored`].
Review Comment:
This feels a bit brittle / easy to get wrong. Even if there is a clear error
state I can still see a bug going into production because this contract is not
encoded in the type system.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
}
}
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+ planner: Box<dyn MorselPlanner>,
+ pending_io: Option<BoxFuture<'static, Result<()>>>,
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+ fn new(
+ morselizer: &ParquetMorselizer,
+ partitioned_file: PartitionedFile,
+ ) -> Result<Self> {
+ Ok(Self {
+ planner: morselizer.plan_file(partitioned_file)?,
+ pending_io: None,
+ ready_morsels: VecDeque::new(),
+ })
+ }
+}
+
impl Future for ParquetOpenFuture {
type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
- let state = mem::replace(&mut self.state, ParquetOpenState::Done);
- let mut state = state.transition()?;
+ // If waiting on IO, poll
+ if let Some(io_future) = self.pending_io.as_mut() {
+ ready!(io_future.poll_unpin(cx))?;
+ self.pending_io = None;
+ }
+
+ // have a morsel ready to go, return that
+ if let Some(morsel) = self.ready_morsels.pop_front() {
+ return Poll::Ready(Ok(morsel.into_stream()));
+ }
+
+ // Planner did not produce any stream (for example, it pruned the entire file)
+ let Some(mut plan) = self.planner.plan()? else {
+ return Poll::Ready(Ok(futures::stream::empty().boxed()));
+ };
+
+ let child_planners = plan.take_planners();
+ if !child_planners.is_empty() {
+ return Poll::Ready(internal_err!(
+ "Parquet FileOpener adapter does not support child morsel planners"
+ ));
+ }
+
+ self.ready_morsels = plan.take_morsels().into();
+
+ if let Some(io_future) = plan.take_io_future() {
+ self.pending_io = Some(io_future);
+ }
+ }
+ }
+}
+
+/// Implements the Morsel API
+struct ParquetStreamMorsel {
+ stream: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl ParquetStreamMorsel {
+ fn new(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
+ Self { stream }
+ }
+}
+
+impl fmt::Debug for ParquetStreamMorsel {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetStreamMorsel")
+ .finish_non_exhaustive()
+ }
+}
+
+impl Morsel for ParquetStreamMorsel {
+ fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> {
+ self.stream
+ }
+}
+
+/// Stateful planner for opening a single parquet file via the morsel APIs.
+enum ParquetMorselPlanner {
+ /// Ready to perform CPU-only planning work.
+ Ready(ParquetOpenState),
+ /// Waiting for an I/O future to produce the next planner state.
+ ///
+ /// Callers must not call [`MorselPlanner::plan`] again until the
+ /// corresponding I/O future has completed and its result is ready to
+ /// receive from the channel.
+ ///
+ /// Doing so is a protocol violation and transitions the planner to
+ /// [`ParquetMorselPlanner::Errored`].
+ Waiting(Receiver<Result<ParquetOpenState>>),
+ /// Actively planning (this state should be replaced by end of the call to plan)
+ Planning,
+ /// An earlier planning attempt returned an error.
+ Errored,
+}
+
+impl fmt::Debug for ParquetMorselPlanner {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::Ready(state) => f
+ .debug_tuple("ParquetMorselPlanner::Ready")
+ .field(state)
+ .finish(),
+ Self::Waiting(_) => f
+ .debug_tuple("ParquetMorselPlanner::Waiting")
+ .field(&"<pending io>")
+ .finish(),
+ Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(),
+ Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(),
+ }
+ }
+}
+
+impl ParquetMorselPlanner {
+ fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result<Self> {
+ let prepared = morselizer.prepare_open_file(file)?;
+ #[cfg(feature = "parquet_encryption")]
+ let state = ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ encryption_context: Arc::new(morselizer.get_encryption_context()),
+ };
+ #[cfg(not(feature = "parquet_encryption"))]
+ let state = ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ };
+ Ok(Self::Ready(state))
+ }
+
+ /// Schedule an I/O future that resolves to the planner's next owned state.
+ ///
+ /// This helper
+ ///
+ /// 1. creates a channel to send the next [`ParquetOpenState`] back to the
+ /// planner once the I/O future completes,
+ ///
+ /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`]
+ ///
+ /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the
+ /// caller to poll.
+ ///
+ fn schedule_io<F>(&mut self, future: F) -> MorselPlan
Review Comment:
This seems like a lot to ask each data source to implement.
##########
datafusion/datasource/src/morsel/mod.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Structures for Morsel Driven IO.
+//!
+//! Morsel Driven IO is a technique for parallelizing the reading of large files
+//! by dividing them into smaller "morsels" that are processed independently.
+//!
+//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
+//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
+
+use std::fmt::Debug;
+
+use crate::PartitionedFile;
+use arrow::array::RecordBatch;
+use datafusion_common::Result;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
+///
+/// This represents a single morsel of work that is ready to be processed. It
+/// has all data necessary (does not need any I/O) and is ready to be turned
+/// into a stream of [`RecordBatch`]es for processing by the execution engine.
+pub trait Morsel: Send + Debug {
+ /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing.
+ ///
+ /// This should not do any I/O work, such as reading from the file.
+ fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
+}
+
+/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner
+/// for that file.
+///
+/// This is the entry point for morsel driven I/O.
+pub trait Morselizer: Send + Sync + Debug {
+ /// Return the initial [`MorselPlanner`] for this file.
+ ///
+ /// "Morselizing" a file may involve CPU work, such as parsing parquet
+ /// metadata and evaluating pruning predicates. It should NOT do any I/O
+ /// work, such as reading from the file. If I/O is required, it should
+ /// return a future that the caller can poll to drive the I/O work to
+ /// completion, and once the future is complete, the caller can call
+ /// `plan_file` again for a different file.
+ fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
+}
+
+/// A Morsel Planner is responsible for creating morsels for a given scan.
+///
+/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O
+/// outstanding for a specific planner. DataFusion may run
+/// multiple planners in parallel, which corresponds to multiple parallel
+/// I/O requests.
+///
+/// It is not a Rust `Stream` so that it can explicitly separate CPU bound
+/// work from I/O work.
+///
+/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it
+/// should do CPU work to produce the next morsels or discover the next I/O
+/// phase.
+///
+/// Best practice is to spawn I/O in a Tokio task on a separate runtime to
+/// ensure that CPU work doesn't block or slow down I/O work, but this is not
+/// strictly required by the API.
+pub trait MorselPlanner: Send + Debug {
+ /// Attempt to plan morsels. This may involve CPU work, such as parsing
+ /// parquet metadata and evaluating pruning predicates.
+ ///
+ /// It should NOT do any I/O work, such as reading from the file. If I/O is
+ /// required, the returned [`MorselPlan`] should contain a future that the
+ /// caller polls to drive the I/O work to completion. Once the future is
+ /// complete, the caller can call `plan` again to get the next morsels.
+ ///
+ /// Note this function is **not async** to make it explicitly clear that if
+ /// I/O is required, it should be done in the returned `io_future`.
+ ///
+ /// Returns `None` if the planner has no more work to do.
+ ///
+ /// # Empty Morsel Plans
+ ///
+ /// It may return `None`, which means no batches will be read from the file
+ /// (e.g. due to late-pruning based on statistics).
+ ///
+ /// # Output Ordering
+ ///
+ /// See the comments on [`MorselPlan`] for the logical output order.
+ fn plan(&mut self) -> Result<Option<MorselPlan>>;
+}
+
+/// Return result of [`MorselPlanner::plan`].
+///
+/// # Logical Ordering
+///
+/// For plans where the output order of rows is maintained, the output order of
+/// a [`MorselPlanner`] is logically defined as follows:
+/// 1. All morsels that are directly produced
+/// 2. Recursively, all morsels produced by the returned `planners`
+#[derive(Default)]
+pub struct MorselPlan {
+ /// Any morsels that are ready for processing.
+ morsels: Vec<Box<dyn Morsel>>,
+ /// Any newly-created planners that are ready for CPU work.
+ planners: Vec<Box<dyn MorselPlanner>>,
+ /// A future that will drive any I/O work to completion.
+ ///
+ /// DataFusion will poll this future occasionally to drive the I/O work to
+ /// completion. Once the future resolves, DataFusion will call `plan` again
+ /// to get the next morsels.
+ io_future: Option<BoxFuture<'static, Result<()>>>,
Review Comment:
I don't love having a direct reference to an anonymous future like this. I
wonder if there's a refactor that gives this future a "name"?
##########
datafusion/datasource/src/morsel/mod.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Structures for Morsel Driven IO.
+//!
+//! Morsel Driven IO is a technique for parallelizing the reading of large files
+//! by dividing them into smaller "morsels" that are processed independently.
+//!
+//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
+//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
+
+use std::fmt::Debug;
+
+use crate::PartitionedFile;
+use arrow::array::RecordBatch;
+use datafusion_common::Result;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
+///
+/// This represents a single morsel of work that is ready to be processed. It
+/// has all data necessary (does not need any I/O) and is ready to be turned
+/// into a stream of [`RecordBatch`]es for processing by the execution engine.
+pub trait Morsel: Send + Debug {
+ /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing.
+ ///
+ /// This should not do any I/O work, such as reading from the file.
Review Comment:
Presumably this *can* do CPU work (such as pruning using row group stats and
updated dynamic filters)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]