adriangb commented on code in PR #21327:
URL: https://github.com/apache/datafusion/pull/21327#discussion_r3045338745
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -392,93 +426,258 @@ impl ParquetOpenState {
}
}
+/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
+///
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+ planner: Box<dyn MorselPlanner>,
+ pending_io: Option<BoxFuture<'static, Result<()>>>,
+ ready_morsels: VecDeque<Box<dyn Morsel>>,
+}
+
+impl ParquetOpenFuture {
+ fn new(
+ morselizer: &ParquetMorselizer,
+ partitioned_file: PartitionedFile,
+ ) -> Result<Self> {
+ Ok(Self {
+ planner: morselizer.plan_file(partitioned_file)?,
+ pending_io: None,
+ ready_morsels: VecDeque::new(),
+ })
+ }
+}
+
impl Future for ParquetOpenFuture {
type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
loop {
- let state = mem::replace(&mut self.state, ParquetOpenState::Done);
- let mut state = state.transition()?;
+ // If waiting on IO, poll
+ if let Some(io_future) = self.pending_io.as_mut() {
+ ready!(io_future.poll_unpin(cx))?;
+ self.pending_io = None;
+ }
+
+ // have a morsel ready to go, return that
+ if let Some(morsel) = self.ready_morsels.pop_front() {
+ return Poll::Ready(Ok(morsel.into_stream()));
+ }
+
+ // Planner did not produce any stream (for example, it pruned the
entire file)
+ let Some(mut plan) = self.planner.plan()? else {
+ return Poll::Ready(Ok(futures::stream::empty().boxed()));
+ };
+
+ let child_planners = plan.take_planners();
+ if !child_planners.is_empty() {
+ return Poll::Ready(internal_err!(
+ "Parquet FileOpener adapter does not support child morsel
planners"
+ ));
+ }
+
+ self.ready_morsels = plan.take_morsels().into();
+
+ if let Some(io_future) = plan.take_io_future() {
+ self.pending_io = Some(io_future);
+ }
+ }
+ }
+}
+
+/// Implements the Morsel API
+struct ParquetStreamMorsel {
+ stream: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl ParquetStreamMorsel {
+ fn new(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
+ Self { stream }
+ }
+}
+
+impl fmt::Debug for ParquetStreamMorsel {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetStreamMorsel")
+ .finish_non_exhaustive()
+ }
+}
+
+impl Morsel for ParquetStreamMorsel {
+ fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>
{
+ self.stream
+ }
+}
+
+/// Stateful planner for opening a single parquet file via the morsel APIs.
+enum ParquetMorselPlanner {
+ /// Ready to perform CPU-only planning work.
+ Ready(ParquetOpenState),
+ /// Waiting for an I/O future to produce the next planner state.
+ ///
+ /// Callers must not call [`MorselPlanner::plan`] again until the
+ /// corresponding I/O future has completed and its result is ready to
+ /// receive from the channel.
+ ///
+ /// Doing so is a protocol violation and transitions the planner to
+ /// [`ParquetMorselPlanner::Errored`].
+ Waiting(Receiver<Result<ParquetOpenState>>),
+ /// Actively planning (this state should be replaced by end of the call to
plan)
+ Planning,
+ /// An earlier planning attempt returned an error.
+ Errored,
+}
+
+impl fmt::Debug for ParquetMorselPlanner {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::Ready(state) => f
+ .debug_tuple("ParquetMorselPlanner::Ready")
+ .field(state)
+ .finish(),
+ Self::Waiting(_) => f
+ .debug_tuple("ParquetMorselPlanner::Waiting")
+ .field(&"<pending io>")
+ .finish(),
+ Self::Planning =>
f.debug_tuple("ParquetMorselPlanner::Planning").finish(),
+ Self::Errored =>
f.debug_tuple("ParquetMorselPlanner::Errored").finish(),
+ }
+ }
+}
+
+impl ParquetMorselPlanner {
+ fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) ->
Result<Self> {
+ let prepared = morselizer.prepare_open_file(file)?;
+ #[cfg(feature = "parquet_encryption")]
+ let state = ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ encryption_context: Arc::new(morselizer.get_encryption_context()),
+ };
+ #[cfg(not(feature = "parquet_encryption"))]
+ let state = ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ };
+ Ok(Self::Ready(state))
+ }
+
+ /// Schedule an I/O future that resolves to the planner's next owned state.
+ ///
+ /// This helper
+ ///
+ /// 1. creates a channel to send the next [`ParquetOpenState`] back to the
+ /// planner once the I/O future completes,
+ ///
+ /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`]
+ ///
+ /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the
+ /// caller to poll.
+ ///
+ fn schedule_io<F>(&mut self, future: F) -> MorselPlan
Review Comment:
We can probably abstract some of this away once we implement a second
format; I'd guess Avro is a good candidate. We could also work with the
Vortex folks (cc @AdamGS: it would be good if you started looking at this,
since you have experience with something similar in Vortex, and we want to
make sure these APIs will work for Vortex).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]