alamb commented on code in PR #18391:
URL: https://github.com/apache/datafusion/pull/18391#discussion_r2481926154
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -496,6 +502,116 @@ fn copy_arrow_reader_metrics(
}
}
+/// Eagerly prefetches the next RowGroup from the underlying stream
+struct EagerRowGroupPrefetchStream<T> {
+ /// Outstanding prefetch state
+ state: EagerPrefetchState<T>,
+ /// Active reader, if any
+ parquet_record_batch_reader: Option<ParquetRecordBatchReader>,
+}
+
+struct PrefetchResult<T> {
+ stream: ParquetRecordBatchStream<T>,
+ reader: Option<ParquetRecordBatchReader>,
+}
+
+enum EagerPrefetchState<T> {
+ /// Trying to open the next RowGroup in a new task
+ Prefetching(SpawnedTask<Result<PrefetchResult<T>>>),
+ /// The underlying stream is exhausted; there are no more row groups
+ Done,
+}
+
+impl<T> EagerPrefetchState<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ /// Begin fetching the next row group, if any
+ fn next_row_group(mut stream: ParquetRecordBatchStream<T>) -> Self {
+ let task = SpawnedTask::spawn(async move {
+ let reader = stream.next_row_group().await?;
+ let result = PrefetchResult { stream, reader };
+ Ok(result)
+ });
+ Self::Prefetching(task)
+ }
+}
+
+impl<T> EagerRowGroupPrefetchStream<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ pub fn new(stream: ParquetRecordBatchStream<T>) -> Self {
+ Self {
+ state: EagerPrefetchState::next_row_group(stream),
+ parquet_record_batch_reader: None,
+ }
+ }
+}
+
+impl<T> Stream for EagerRowGroupPrefetchStream<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ loop {
+ // If we have an active reader, try to read from it first
+ if let Some(mut reader) = self.parquet_record_batch_reader.take() {
+ match reader.next() {
+ Some(result) => {
+ // Return the batch
+ self.parquet_record_batch_reader = Some(reader);
+ let result = result.map_err(DataFusionError::from);
+ return Poll::Ready(Some(result));
+ }
+ None => {
+ // Reader is exhausted, fall through and prefetch the next row group
+ }
+ }
+ }
+
+ use futures::Future;
+
+ match &mut self.state {
+ EagerPrefetchState::Prefetching(handle) => {
+ // check whether the prefetch task has completed
+ let handle = pin!(handle);
+ match ready!(handle.poll(cx)) {
+ Ok(Ok(result)) => {
+ let PrefetchResult { stream, reader } = result;
+ // no reader means end of stream
+ if reader.is_none() {
+ self.state = EagerPrefetchState::Done;
+ } else {
+ // immediately start reading the next row group
+ self.state = EagerPrefetchState::next_row_group(stream);
Review Comment:
good call -- I will look into that. I am not sure buffering more than one row
group will really help: if more than one reader is buffered, it means the rest
of the plan can't consume the data fast enough, and we should be applying
backpressure rather than buffering more data.
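To make the backpressure point concrete, here is a minimal, generic sketch
(not the code in this PR; it assumes the `tokio`, `tokio-stream`, and
`futures` crates, and `prefetch` / `depth` are made-up names) of how a
prefetch depth greater than one is typically bounded:

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Hypothetical helper: pull items from `input` ahead of the consumer,
/// buffering at most `depth` of them in a bounded channel. Requires a
/// running Tokio runtime.
fn prefetch<S>(input: S, depth: usize) -> impl Stream<Item = S::Item>
where
    S: Stream + Send + 'static,
    S::Item: Send + 'static,
{
    assert!(depth > 0, "channel capacity must be non-zero");
    let (tx, rx) = mpsc::channel(depth);
    tokio::spawn(async move {
        let mut input = std::pin::pin!(input);
        while let Some(item) = input.next().await {
            // Once `depth` items are buffered, `send` suspends this task:
            // a slow consumer stalls the producer instead of letting
            // buffered readers (and their memory) pile up.
            if tx.send(item).await.is_err() {
                break; // receiver dropped; stop prefetching
            }
        }
    });
    ReceiverStream::new(rx)
}
```

With `depth = 1` this is roughly what the `SpawnedTask` above already gives
us: at most one row group is fetched ahead of the consumer, and backpressure
falls out of there being only a single outstanding prefetch task.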