superserious-dev commented on issue #5699:
URL: https://github.com/apache/arrow-rs/issues/5699#issuecomment-3028600080

   Hello, I did some investigation and came up with the following example. It looks like it is possible to apply a `ProjectionMask` that targets the nested leaf column `cc` (inside the struct column `b`) and then access that column in the predicate's batch for filtering.
   
   
   ```rust
   use std::{error::Error, fs::File, sync::Arc};
   
   use arrow::array::{Array, AsArray, BooleanArray, Int32Array, RecordBatch, Scalar, StructArray};
   use arrow::{
       compute::kernels::cmp::gt,
       datatypes::{DataType, Field, Schema},
       error::ArrowError,
   };
   use parquet::{
       arrow::{
           ArrowWriter, ProjectionMask,
           arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter},
       },
       file::metadata::{ParquetMetaData, ParquetMetaDataReader},
   };
   
   fn main() -> Result<(), Box<dyn Error>> {
       let path = "sample.parquet";
   
       // Generate sample data w/ a StructArray
       generate_parquet(path)?;
   
       // Fetch metadata
       let metadata = read_metadata(path)?;
   
       // Construct filter
       // Leaf columns are numbered in depth-first schema order:
       // a = 0, b.aa = 1, b.bb = 2, b.cc = 3, c = 4.
       // Only look at cc (leaf = 3) in the filter.
       let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![3]);
       let threshold = Int32Array::from(vec![1]);
       let cc_filter = ArrowPredicateFn::new(mask.clone(), move |batch| {
           // Assert only 1 top-level column is present: "b"
           assert_eq!(batch.num_columns(), 1);
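           // (The projection preserves nesting: the surviving top-level column
           // is still the struct "b", containing only the selected leaf "cc".)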
   
           // Downcast the column to a struct
           let cc_struct = batch.column(0).as_struct();
   
           // Assert only 1 struct column is present: "cc"
           assert_eq!(cc_struct.num_columns(), 1);
   
           // Filter column
           let cc = cc_struct.column(0);
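           // `gt` broadcasts the one-element Scalar against every row of `cc`,
           // producing the BooleanArray that the RowFilter expects.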
           gt(cc, &Scalar::new(&threshold))
       });
   
       // Execute the filter w/ reader mask (same as filter mask)
       let filtered_row_count = filter_parquet(path, cc_filter, mask)?;
   
       // Assert that filtering was successful
       let original_row_count: i64 = metadata
           .row_groups()
           .iter()
           .map(|rg| rg.num_rows())
           .sum();
   
       assert_eq!(original_row_count, 10);
       assert_eq!(filtered_row_count, 3);
   
       Ok(())
   }
   
   fn generate_parquet(path: &str) -> Result<(), Box<dyn Error>> {
       let a_col = Int32Array::from((0..10).collect::<Vec<i32>>());
       let a_field = Field::new("a", DataType::Int32, false);
   
       let b_col = StructArray::from(vec![
           (
               Arc::new(Field::new("aa", DataType::Boolean, false)),
               Arc::new(BooleanArray::from(vec![
                   false, false, true, true, false, false, true, true, false, true,
               ])) as Arc<dyn Array>,
           ),
           (
               Arc::new(Field::new("bb", DataType::Boolean, false)),
               Arc::new(BooleanArray::from(vec![
                   true, true, false, false, true, true, false, false, true, false,
               ])) as Arc<dyn Array>,
           ),
           (
               Arc::new(Field::new("cc", DataType::Int32, false)),
               Arc::new(Int32Array::from((-5..5).collect::<Vec<i32>>())),
           ),
       ]);
       let b_field = Field::new("b", b_col.data_type().clone(), false);
   
       let c_col = Int32Array::from((10..20).collect::<Vec<i32>>());
       let c_field = Field::new("c", DataType::Int32, true);
   
       let schema = Schema::new(vec![a_field, b_field, c_field]);
       let batch = RecordBatch::try_new(
           Arc::new(schema),
           vec![Arc::new(a_col), Arc::new(b_col), Arc::new(c_col)],
       )?;
   
       let file = File::create(path)?;
       let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
       writer.write(&batch)?;
       writer.close()?;
   
       Ok(())
   }
   
   fn read_metadata(path: &str) -> Result<ParquetMetaData, Box<dyn Error>> {
       let parquet_file = File::open(path)?;
       ParquetMetaDataReader::new()
           .with_page_indexes(true)
           .parse_and_finish(&parquet_file)
           .map_err(|e| e.into())
   }
   
   fn filter_parquet<F>(
       path: &str,
       filter: ArrowPredicateFn<F>,
       mask: ProjectionMask,
   ) -> Result<usize, Box<dyn Error>>
   where
       F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
   {
       let parquet_file = File::open(path)?;
   
       let filter = RowFilter::new(vec![Box::new(filter)]);
       let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_file)?
           .with_batch_size(10)
           .with_projection(mask)
           .with_row_filter(filter)
           .build()?;
   
       let mut row_count = 0;
       for batch in reader {
           row_count += batch?.num_rows();
       }
   
       Ok(row_count)
   }
   
   ```
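
   One rough edge above is the hardcoded leaf index `3`. As a minimal sketch (the `leaf_index` helper below is hypothetical, not part of the parquet API), the index could instead be resolved from the dotted column path via the `SchemaDescriptor`:

   ```rust
   use parquet::schema::types::SchemaDescriptor;

   /// Hypothetical helper: find the leaf index of a dotted column path such as
   /// "b.cc". Leaves are ordered depth-first, matching ProjectionMask::leaves.
   fn leaf_index(schema: &SchemaDescriptor, dotted_path: &str) -> Option<usize> {
       schema
           .columns()
           .iter()
           .position(|c| c.path().string() == dotted_path)
   }
   ```

   With that, the mask could be built as `ProjectionMask::leaves(schema_descr, vec![leaf_index(schema_descr, "b.cc").unwrap()])` instead of hardcoding the index.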

