This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ec24724ea Support page skipping / page_index pushdown for evolved schemas (#5268)
ec24724ea is described below

commit ec24724ea1130909402c64d5651a5815dcaf8285
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Feb 15 21:32:28 2023 +0100

    Support page skipping / page_index pushdown for evolved schemas (#5268)
    
    * Make the page index tests clearer about what they are doing
    
    * Support page skipping / page_index pushdown for evolved schemas
    
    * update test
    
    * Update datafusion/core/src/datasource/file_format/parquet.rs
    
    Co-authored-by: Yang Jiang <[email protected]>
    
    ---------
    
    Co-authored-by: Yang Jiang <[email protected]>
---
 .../core/src/datasource/file_format/parquet.rs     |  77 +++++++-----
 datafusion/core/src/physical_optimizer/pruning.rs  |  25 ++--
 .../core/src/physical_plan/file_format/parquet.rs  | 132 +++++++++++++++++++--
 .../file_format/parquet/page_filter.rs             |  89 +++++++++++---
 4 files changed, 251 insertions(+), 72 deletions(-)
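
For context, the motivating problem in miniature: with an evolved schema the same column can sit at a different physical index in each file, so an index computed against the table schema cannot be used to look up that file's page metadata. A minimal sketch of the mismatch, assuming only the arrow crate (illustrative, not DataFusion code):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // The table schema the query engine sees:
        let table_schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Int64, true),
        ]);
        // An evolved file written with the columns in a different order:
        let file_schema = Schema::new(vec![
            Field::new("c2", DataType::Int64, true),
            Field::new("c1", DataType::Utf8, true),
        ]);

        // "c2" is column 1 of the table but column 0 of this file, so a
        // table-schema index would consult the wrong column's page index.
        assert_eq!(table_schema.index_of("c2").unwrap(), 1);
        assert_eq!(file_schema.index_of("c2").unwrap(), 0);
    }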

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 809be3a7d..0a7a7cadc 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -550,50 +550,63 @@ pub(crate) mod test_util {
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;
 
+    /// How many rows per page should be written
+    const ROWS_PER_PAGE: usize = 2;
+
     /// Writes `batches` to a temporary parquet file
     ///
-    /// If multi_page is set to `true`, all batches are written into
-    /// one temporary parquet file and the parquet file is written
+    /// If multi_page is set to `true`, the parquet file(s) are written
     /// with 2 rows per data page (used to test page filtering and
     /// boundaries).
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
         multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        if multi_page {
-            // All batches write in to one file, each batch must have same schema.
-            let mut output = NamedTempFile::new().expect("creating temp file");
-            let mut builder = WriterProperties::builder();
-            builder = builder.set_data_page_row_count_limit(2);
-            let proper = builder.build();
-            let mut writer =
-                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
-                    .expect("creating writer");
-            for b in batches {
-                writer.write(&b).expect("Writing batch");
-            }
-            writer.close().unwrap();
-            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
-        } else {
-            // Each batch writes to their own file
-            let files: Vec<_> = batches
-                .into_iter()
-                .map(|batch| {
-                    let mut output = NamedTempFile::new().expect("creating temp file");
+        // Each batch writes to its own file
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let mut output = NamedTempFile::new().expect("creating temp file");
+
+                let builder = WriterProperties::builder();
+                let props = if multi_page {
+                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
+                } else {
+                    builder
+                }
+                .build();
 
-                    let props = WriterProperties::builder().build();
-                    let mut writer =
-                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                            .expect("creating writer");
+                let mut writer =
+                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                        .expect("creating writer");
 
+                if multi_page {
+                    // write in smaller batches as the parquet writer
+                    // only checks datapage size limits on the boundaries of each batch
+                    write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
+                } else {
                     writer.write(&batch).expect("Writing batch");
-                    writer.close().unwrap();
-                    output
-                })
-                .collect();
+                };
+                writer.close().unwrap();
+                output
+            })
+            .collect();
 
-            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-            Ok((meta, files))
+        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+        Ok((meta, files))
+    }
+
+    /// Write the batch to the writer, `chunk_size` rows at a time
+    fn write_in_chunks<W: std::io::Write>(
+        writer: &mut ArrowWriter<W>,
+        batch: &RecordBatch,
+        chunk_size: usize,
+    ) {
+        let mut i = 0;
+        while i < batch.num_rows() {
+            let num = chunk_size.min(batch.num_rows() - i);
+            writer.write(&batch.slice(i, num)).unwrap();
+            i += num;
         }
     }
 }
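
The helper above exists because the parquet writer only enforces the data page row count limit at write-call boundaries, so a batch must be fed to it a page's worth of rows at a time. A standalone sketch of the same pattern, assuming the arrow and parquet crates (the names and in-memory buffer are for illustration):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
        let batch = RecordBatch::try_from_iter(vec![("int", col)])?;

        // Ask for at most 2 rows per data page...
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(2)
            .build();

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;

        // ...and write 2 rows at a time so the limit is actually hit,
        // since it is only checked between write calls.
        let chunk_size = 2;
        let mut i = 0;
        while i < batch.num_rows() {
            let n = chunk_size.min(batch.num_rows() - i);
            writer.write(&batch.slice(i, n))?;
            i += n;
        }
        writer.close()?;
        Ok(())
    }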
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 1931105cd..f3d363017 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -29,7 +29,7 @@
 //! other source (e.g. a catalog)
 
 use std::convert::TryFrom;
-use std::{collections::HashSet, sync::Arc};
+use std::sync::Arc;
 
 use crate::execution::context::ExecutionProps;
 use crate::prelude::lit;
@@ -233,25 +233,18 @@ impl PruningPredicate {
             .unwrap_or_default()
     }
 
-    /// Returns all need column indexes to evaluate this pruning predicate
-    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
-        let mut set = HashSet::new();
-        self.required_columns.columns.iter().for_each(|x| {
-            match self.schema().column_with_name(x.0.name.as_str()) {
-                None => {}
-                Some(y) => {
-                    set.insert(y.0);
-                }
-            }
-        });
-        set
+    pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
+        &self.required_columns
     }
 }
 
+/// Records which columns' statistics are needed to evaluate a
+/// pruning predicate.
+///
 /// Handles creating references to the min/max statistics
 /// for columns as well as recording which statistics are needed
 #[derive(Debug, Default, Clone)]
-struct RequiredStatColumns {
+pub(crate) struct RequiredStatColumns {
     /// The statistics required to evaluate this predicate:
     /// * The unqualified column in the input schema
     /// * Statistics type (e.g. Min or Max or Null_Count)
@@ -267,7 +260,7 @@ impl RequiredStatColumns {
 
     /// Returns an iterator over items in columns (see doc on
     /// `self.columns` for details)
-    fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
+    pub(crate) fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
         self.columns.iter()
     }
 
@@ -852,7 +845,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
 }
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
-enum StatisticsType {
+pub(crate) enum StatisticsType {
     Min,
     Max,
     NullCount,
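
The effect of this API change: callers no longer get pre-resolved indices into the predicate's schema. They get the required column names (with statistics type and stat field) and resolve each name against the physical layout of the file at hand. A simplified stand-in showing how a consumer uses the new accessor (the types here are illustrative shapes, not DataFusion's real definitions):

    #[derive(Debug, Clone, Copy, PartialEq)]
    enum StatisticsType { Min, Max, NullCount }

    /// Stand-in for `RequiredStatColumns`: (column name, statistic, stat field name)
    struct RequiredStatColumns {
        columns: Vec<(String, StatisticsType, String)>,
    }

    impl RequiredStatColumns {
        fn iter(&self) -> impl Iterator<Item = &(String, StatisticsType, String)> {
            self.columns.iter()
        }
    }

    fn main() {
        let required = RequiredStatColumns {
            columns: vec![
                ("c2".to_string(), StatisticsType::Min, "c2_min".to_string()),
                ("c2".to_string(), StatisticsType::Max, "c2_max".to_string()),
            ],
        };
        // Resolve the required column against one file's physical layout:
        let file_columns = ["c2", "c1"]; // an "evolved" file
        for (name, stat, _field) in required.iter() {
            let idx = file_columns.iter().position(|c| *c == name.as_str());
            println!("{name} ({stat:?}) -> physical index {idx:?}");
        }
    }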
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5784fd9e1..5bb03b4f4 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -836,8 +836,7 @@ mod tests {
 
     /// round-trip record batches by writing each individual RecordBatch to
     /// a parquet file and then reading that parquet file with the specified
-    /// options. If page_index_predicate is set to `true`, all RecordBatches
-    /// are written into a parquet file instead.
+    /// options.
     #[derive(Debug, Default)]
     struct RoundTrip {
         projection: Option<Vec<usize>>,
@@ -1331,6 +1330,80 @@ mod tests {
         assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            // Page 1
+            Some("Foo"),
+            Some("Bar"),
+            // Page 2
+            Some("Foo2"),
+            Some("Bar2"),
+            // Page 3
+            Some("Foo3"),
+            Some("Bar3"),
+        ]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![
+            // Page 1:
+            Some(1),
+            Some(2),
+            // Page 2: (pruned)
+            Some(3),
+            Some(4),
+            // Page 3: (pruned)
+            Some(5),
+            None,
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c2(int64)
+        let batch2 = create_batch(vec![("c2", c2.clone())]);
+
+        // batch3 (has c2, c1) -- both columns, should still prune
+        let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
+
+        // batch4 (has c2, c1) -- different column order, should still prune
+        let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
+
+        let filter = col("c2").eq(lit(1_i64));
+
+        // write the batches to parquet files and read them back:
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip(vec![batch1, batch2, batch3, batch4])
+            .await;
+
+        let expected = vec![
+            "+------+----+",
+            "| c1   | c2 |",
+            "+------+----+",
+            "|      | 1  |",
+            "|      | 2  |",
+            "| Bar  |    |",
+            "| Bar  | 2  |",
+            "| Bar  | 2  |",
+            "| Bar2 |    |",
+            "| Bar3 |    |",
+            "| Foo  |    |",
+            "| Foo  | 1  |",
+            "| Foo  | 1  |",
+            "| Foo2 |    |",
+            "| Foo3 |    |",
+            "+------+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // There are 4 rows pruned in each of batch2, batch3, and
+        // batch4 for a total of 12. batch1 had no pruning as c2 was
+        // filled in as null
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12);
+    }
+
     #[tokio::test]
     async fn multi_column_predicate_pushdown() {
         let c1: ArrayRef =
@@ -1362,6 +1435,38 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn multi_column_predicate_pushdown_page_index_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
+
+        // Predicate references the columns in a different order than the schema
+        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+        // write the batch to a parquet file and read it back:
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip_to_batches(vec![batch1])
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",
+            "| c1  | c2 |",
+            "+-----+----+",
+            "|     | 2  |",
+            "| Foo | 1  |",
+            "| bar |    |",
+            "+-----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_incompatible_types() {
         let c1: ArrayRef =
@@ -1635,27 +1740,38 @@ mod tests {
 
     #[tokio::test]
     async fn parquet_page_index_exec_metrics() {
-        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
-        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            Some(3),
+            Some(4),
+            Some(5),
+        ]));
         let batch1 = create_batch(vec![("int", c1.clone())]);
-        let batch2 = create_batch(vec![("int", c2.clone())]);
 
         let filter = col("int").eq(lit(4_i32));
 
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_page_index_predicate()
-            .round_trip(vec![batch1, batch2])
+            .round_trip(vec![batch1])
             .await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
         // assert the batches and some metrics
+        #[rustfmt::skip]
         let expected = vec![
-            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", 
"+-----+",
+            "+-----+",
+            "| int |",
+            "+-----+",
+            "| 4   |",
+            "| 5   |",
+            "+-----+",
         ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
-        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
         assert!(
             get_value(&metrics, "page_index_eval_time") > 0,
             "no eval time in metrics: {metrics:#?}"
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 3aaad0078..ebe59db9e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -139,6 +139,10 @@ impl PagePruningPredicate {
         let page_index_predicates = &self.predicates;
         let groups = file_metadata.row_groups();
 
+        if groups.is_empty() {
+            return Ok(None);
+        }
+
         let file_offset_indexes = file_metadata.offset_indexes();
         let file_page_indexes = file_metadata.page_indexes();
         let (file_offset_indexes, file_page_indexes) =
@@ -155,30 +159,25 @@ impl PagePruningPredicate {
 
         let mut row_selections = Vec::with_capacity(page_index_predicates.len());
         for predicate in page_index_predicates {
-            // `extract_page_index_push_down_predicates` only return predicate with one col.
-            //  when building `PruningPredicate`, some single column filter like `abs(i) = 1`
-            //  will be rewrite to `lit(true)`, so may have an empty required_columns.
-            let col_id =
-                if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
-                    col_id
-                } else {
-                    continue;
-                };
+            // find column index by looking in the row group metadata.
+            let col_idx = find_column_index(predicate, &groups[0]);
 
             let mut selectors = Vec::with_capacity(row_groups.len());
             for r in row_groups.iter() {
+                let row_group_metadata = &groups[*r];
+
                 let rg_offset_indexes = file_offset_indexes.get(*r);
                 let rg_page_indexes = file_page_indexes.get(*r);
-                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
-                    (rg_page_indexes, rg_offset_indexes)
+                if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
+                    (rg_page_indexes, rg_offset_indexes, col_idx)
                 {
                     selectors.extend(
                         prune_pages_in_one_row_group(
-                            &groups[*r],
+                            row_group_metadata,
                             predicate,
-                            rg_offset_indexes.get(col_id),
-                            rg_page_indexes.get(col_id),
-                            groups[*r].column(col_id).column_descr(),
+                            rg_offset_indexes.get(col_idx),
+                            rg_page_indexes.get(col_idx),
+                            groups[*r].column(col_idx).column_descr(),
                             file_metrics,
                         )
                         .map_err(|e| {
@@ -190,7 +189,7 @@ impl PagePruningPredicate {
                 } else {
                     trace!(
                         "Did not have enough metadata to prune with page 
indexes, \
-                         falling back, falling back to all rows",
+                         falling back to all rows",
                     );
                     // fallback select all rows
                     let all_selected =
@@ -223,6 +222,64 @@ impl PagePruningPredicate {
     }
 }
 
+/// Returns the column index in the row group metadata for the single
+/// column of a single-column pruning predicate.
+///
+/// For example, given the predicate `y > 5` and columns in the
+/// RowGroupMetaData like `['x', 'y', 'z']`, this will return 1.
+///
+/// Returns `None` if the column is not found, or if there are no
+/// required columns, which is the case for predicates like
+/// `abs(i) = 1` that are rewritten to `lit(true)`.
+///
+/// Panics:
+///
+/// If the predicate contains more than one column reference (assumes
+/// that `extract_page_index_push_down_predicates` only returns
+/// predicates with a single column).
+///
+fn find_column_index(
+    predicate: &PruningPredicate,
+    row_group_metadata: &RowGroupMetaData,
+) -> Option<usize> {
+    let mut found_required_column: Option<&Column> = None;
+
+    for required_column_details in predicate.required_columns().iter() {
+        let column = &required_column_details.0;
+        if let Some(found_required_column) = found_required_column.as_ref() {
+            // make sure it is the same name we have seen previously
+            assert_eq!(
+                column.name, found_required_column.name,
+                "Unexpected multi column predicate"
+            );
+        } else {
+            found_required_column = Some(column);
+        }
+    }
+
+    let column = if let Some(found_required_column) = found_required_column.as_ref() {
+        found_required_column
+    } else {
+        trace!("No column references in pruning predicate");
+        return None;
+    };
+
+    let col_idx = row_group_metadata
+        .columns()
+        .iter()
+        .enumerate()
+        .find(|(_idx, c)| c.column_descr().name() == column.name)
+        .map(|(idx, _c)| idx);
+
+    if col_idx.is_none() {
+        trace!("Can not find column {} in row group meta", column.name);
+    }
+
+    col_idx
+}
+
 /// Intersects the [`RowSelector`]s
 ///
 /// For example, given:

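The doc comment above is cut off here, but the operation it describes is an AND of per-predicate row selections: a row is read only if every predicate's selection keeps it. A toy version over simple select/skip runs, expanding to booleans and re-compressing (the `Run` type is illustrative, not the parquet crate's `RowSelector`):

    /// A run of `count` consecutive rows, either selected or skipped.
    #[derive(Debug, Clone, Copy, PartialEq)]
    struct Run {
        select: bool,
        count: usize,
    }

    fn expand(runs: &[Run]) -> Vec<bool> {
        runs.iter()
            .flat_map(|r| std::iter::repeat(r.select).take(r.count))
            .collect()
    }

    fn compress(bits: &[bool]) -> Vec<Run> {
        let mut out: Vec<Run> = Vec::new();
        for &b in bits {
            match out.last_mut() {
                Some(run) if run.select == b => run.count += 1,
                _ => out.push(Run { select: b, count: 1 }),
            }
        }
        out
    }

    /// Intersect two selections over the same number of rows.
    fn intersect(a: &[Run], b: &[Run]) -> Vec<Run> {
        let anded: Vec<bool> = expand(a)
            .into_iter()
            .zip(expand(b))
            .map(|(x, y)| x && y)
            .collect();
        compress(&anded)
    }

    fn main() {
        // Predicate 1 keeps rows 0-3; predicate 2 keeps rows 2-5.
        let a = [Run { select: true, count: 4 }, Run { select: false, count: 2 }];
        let b = [Run { select: false, count: 2 }, Run { select: true, count: 4 }];
        // Only rows 2-3 survive both selections.
        assert_eq!(
            intersect(&a, &b),
            vec![
                Run { select: false, count: 2 },
                Run { select: true, count: 2 },
                Run { select: false, count: 2 },
            ]
        );
    }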