zhuqi-lucas commented on code in PR #21327:
URL: https://github.com/apache/datafusion/pull/21327#discussion_r3045052671


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
     }
 }
 
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+    planner: Box<dyn MorselPlanner>,
+    pending_io: Option<BoxFuture<'static, Result<()>>>,
+    ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+    fn new(
+        morselizer: &ParquetMorselizer,
+        partitioned_file: PartitionedFile,
+    ) -> Result<Self> {
+        Ok(Self {
+            planner: morselizer.plan_file(partitioned_file)?,
+            pending_io: None,
+            ready_morsels: VecDeque::new(),
+        })
+    }
+}
+
 impl Future for ParquetOpenFuture {
     type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
         loop {
-            let state = mem::replace(&mut self.state, ParquetOpenState::Done);
-            let mut state = state.transition()?;
+            // If waiting on IO, poll
+            if let Some(io_future) = self.pending_io.as_mut() {
+                ready!(io_future.poll_unpin(cx))?;
+                self.pending_io = None;
+            }
+
+            // have a morsel ready to go, return that
+            if let Some(morsel) = self.ready_morsels.pop_front() {
+                return Poll::Ready(Ok(morsel.into_stream()));
+            }
+
+            // Planner did not produce any stream (for example, it pruned the 
entire file)
+            let Some(mut plan) = self.planner.plan()? else {
+                return Poll::Ready(Ok(futures::stream::empty().boxed()));
+            };
+
+            let child_planners = plan.take_planners();
+            if !child_planners.is_empty() {
+                return Poll::Ready(internal_err!(
+                    "Parquet FileOpener adapter does not support child morsel 
planners"
+                ));
+            }
+
+            self.ready_morsels = plan.take_morsels().into();
+
+            if let Some(io_future) = plan.take_io_future() {
+                self.pending_io = Some(io_future);
+            }
+        }
+    }
+}
+
+/// Implements the Morsel API
+struct ParquetStreamMorsel {
+    stream: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl ParquetStreamMorsel {
+    fn new(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
+        Self { stream }
+    }
+}
+
+impl fmt::Debug for ParquetStreamMorsel {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ParquetStreamMorsel")
+            .finish_non_exhaustive()
+    }
+}
+
+impl Morsel for ParquetStreamMorsel {
+    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> 
{
+        self.stream
+    }
+}
+
+/// Stateful planner for opening a single parquet file via the morsel APIs.
+enum ParquetMorselPlanner {
+    /// Ready to perform CPU-only planning work.
+    Ready(ParquetOpenState),
+    /// Waiting for an I/O future to produce the next planner state.
+    ///
+    /// Callers must not call [`MorselPlanner::plan`] again until the
+    /// corresponding I/O future has completed and its result is ready to
+    /// receive from the channel.
+    ///
+    /// Doing so is a protocol violation and transitions the planner to
+    /// [`ParquetMorselPlanner::Errored`].
+    Waiting(Receiver<Result<ParquetOpenState>>),
+    /// Actively planning (this state should be replaced by end of the call to 
plan)
+    Planning,
+    /// An earlier planning attempt returned an error.
+    Errored,
+}
+
+impl fmt::Debug for ParquetMorselPlanner {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Ready(state) => f
+                .debug_tuple("ParquetMorselPlanner::Ready")
+                .field(state)
+                .finish(),
+            Self::Waiting(_) => f
+                .debug_tuple("ParquetMorselPlanner::Waiting")
+                .field(&"<pending io>")
+                .finish(),
+            Self::Planning => 
f.debug_tuple("ParquetMorselPlanner::Planning").finish(),
+            Self::Errored => 
f.debug_tuple("ParquetMorselPlanner::Errored").finish(),
+        }
+    }
+}
+
+impl ParquetMorselPlanner {
+    fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> 
Result<Self> {
+        let prepared = morselizer.prepare_open_file(file)?;
+        #[cfg(feature = "parquet_encryption")]
+        let state = ParquetOpenState::Start {
+            prepared: Box::new(prepared),
+            encryption_context: Arc::new(morselizer.get_encryption_context()),
+        };
+        #[cfg(not(feature = "parquet_encryption"))]
+        let state = ParquetOpenState::Start {
+            prepared: Box::new(prepared),
+        };
+        Ok(Self::Ready(state))
+    }
+
+    /// Schedule an I/O future that resolves to the planner's next owned state.
+    ///
+    /// This helper
+    ///
+    /// 1. creates a channel to send the next [`ParquetOpenState`] back to the
+    ///    planner once the I/O future completes,
+    ///
+    /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`]
+    ///
+    /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the
+    ///    caller to poll.
+    ///
+    fn schedule_io<F>(&mut self, future: F) -> MorselPlan
+    where
+        F: Future<Output = Result<ParquetOpenState>> + Send + 'static,
+    {
+        let (output_for_future, output) = mpsc::channel();

Review Comment:
   Nice work on the Morsel API design! The separation of CPU/IO is clean and 
the adapter layer makes the migration smooth.
   
   
   One question: `schedule_io` uses `std::sync::mpsc::channel` to send back the IO 
result, but since this is a single-producer, single-consumer, send-once pattern, 
would `tokio::sync::oneshot` be a better fit here?
   
   ```rust
     // Current: std::sync::mpsc (multi-producer queue, sync, heavier)
     let (tx, rx) = std::sync::mpsc::channel();
   
     // Suggestion: tokio::sync::oneshot (single-use, async-native, lighter)
     let (tx, rx) = tokio::sync::oneshot::channel();
   ```
   `oneshot` seems more precise semantically (the IO result is sent exactly once), and 
it avoids using a synchronous primitive in an async context. Or is there a reason 
`mpsc` was chosen here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to