HippoBaro commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3142796079
##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+ RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use bytes::Bytes;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
+/// Decision reached for the row group at the front of the queue, as returned by
+/// `RowGroupFrontier::classify_queued_row_group`.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+ /// The row group has to be read: either predicates are configured, or rows of
+ /// it remain after applying the row budget (`rows_after` returned non-zero).
+ NeedThis,
+ /// The row group is marked for skipping: either the selection picks no rows of
+ /// it (budget carried forward unchanged), or the budget leaves zero rows of it
+ /// to emit (budget advanced past this group's rows via `RowBudget::advance`).
+ SkipThis { remaining_budget: RowBudget },
+ /// The row budget is already exhausted, so none of the queued row groups
+ /// (including this one) needs to be visited.
+ SkipAllRemaining,
+}
+/// Row group chosen by `RowGroupFrontier::next_readable_row_group` as the next
+/// one that actually needs to be read.
+#[derive(Debug)]
+struct NextRowGroup {
+ /// Index of the row group (presumably into the file's `ParquetMetaData`; TODO confirm).
+ row_group_idx: usize,
+ /// Number of rows in the row group (assumed to come from the row-group metadata — confirm at construction site).
+ row_count: usize,
+ /// Portion of the overall selection that applies to this row group, if a
+ /// selection was supplied.
+ selection: Option<RowSelection>,
+ /// Row budget to apply while reading this row group (NOTE(review): confirm
+ /// whether this is the budget before or after this group is accounted for).
+ budget: RowBudget,
+}
+
+/// Holds the not-yet-assigned remainder of the overall [`RowSelection`].
+///
+/// Row groups consume the selection from the front, one at a time. A `None`
+/// selection means "no selection supplied" and is treated as selecting every row.
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    /// Creates a frontier that starts at the beginning of `selection`.
+    fn new(selection: Option<RowSelection>) -> Self {
+        SelectionFrontier { selection }
+    }
+
+    /// Reports whether a selection is present but no longer selects any row.
+    ///
+    /// An absent selection (select everything) is never exhausted.
+    fn is_exhausted(&self) -> bool {
+        match &self.selection {
+            Some(remaining) => !remaining.selects_any(),
+            None => false,
+        }
+    }
+
+    /// Discards whatever part of the selection is still outstanding.
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    /// Splits the first `row_count` rows off the front of the remaining
+    /// selection (via `RowSelection::split_off`) and returns them. The rest
+    /// stays behind for later row groups.
+    ///
+    /// Returns `None` when no selection was supplied.
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> {
+        let remaining = self.selection.as_mut()?;
+        Some(remaining.split_off(row_count))
+    }
+}
+
+/// Walks the queued row groups in order, handing each its slice of the selection
+/// and tracking the row budget, so only row groups that need reading are surfaced.
+#[derive(Debug)]
+struct RowGroupFrontier {
+ /// File metadata, consulted for each row group's row count.
+ parquet_metadata: Arc<ParquetMetaData>,
+ /// Row-group indices still to be visited; the front is examined next.
+ row_groups: VecDeque<usize>,
+ /// Remainder of the selection not yet assigned to a row group.
+ selection: SelectionFrontier,
+ /// Current row budget; replaced as row groups are skipped or read.
+ budget: RowBudget,
+ /// When true, a queued row group is always read (unless the budget is
+ /// exhausted or its selection is empty), because the budget cannot be
+ /// applied before filtering.
+ has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+ fn new(
+ parquet_metadata: Arc<ParquetMetaData>,
+ row_groups: Vec<usize>,
+ selection: Option<RowSelection>,
+ budget: RowBudget,
+ has_predicates: bool,
+ ) -> Self {
+ Self {
+ parquet_metadata,
+ row_groups: VecDeque::from(row_groups),
+ selection: SelectionFrontier::new(selection),
+ budget,
+ has_predicates,
+ }
+ }
+
+ fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize,
ParquetError> {
+ self.parquet_metadata
+ .row_group(row_group_idx)
+ .num_rows()
+ .try_into()
+ .map_err(|e| ParquetError::General(format!("Row count overflow:
{e}")))
+ }
+
+ fn advance_budget(&mut self, budget: RowBudget) {
+ self.budget = budget;
+ }
+
+ fn selection_exhausted(&self) -> bool {
+ self.selection.is_exhausted()
+ }
+
+ fn clear_remaining(&mut self) {
+ self.selection.clear();
+ self.row_groups.clear();
+ }
+
+ fn classify_queued_row_group(
+ &self,
+ row_count: usize,
+ selection: Option<&RowSelection>,
+ ) -> QueuedRowGroupDecision {
+ if self.budget.is_exhausted() {
+ return QueuedRowGroupDecision::SkipAllRemaining;
+ }
+
+ if selection.is_some_and(|selection| !selection.selects_any()) {
+ return QueuedRowGroupDecision::SkipThis {
+ remaining_budget: self.budget,
+ };
+ }
+
+ if self.has_predicates {
+ return QueuedRowGroupDecision::NeedThis;
+ }
+
+ let rows_before_budget = selection
+ .map(|selection| selection.row_count())
+ .unwrap_or(row_count);
+ let rows_after_budget = self.budget.rows_after(rows_before_budget);
+ if rows_after_budget != 0 {
+ return QueuedRowGroupDecision::NeedThis;
+ }
+
+ QueuedRowGroupDecision::SkipThis {
+ remaining_budget: self.budget.advance(rows_before_budget,
rows_after_budget),
+ }
+ }
+
+ fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>,
ParquetError> {
+ loop {
+ if self.selection_exhausted() {
+ self.clear_remaining();
+ return Ok(None);
+ }
+
+ let Some(&row_group_idx) = self.row_groups.front() else {
+ return Ok(None);
+ };
+ let row_count = self.row_group_num_rows(row_group_idx)?;
+ let selection = self.selection.take_for_row_group(row_count);
+
+ match self.classify_queued_row_group(row_count,
selection.as_ref()) {
Review Comment:
Agreed. I reworked this so that `next_readable_row_group` is now the single loop
that advances through the queued row groups until it either finds work or
determines that there is no more work.
The code is now much nicer 🙇
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org