thinkharderdev commented on code in PR #6676:
URL: https://github.com/apache/arrow-rs/pull/6676#discussion_r1894947151


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -660,36 +705,89 @@ where
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
             match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => {
-                        return Poll::Ready(Some(Ok(batch)));
+                StreamState::Decoding(batch_reader) => {
+                    let res: Self::Item = match batch_reader.next() {
+                        Some(Ok(batch)) => Ok(batch),
+                        Some(Err(e)) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+                        }
+                        None => {
+                            self.state = StreamState::Init;
+                            continue;
+                        }
+                    };
+
+                    if !self.prefetch_row_groups
+                        || self.row_groups.is_empty()
+                        || self.next_reader.is_some()
+                    {
+                        return Poll::Ready(Some(res));
                     }
-                    Some(Err(e)) => {
-                        self.state = StreamState::Error;
-                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+
+                    let old_state = std::mem::replace(&mut self.state, StreamState::Init);
+
+                    let row_group_idx = self.row_groups.pop_front().unwrap(); // already checked that row_groups is not empty
+
+                    let fut = self.read_row_group(row_group_idx);
+
+                    if let StreamState::Decoding(batch_reader) = old_state {
+                        self.state = StreamState::Prefetch(batch_reader, fut);
+                        return Poll::Ready(Some(res));
+                    } else {
+                        unreachable!()
                     }
-                    None => self.state = StreamState::Init,
-                },
+                }
+                StreamState::Prefetch(batch_reader, f) => {
+                    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());

Review Comment:
   The concern I have here is that this effectively decouples polling the underlying IO operations from the reactor: we end up polling the future more or less continuously (assuming the consumer is just polling the stream in a loop during decoding, which is likely the case).
   
   A different way to handle this would be to have a `Prefetcher` that can spawn the prefetch op in the background and let the runtime poll it. You could just "assume tokio", in which case the prefetcher is just `tokio::spawn`, or try to abstract over the async runtime behind a trait, as in the sketch below.
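   
   A minimal sketch of that trait-based approach, assuming tokio drives a background task per prefetch; `Prefetcher`, `TokioPrefetcher`, and `PrefetchOutput` are illustrative names, not anything in this PR:
   
   ```rust
   use futures::future::BoxFuture;
   
   // Placeholder for whatever the prefetch future resolves to (the output
   // of `read_row_group` in this PR); illustrative only.
   type PrefetchOutput = ();
   
   /// Hypothetical abstraction over the async runtime: hand the prefetch
   /// future to the runtime so the reactor drives its IO and wakes the
   /// stream when it completes, instead of polling it with a noop waker.
   trait Prefetcher: Send + Sync {
       fn spawn(
           &self,
           fut: BoxFuture<'static, PrefetchOutput>,
       ) -> BoxFuture<'static, PrefetchOutput>;
   }
   
   /// The "just assume tokio" variant.
   struct TokioPrefetcher;
   
   impl Prefetcher for TokioPrefetcher {
       fn spawn(
           &self,
           fut: BoxFuture<'static, PrefetchOutput>,
       ) -> BoxFuture<'static, PrefetchOutput> {
           // `tokio::spawn` moves the future onto the runtime immediately;
           // awaiting the JoinHandle retrieves the result once it finishes.
           let handle = tokio::spawn(fut);
           Box::pin(async move { handle.await.expect("prefetch task panicked") })
       }
   }
   ```
   
   The `Prefetch` state would then poll the returned handle with the real `cx` rather than a noop waker, so the stream is woken exactly when the background read completes.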



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -660,36 +705,89 @@ where
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
             match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => {
-                        return Poll::Ready(Some(Ok(batch)));
+                StreamState::Decoding(batch_reader) => {
+                    let res: Self::Item = match batch_reader.next() {
+                        Some(Ok(batch)) => Ok(batch),
+                        Some(Err(e)) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+                        }
+                        None => {
+                            self.state = StreamState::Init;
+                            continue;
+                        }
+                    };
+
+                    if !self.prefetch_row_groups
+                        || self.row_groups.is_empty()
+                        || self.next_reader.is_some()
+                    {
+                        return Poll::Ready(Some(res));
                     }
-                    Some(Err(e)) => {
-                        self.state = StreamState::Error;
-                        return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+
+                    let old_state = std::mem::replace(&mut self.state, StreamState::Init);
+
+                    let row_group_idx = self.row_groups.pop_front().unwrap(); // already checked that row_groups is not empty
+
+                    let fut = self.read_row_group(row_group_idx);
+
+                    if let StreamState::Decoding(batch_reader) = old_state {
+                        self.state = StreamState::Prefetch(batch_reader, fut);
+                        return Poll::Ready(Some(res));
+                    } else {
+                        unreachable!()
                     }
-                    None => self.state = StreamState::Init,
-                },
+                }
+                StreamState::Prefetch(batch_reader, f) => {
+                    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
+                    match f.poll_unpin(&mut noop_cx) {

Review Comment:
   When I have tried to think about this problem at various points, this is the part I get hung up on. Theoretically, polling the next reader here should just be driving IO (and hence be cheap). But reading the row group includes both IO and evaluating row filters, which can involve non-trivial compute and memory since we evaluate the row filter over the entire row group in one shot.
   
   To really implement prefetch I think we need finer-grained control over the pipelining of IO and CPU operations: e.g. prefetch should only perform the next IO operation, leaving the CPU-heavy decode and filter evaluation to the foreground. A sketch of that split is below.
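   
   One way to picture that split, assuming row-group reading could be separated into an IO-only `fetch` and a CPU-only `decode` (both hypothetical; the current `read_row_group` does not expose such a split): spawn the fetch for group N+1 before doing the CPU work for group N, so the runtime drives the IO in the background:
   
   ```rust
   use std::future::Future;
   use tokio::task::JoinHandle;
   
   /// Overlap IO for row group N+1 with CPU work (decode + row-filter
   /// evaluation) for row group N. `fetch` is assumed IO-only and `decode`
   /// CPU-only; both are hypothetical halves of `read_row_group`.
   async fn pipelined<F, Fut, T, D, B>(groups: Vec<usize>, fetch: F, decode: D) -> Vec<B>
   where
       F: Fn(usize) -> Fut,
       Fut: Future<Output = T> + Send + 'static,
       T: Send + 'static,
       D: Fn(T) -> B,
   {
       let mut out = Vec::new();
       let mut iter = groups.into_iter();
       // Kick off IO for the first group.
       let mut inflight: Option<JoinHandle<T>> =
           iter.next().map(|g| tokio::spawn(fetch(g)));
       while let Some(handle) = inflight.take() {
           let fetched = handle.await.expect("fetch task panicked");
           // Start IO for the next group *before* the CPU-heavy work, so the
           // reactor drives it while we decode and evaluate filters here.
           inflight = iter.next().map(|g| tokio::spawn(fetch(g)));
           out.push(decode(fetched));
       }
       out
   }
   ```
   
   Only the `fetch` half ever lives in the background task, so prefetch stays cheap to poll; the expensive filter evaluation happens where the consumer is already spending CPU.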



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
