alamb commented on code in PR #18391:
URL: https://github.com/apache/datafusion/pull/18391#discussion_r2479383431
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -496,6 +502,116 @@ fn copy_arrow_reader_metrics(
}
}
+/// Eagerly prefetches the next RowGroup from the underlying stream
+///
+/// Holds a single outstanding prefetch (`state`) alongside the reader that is
+/// currently active, if any.
+struct EagerRowGroupPrefetchStream<T> {
+ /// Outstanding prefetch state
+ state: EagerPrefetchState<T>,
+ /// Active reader, if any
+ parquet_record_batch_reader: Option<ParquetRecordBatchReader>,
+}
+
+/// Output of a successful prefetch task: ownership of the stream is handed
+/// back together with the reader returned by `next_row_group`.
+struct PrefetchResult<T> {
+ /// The underlying stream, moved into the task and returned here
+ /// (presumably so the following row group can be requested from it)
+ stream: ParquetRecordBatchStream<T>,
+ /// Reader for the fetched row group; presumably `None` once the stream has
+ /// no further row groups — TODO confirm against the parquet crate docs
+ reader: Option<ParquetRecordBatchReader>,
+}
+
+/// State of the single prefetch this stream may have in flight
+enum EagerPrefetchState<T> {
+ /// Trying to open the next RowGroup in a new task; the task yields the
+ /// stream back together with the reader, or an error
+ Prefetching(SpawnedTask<Result<PrefetchResult<T>>>),
+ /// No prefetch task is held. NOTE(review): presumably entered once the
+ /// stream is exhausted or has failed — the transitions are not visible here
+ Done,
+}
+
+impl<T> EagerPrefetchState<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ /// Begin fetching the next row group, if any
+ ///
+ /// Moves `stream` into a task created with `SpawnedTask::spawn`, which
+ /// awaits `stream.next_row_group()`. On success the task yields a
+ /// `PrefetchResult` holding the stream and the optional reader. An error
+ /// from `next_row_group` is propagated (via `?`) as the task's `Err`.
+ ///
+ /// NOTE(review): the fetch presumably begins as soon as this is called
+ /// (at spawn time), not when the first batch is polled. Callers that must
+ /// not do I/O before the first batch is requested should defer this call.
+ /// Confirm `SpawnedTask` scheduling semantics.
+ fn next_row_group(mut stream: ParquetRecordBatchStream<T>) -> Self {
+ let task = SpawnedTask::spawn(async move {
+ let reader = stream.next_row_group().await?;
+ let result = PrefetchResult { stream, reader };
+ Ok(result)
+ });
+ Self::Prefetching(task)
+ }
+}
+
+impl<T> EagerRowGroupPrefetchStream<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ pub fn new(stream: ParquetRecordBatchStream<T>) -> Self {
+ Self {
+ state: EagerPrefetchState::next_row_group(stream),
Review Comment:
In real code, I suspect this shouldn't start until the first batch is
requested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]