haohuaijin commented on code in PR #22940:
URL: https://github.com/apache/datafusion/pull/22940#discussion_r3436612675


##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -169,6 +204,110 @@ impl ParquetAccessPlan {
         }
     }
 
+    /// Create a new `ParquetAccessPlan` from a file-level [`RowSelection`].
+    ///
+    /// The selection is interpreted across all rows in the file, in row group
+    /// order, and is split into row-group level access using 
`row_group_meta_data`.
+    /// Fully skipped row groups become [`RowGroupAccess::Skip`], fully 
selected
+    /// row groups become [`RowGroupAccess::Scan`], and partially selected row
+    /// groups become [`RowGroupAccess::Selection`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the selection does not specify exactly the same
+    /// number of rows as the file metadata.
+    pub fn try_new_from_overall_row_selection(
+        selection: RowSelection,
+        row_group_meta_data: &[RowGroupMetaData],
+    ) -> Result<Self> {
+        let selectors: Vec<RowSelector> = selection.into();

Review Comment:
   @kosiew thank you for the suggestion. I restructured the code; can you check
again?
   
   I benchmarked the `split_off` approach against this PR by adding the `bench.rs` below as
datafusion-examples/examples/bench.rs:
   ```bash
   cargo build --release -p datafusion-examples --example bench
   /usr/bin/time -l target/release/examples/bench split_off heavy 100
   /usr/bin/time -l target/release/examples/bench new heavy 100
   ```
   
   Results for the `heavy` scenario, 100 iterations:
   
   | Implementation | Total elapsed | Avg/iteration | Max RSS | Peak footprint |
   |---|---:|---:|---:|---:|
   | `split_off` | ~2055 ms | ~20.55 ms | ~185 MB | ~181 MB |
   | `new` | ~592 ms | ~5.92 ms | ~137 MB | ~133 MB |
   
   <details>
   <summary>benchmark code</summary>
   
   ```rust
   use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, 
RowGroupAccess};
   use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
   use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
   use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
   use std::time::Instant;
   
   const ROWS_PER_RG: usize = 130_000;
   const NUM_ROW_GROUPS: usize = 39;
   const TOTAL_ROWS: usize = ROWS_PER_RG * NUM_ROW_GROUPS;
   type BenchFn = fn(RowSelection, &[RowGroupMetaData]) -> ParquetAccessPlan;
   
   /// Cursor over the selectors of a file-level `RowSelection`, consumed front
   /// to back in pieces whose maximum size is chosen by the caller.
   struct OverallRowSelectionCursor {
       /// Selectors that have not been reached yet.
       selector_iter: std::vec::IntoIter<RowSelector>,
       /// Selector currently being handed out (it may already be partly
       /// consumed); `None` once every selector has been taken.
       current: Option<RowSelector>,
   }
   
   impl OverallRowSelectionCursor {
       /// Flattens `selection` into its selectors and positions the cursor on
       /// the first one (or on nothing when the selection is empty).
       fn new(selection: RowSelection) -> Self {
           let flattened: Vec<RowSelector> = selection.into();
           let mut rest = flattened.into_iter();
           let first = rest.next();
           Self {
               selector_iter: rest,
               current: first,
           }
       }

       /// Hands out at most `max_rows` rows from the current selector.
       ///
       /// Returns `None` when no selector is left. Otherwise the returned
       /// selector keeps the `skip` flag of the current one; if only part of it
       /// was taken the remainder stays current, else the cursor advances.
       #[inline]
       fn take(&mut self, max_rows: usize) -> Option<RowSelector> {
           let head = self.current?;
           let taken = head.row_count.min(max_rows);
           let leftover = head.row_count - taken;

           self.current = match leftover {
               // Current selector fully consumed: move on to the next one.
               0 => self.selector_iter.next(),
               _ => Some(RowSelector {
                   row_count: leftover,
                   skip: head.skip,
               }),
           };

           Some(RowSelector {
               row_count: taken,
               skip: head.skip,
           })
       }

       /// Consumes the cursor and returns how many rows (selected or skipped)
       /// have not been handed out yet.
       fn remaining_rows(self) -> usize {
           let pending = match self.current {
               Some(selector) => selector.row_count,
               None => 0,
           };
           let queued: usize = self.selector_iter.map(|s| s.row_count).sum();
           pending + queued
       }
   }
   
   /// Collects the selectors that fall inside a single row group and tracks
   /// how many of its rows are selected, skipped, and still unassigned.
   struct RowGroupAccessBuilder {
       /// Selectors pushed so far, in push order.
       selectors: Vec<RowSelector>,
       /// Sum of `row_count` over pushed selectors with `skip == false`.
       selected: usize,
       /// Sum of `row_count` over pushed selectors with `skip == true`.
       skipped: usize,
       /// Rows of the row group not yet covered by a pushed selector.
       remaining: usize,
   }
   
   impl RowGroupAccessBuilder {
       /// Starts an empty builder for a row group holding `row_group_rows` rows.
       fn new(row_group_rows: usize) -> Self {
           let selectors = Vec::with_capacity(1);
           Self {
               selectors,
               selected: 0,
               skipped: 0,
               remaining: row_group_rows,
           }
       }

       /// Records `selector` for this row group and updates the tallies.
       ///
       /// The caller must not push more rows than `remaining`; the subtraction
       /// below is unchecked beyond Rust's usual overflow behaviour.
       #[inline]
       fn push(&mut self, selector: RowSelector) {
           let rows = selector.row_count;
           self.remaining -= rows;

           let tally = if selector.skip {
               &mut self.skipped
           } else {
               &mut self.selected
           };
           *tally += rows;

           self.selectors.push(selector);
       }

       /// Collapses the recorded selectors into a `RowGroupAccess`: `Skip` when
       /// nothing was selected, `Scan` when nothing was skipped, otherwise a
       /// `Selection` built from the recorded selectors.
       fn into_access(self) -> RowGroupAccess {
           match (self.selected, self.skipped) {
               (0, _) => RowGroupAccess::Skip,
               (_, 0) => RowGroupAccess::Scan,
               _ => RowGroupAccess::Selection(self.selectors.into()),
           }
       }
   }
   
   /// Builds the schema descriptor used by the benchmark metadata: a group
   /// named `schema` containing one `INT32` primitive column named `a`.
   fn schema_descr() -> SchemaDescPtr {
       use parquet::basic::Type as PhysicalType;
       use parquet::schema::types::Type as SchemaType;
       use std::sync::Arc;

       let int_column = SchemaType::primitive_type_builder("a", PhysicalType::INT32)
           .build()
           .unwrap();
       let root = SchemaType::group_type_builder("schema")
           .with_fields(vec![Arc::new(int_column)])
           .build()
           .unwrap();

       Arc::new(SchemaDescriptor::new(Arc::new(root)))
   }
   
   /// Produces `NUM_ROW_GROUPS` identical row-group metadata entries, each
   /// reporting `ROWS_PER_RG` rows and a single column chunk with the same
   /// number of values.
   fn row_group_metadata() -> Vec<RowGroupMetaData> {
       let descr = schema_descr();
       let rows = ROWS_PER_RG as i64;

       let mut groups = Vec::with_capacity(NUM_ROW_GROUPS);
       for _ in 0..NUM_ROW_GROUPS {
           let chunk = ColumnChunkMetaData::builder(descr.column(0))
               .set_num_values(rows)
               .build()
               .unwrap();
           let group = RowGroupMetaData::builder(descr.clone())
               .set_num_rows(rows)
               .set_column_metadata(vec![chunk])
               .build()
               .unwrap();
           groups.push(group);
       }
       groups
   }
   
   /// Builds a selection spanning `TOTAL_ROWS` rows split into `num_pairs`
   /// spans; every non-empty span skips all of its rows but the last one,
   /// which is selected. The final span absorbs the division remainder.
   ///
   /// Panics on `num_pairs == 0` (division by zero).
   fn scattered_selection(num_pairs: usize) -> RowSelection {
       let mut selectors: Vec<RowSelector> = Vec::with_capacity(num_pairs * 2);
       let chunk = TOTAL_ROWS / num_pairs;
       let mut covered = 0usize;

       for pair in 0..num_pairs {
           let is_last = pair + 1 == num_pairs;
           let span = if is_last { TOTAL_ROWS - covered } else { chunk };
           covered += span;

           // An empty span contributes no selectors at all.
           if span == 0 {
               continue;
           }
           if span > 1 {
               selectors.push(RowSelector::skip(span - 1));
           }
           selectors.push(RowSelector::select(1));
       }

       RowSelection::from(selectors)
   }
   
   fn new_try_new(
       selection: RowSelection,
       row_group_meta_data: &[RowGroupMetaData],
   ) -> ParquetAccessPlan {
       let mut cursor = OverallRowSelectionCursor::new(selection);
   
       let mut selection_rows = 0usize;
       let mut file_rows = 0usize;
   
       let mut row_groups = Vec::with_capacity(row_group_meta_data.len());
       for rg_meta in row_group_meta_data {
           let rg_rows = rg_meta.num_rows() as usize;
           file_rows += rg_rows;
   
           let mut builder = RowGroupAccessBuilder::new(rg_rows);
           while builder.remaining > 0 {
               let Some(selector) = cursor.take(builder.remaining) else {
                   break;
               };
               selection_rows += selector.row_count;
               builder.push(selector);
           }
   
           row_groups.push(builder.into_access());
       }
   
       selection_rows += cursor.remaining_rows();
   
       assert_eq!(selection_rows, file_rows, "NEW: row count mismatch");
       ParquetAccessPlan::new(row_groups)
   }
   
   fn split_off_try_new(
       selection: RowSelection,
       row_group_meta_data: &[RowGroupMetaData],
   ) -> ParquetAccessPlan {
       let mut remaining_selection = selection;
   
       let mut selection_rows = 0usize;
       let mut file_rows = 0usize;
   
       let mut row_groups = Vec::with_capacity(row_group_meta_data.len());
       for rg_meta in row_group_meta_data {
           let rg_rows = rg_meta.num_rows() as usize;
           file_rows += rg_rows;
   
           let group_selection = remaining_selection.split_off(rg_rows);
           let selected = group_selection.row_count();
           let skipped = group_selection.skipped_row_count();
           selection_rows += selected + skipped;
   
           let access = if selected == 0 {
               RowGroupAccess::Skip
           } else if skipped == 0 {
               RowGroupAccess::Scan
           } else {
               RowGroupAccess::Selection(group_selection)
           };
           row_groups.push(access);
       }
   
       selection_rows +=
           remaining_selection.row_count() + 
remaining_selection.skipped_row_count();
   
       assert_eq!(selection_rows, file_rows, "SPLIT_OFF: row count mismatch");
       ParquetAccessPlan::new(row_groups)
   }
   
   /// Runs `bench_fn` `iterations` times on a freshly built scattered
   /// selection and prints the total elapsed time.
   ///
   /// Metadata and the selection are built before the timer starts; each
   /// iteration's clone of the selection is inside the timed region.
   fn run_one(
       name: &str,
       bench_fn: BenchFn,
       label: &str,
       num_pairs: usize,
       iterations: usize,
   ) {
       let meta = row_group_metadata();
       let selection = scattered_selection(num_pairs);
       let s = selection.iter().count();

       let started = Instant::now();
       (0..iterations).for_each(|_| {
           // black_box keeps the optimizer from discarding the unused plan.
           std::hint::black_box(bench_fn(selection.clone(), &meta));
       });
       let elapsed = started.elapsed();

       println!(
           "{name} {label} selectors={s} iterations={iterations} elapsed={:?}",
           elapsed
       );
   }
   
   /// Maps a command-line implementation name to its label and benchmark
   /// function; returns `None` for any other name.
   fn implementation(value: &str) -> Option<(&'static str, BenchFn)> {
       let entry = match value {
           "new" => ("new", new_try_new as BenchFn),
           "split_off" => ("split_off", split_off_try_new as BenchFn),
           _ => return None,
       };
       Some(entry)
   }
   
   /// Maps a command-line scenario name to its label and the number of
   /// skip/select pairs to generate; returns `None` for any other name.
   fn scenario(value: &str) -> Option<(&'static str, usize)> {
       const SCENARIOS: [(&str, usize); 3] = [
           ("coarse", 40),
           ("medium", 50_000),
           ("heavy", 1_000_000),
       ];

       SCENARIOS
           .iter()
           .copied()
           .find(|&(label, _)| label == value)
   }
   
   /// Prints the command-line synopsis to stderr and exits with status 2.
   ///
   /// `program` is the executable name shown in the message. Never returns.
   fn usage(program: &str) -> ! {
       // The synopsis is kept on one source line: a hard line break inside the
       // literal would be printed verbatim and split the usage line in two.
       eprintln!(
           "Usage: {program} <new|split_off> <coarse|medium|heavy> [iterations]\n\
            Example: {program} new heavy 1"
       );
       std::process::exit(2);
   }
   
   /// Entry point: `bench <new|split_off> <coarse|medium|heavy> [iterations]`.
   ///
   /// Any malformed invocation (wrong argument count, unknown implementation
   /// or scenario, unparsable or zero iteration count) prints the usage text
   /// and exits via `usage`.
   fn main() {
       let args: Vec<String> = std::env::args().collect();
       // `program` is already a `&str`, so it is passed to `usage` as-is.
       let program = args.first().map(String::as_str).unwrap_or("bench");

       if !(3..=4).contains(&args.len()) {
           usage(program);
       }

       let Some((name, bench_fn)) = implementation(&args[1]) else {
           usage(program);
       };
       let Some((label, num_pairs)) = scenario(&args[2]) else {
           usage(program);
       };

       // The iteration count is optional and defaults to 1; a value that does
       // not parse, or parses to zero, is rejected.
       let iterations = match args.get(3).map(|raw| raw.parse::<usize>()) {
           None => 1,
           Some(Ok(count)) if count > 0 => count,
           Some(_) => usage(program),
       };

       run_one(name, bench_fn, label, num_pairs, iterations);
   }
   
   ```
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to