alamb commented on code in PR #21327:
URL: https://github.com/apache/datafusion/pull/21327#discussion_r3039515869


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -77,12 +83,26 @@ use parquet::bloom_filter::Sbbf;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
 
-/// Entry point for opening a Parquet file
+/// Implements [`FileOpener`] for Parquet

Review Comment:
   This is a temporary shim until I rewrite `FileStream` in terms of the
Morsel API (`Morselizer` / `MorselPlanner`):
   - https://github.com/apache/datafusion/pull/21342



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
     }
 }
 
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+    planner: Box<dyn MorselPlanner>,
+    pending_io: Option<BoxFuture<'static, Result<()>>>,
+    ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+    fn new(
+        morselizer: &ParquetMorselizer,
+        partitioned_file: PartitionedFile,
+    ) -> Result<Self> {
+        Ok(Self {
+            planner: morselizer.plan_file(partitioned_file)?,
+            pending_io: None,
+            ready_morsels: VecDeque::new(),
+        })
+    }
+}
+
 impl Future for ParquetOpenFuture {
     type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
         loop {
-            let state = mem::replace(&mut self.state, ParquetOpenState::Done);
-            let mut state = state.transition()?;
+            // If waiting on IO, poll

Review Comment:
   While the IO and CPU work still happen in the same future (in this PR), they
are now explicitly separated, so the code is ready for a more deliberate split
later.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
     }
 }
 
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+    planner: Box<dyn MorselPlanner>,
+    pending_io: Option<BoxFuture<'static, Result<()>>>,
+    ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+    fn new(
+        morselizer: &ParquetMorselizer,
+        partitioned_file: PartitionedFile,
+    ) -> Result<Self> {
+        Ok(Self {
+            planner: morselizer.plan_file(partitioned_file)?,
+            pending_io: None,
+            ready_morsels: VecDeque::new(),
+        })
+    }
+}
+
 impl Future for ParquetOpenFuture {
     type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
         loop {
-            let state = mem::replace(&mut self.state, ParquetOpenState::Done);
-            let mut state = state.transition()?;
+            // If waiting on IO, poll
+            if let Some(io_future) = self.pending_io.as_mut() {
+                ready!(io_future.poll_unpin(cx))?;
+                self.pending_io = None;
+            }
+
+            // have a morsel ready to go, return that
+            if let Some(morsel) = self.ready_morsels.pop_front() {
+                return Poll::Ready(Ok(morsel.into_stream()));
+            }
+
+            // Planner did not produce any stream (for example, it pruned the 
entire file)
+            let Some(mut plan) = self.planner.plan()? else {
+                return Poll::Ready(Ok(futures::stream::empty().boxed()));
+            };
+
+            let child_planners = plan.take_planners();
+            if !child_planners.is_empty() {
+                return Poll::Ready(internal_err!(
+                    "Parquet FileOpener adapter does not support child morsel 
planners"
+                ));
+            }
+
+            self.ready_morsels = plan.take_morsels().into();
+
+            if let Some(io_future) = plan.take_io_future() {
+                self.pending_io = Some(io_future);
+            }
+        }
+    }
+}
+
+/// Implements the Morsel API
+struct ParquetStreamMorsel {
+    stream: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl ParquetStreamMorsel {
+    fn new(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
+        Self { stream }
+    }
+}
+
+impl fmt::Debug for ParquetStreamMorsel {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ParquetStreamMorsel")
+            .finish_non_exhaustive()
+    }
+}
+
+impl Morsel for ParquetStreamMorsel {
+    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> 
{
+        self.stream
+    }
+}
+
+/// Stateful planner for opening a single parquet file via the morsel APIs.
+enum ParquetMorselPlanner {
+    /// Ready to perform CPU-only planning work.
+    Ready(ParquetOpenState),

Review Comment:
   While the split is not used yet, the Parquet opener is now more explicit
about whether it is in an IO-ready or CPU-ready state.



##########
datafusion/datasource/src/morsel/mod.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   This is the key new API -- the idea is that, over time, we can break up the
processing of files into smaller pieces ("morsels") more dynamically at runtime.



##########
datafusion/datasource/src/morsel/mod.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Structures for Morsel Driven IO.
+//!
+//! Morsel Driven IO is a technique for parallelizing the reading of large 
files
+//! by dividing them into smaller "morsels" that are processed independently.
+//!
+//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
+//! Evaluation Framework for the Many-Core 
Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
+
+use std::fmt::Debug;
+
+use crate::PartitionedFile;
+use arrow::array::RecordBatch;
+use datafusion_common::Result;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
+///
+/// This represents a single morsel of work that is ready to be processed. It
+/// has all data necessary (does not need any I/O) and is ready to be turned
+/// into a stream of [`RecordBatch`]es for processing by the execution engine.
+pub trait Morsel: Send + Debug {
+    /// Consume this morsel and produce a stream of [`RecordBatch`]es for 
processing.
+    ///
+    /// This should not do any I/O work, such as reading from the file.
+    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
+}
+
+/// A Morselizer takes a single [`PartitionedFile`] and creates the initial 
planner
+/// for that file.
+///
+/// This is the entry point for morsel driven I/O.
+pub trait Morselizer: Send + Sync + Debug {
+    /// Return the initial [`MorselPlanner`] for this file.
+    ///
+    /// "Morselzing" a file may involve CPU work, such as parsing parquet
+    /// metadata and evaluating pruning predicates. It should NOT do any I/O
+    /// work, such as reading from the file. If I/O is required, it should
+    /// return a future that the caller can poll to drive the I/O work to
+    /// completion, and once the future is complete, the caller can call
+    /// `plan_file` again for a different file.
+    fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn 
MorselPlanner>>;
+}
+
+/// A Morsel Planner is responsible for creating morsels for a given scan.
+///
+/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O
+/// outstanding for a specific planner. DataFusion may run
+/// multiple planners in parallel, which corresponds to multiple parallel
+/// I/O requests.
+///
+/// It is not a Rust `Stream` so that it can explicitly separate CPU bound
+/// work from I/O work.
+///
+/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it
+/// should do CPU work to produce the next morsels or discover the next I/O
+/// phase.
+///
+/// Best practice is to spawn I/O in a Tokio task on a separate runtime to
+/// ensure that CPU work doesn't block or slow down I/O work, but this is not
+/// strictly required by the API.
+pub trait MorselPlanner: Send + Debug {
+    /// Attempt to plan morsels. This may involve CPU work, such as parsing
+    /// parquet metadata and evaluating pruning predicates.
+    ///
+    /// It should NOT do any I/O work, such as reading from the file. If I/O is
+    /// required, the returned [`MorselPlan`] should contain a future that the
+    /// caller polls to drive the I/O work to completion. Once the future is
+    /// complete, the caller can call `plan` again to get the next morsels.
+    ///
+    /// Note this function is **not async** to make it explicitly clear that if
+    /// I/O is required, it should be done in the returned `io_future`.
+    ///
+    /// Returns `None` if the planner has no more work to do.
+    ///
+    /// # Empty Morsel Plans
+    ///
+    /// It may return `None`, which means no batches will be read from the file
+    /// (e.g. due to late-pruning based on statistics).
+    ///
+    /// # Output Ordering
+    ///
+    /// See the comments on [`MorselPlan`] for the logical output order.
+    fn plan(&mut self) -> Result<Option<MorselPlan>>;
+}
+
+/// Return result of [`MorselPlanner::plan`].
+///
+/// # Logical Ordering
+///
+/// For plans where the output order of rows is maintained, the output order of
+/// a [`MorselPlanner`] is logically defined as follows:
+/// 1. All morsels that are directly produced
+/// 2. Recursively, all morsels produced by the returned `planners`
+#[derive(Default)]
+pub struct MorselPlan {
+    /// Any morsels that are ready for processing.
+    morsels: Vec<Box<dyn Morsel>>,
+    /// Any newly-created planners that are ready for CPU work.
+    planners: Vec<Box<dyn MorselPlanner>>,

Review Comment:
   The key design elements are:
   1. The IO future is explicit
   2. A plan may return additional (child) `MorselPlanner`s, not just morsels
   
   The idea here is that in the future we may want to split the planning of a
file into smaller units (e.g. one `MorselPlanner` per row group), which could
then also be planned in parallel.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to