This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ec24724ea Support page skipping / page_index pushdown for evolved schemas (#5268)
ec24724ea is described below
commit ec24724ea1130909402c64d5651a5815dcaf8285
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Feb 15 21:32:28 2023 +0100
Support page skipping / page_index pushdown for evolved schemas (#5268)
* Make the page index tests clearer about what they are doing
* Support page skipping / page_index pushdown for evolved schemas
* update test
* Update datafusion/core/src/datasource/file_format/parquet.rs
Co-authored-by: Yang Jiang <[email protected]>
---------
Co-authored-by: Yang Jiang <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 77 +++++++-----
datafusion/core/src/physical_optimizer/pruning.rs | 25 ++--
.../core/src/physical_plan/file_format/parquet.rs | 132 +++++++++++++++++++--
.../file_format/parquet/page_filter.rs | 89 +++++++++++---
4 files changed, 251 insertions(+), 72 deletions(-)
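As a rough illustration of the "page skipping" named in the commit title (a sketch for readers, not part of the commit itself): the page filter ultimately hands the parquet reader a RowSelection describing which row ranges to decode and which to skip. Assuming the parquet crate's arrow_reader types, a selection over three 2-row pages, where statistics say only the first and last pages can match the predicate, might be built like this:

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    fn main() {
        // Three 2-row data pages; page-level statistics say only pages 1 and 3
        // can possibly contain matching rows.
        let selection = RowSelection::from(vec![
            RowSelector::select(2), // page 1: decode
            RowSelector::skip(2),   // page 2: skipped via the page index
            RowSelector::select(2), // page 3: decode
        ]);

        // Only the selected rows are decoded; the skipped page never is.
        assert_eq!(selection.row_count(), 4);
    }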
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 809be3a7d..0a7a7cadc 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -550,50 +550,63 @@ pub(crate) mod test_util {
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;
+ /// How many rows per page should be written
+ const ROWS_PER_PAGE: usize = 2;
+
/// Writes `batches` to a temporary parquet file
///
- /// If multi_page is set to `true`, all batches are written into
- /// one temporary parquet file and the parquet file is written
+ /// If multi_page is set to `true`, the parquet file(s) are written
/// with 2 rows per data page (used to test page filtering and
/// boundaries).
pub async fn store_parquet(
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
- if multi_page {
- // All batches write in to one file, each batch must have same schema.
- let mut output = NamedTempFile::new().expect("creating temp file");
- let mut builder = WriterProperties::builder();
- builder = builder.set_data_page_row_count_limit(2);
- let proper = builder.build();
- let mut writer =
- ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
- .expect("creating writer");
- for b in batches {
- writer.write(&b).expect("Writing batch");
- }
- writer.close().unwrap();
- Ok((vec![local_unpartitioned_file(&output)], vec![output]))
- } else {
- // Each batch writes to their own file
- let files: Vec<_> = batches
- .into_iter()
- .map(|batch| {
- let mut output = NamedTempFile::new().expect("creating temp file");
+ // Each batch writes to its own file
+ let files: Vec<_> = batches
+ .into_iter()
+ .map(|batch| {
+ let mut output = NamedTempFile::new().expect("creating temp file");
+
+ let builder = WriterProperties::builder();
+ let props = if multi_page {
+ builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
+ } else {
+ builder
+ }
+ .build();
- let props = WriterProperties::builder().build();
- let mut writer =
- ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
- .expect("creating writer");
+ let mut writer =
+ ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+ .expect("creating writer");
+ if multi_page {
+ // write in smaller batches as the parquet writer
+ // only checks data page size limits at the boundaries of each batch
+ write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
+ } else {
writer.write(&batch).expect("Writing batch");
- writer.close().unwrap();
- output
- })
- .collect();
+ };
+ writer.close().unwrap();
+ output
+ })
+ .collect();
- let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
- Ok((meta, files))
+ let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+ Ok((meta, files))
+ }
+
+ /// Writes `batch` in chunks of `chunk_size` rows at a time
+ fn write_in_chunks<W: std::io::Write>(
+ writer: &mut ArrowWriter<W>,
+ batch: &RecordBatch,
+ chunk_size: usize,
+ ) {
+ let mut i = 0;
+ while i < batch.num_rows() {
+ let num = chunk_size.min(batch.num_rows() - i);
+ writer.write(&batch.slice(i, num)).unwrap();
+ i += num;
}
}
}
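The write_in_chunks helper above is needed because, as the diff's own comment notes, the parquet writer only checks the data page row count limit at write() boundaries. A minimal standalone sketch of the same technique (assuming the arrow and parquet crates; the output path and column name are arbitrary) could look like this:

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        const ROWS_PER_PAGE: usize = 2;

        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
        let batch = RecordBatch::try_from_iter(vec![("int", col)])?;

        // Ask the writer to start a new data page every ROWS_PER_PAGE rows
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(ROWS_PER_PAGE)
            .build();

        let mut file = std::fs::File::create("/tmp/small_pages.parquet")?;
        let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), Some(props))?;

        // Write ROWS_PER_PAGE rows per call so the limit is actually applied;
        // writing the whole batch at once would leave all rows in one page.
        let mut offset = 0;
        while offset < batch.num_rows() {
            let len = ROWS_PER_PAGE.min(batch.num_rows() - offset);
            writer.write(&batch.slice(offset, len))?;
            offset += len;
        }
        writer.close()?;
        Ok(())
    }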
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 1931105cd..f3d363017 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -29,7 +29,7 @@
//! other source (e.g. a catalog)
use std::convert::TryFrom;
-use std::{collections::HashSet, sync::Arc};
+use std::sync::Arc;
use crate::execution::context::ExecutionProps;
use crate::prelude::lit;
@@ -233,25 +233,18 @@ impl PruningPredicate {
.unwrap_or_default()
}
- /// Returns all need column indexes to evaluate this pruning predicate
- pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
- let mut set = HashSet::new();
- self.required_columns.columns.iter().for_each(|x| {
- match self.schema().column_with_name(x.0.name.as_str()) {
- None => {}
- Some(y) => {
- set.insert(y.0);
- }
- }
- });
- set
+ pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
+ &self.required_columns
}
}
+/// Records for which columns statistics are necessary to evaluate a
+/// pruning predicate.
+///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
-struct RequiredStatColumns {
+pub(crate) struct RequiredStatColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max or Null_Count)
@@ -267,7 +260,7 @@ impl RequiredStatColumns {
/// Returns an iterator over items in columns (see doc on
/// `self.columns` for details)
- fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
+ pub(crate) fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
self.columns.iter()
}
@@ -852,7 +845,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-enum StatisticsType {
+pub(crate) enum StatisticsType {
Min,
Max,
NullCount,
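Because need_input_columns_ids is removed in favor of exposing required_columns(), a crate-internal caller that still wants schema indexes can derive them from the iterator. A hypothetical sketch (the function name and schema parameter are illustrative, not part of the commit; required_columns() is pub(crate), so this only applies inside the crate):

    use std::collections::HashSet;
    use arrow::datatypes::Schema;
    use crate::physical_optimizer::pruning::PruningPredicate;

    /// Hypothetical stand-in for the removed need_input_columns_ids(): map the
    /// column names recorded in RequiredStatColumns back to indexes in `schema`.
    fn required_column_indexes(predicate: &PruningPredicate, schema: &Schema) -> HashSet<usize> {
        predicate
            .required_columns()
            .iter()
            // each item is a (Column, StatisticsType, Field) tuple
            .filter_map(|(column, _stat_type, _field)| {
                schema
                    .column_with_name(column.name.as_str())
                    .map(|(idx, _field)| idx)
            })
            .collect()
    }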
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5784fd9e1..5bb03b4f4 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -836,8 +836,7 @@ mod tests {
/// round-trip record batches by writing each individual RecordBatch to
/// a parquet file and then reading that parquet file with the specified
- /// options. If page_index_predicate is set to `true`, all RecordBatches
- /// are written into a parquet file instead.
+ /// options.
#[derive(Debug, Default)]
struct RoundTrip {
projection: Option<Vec<usize>>,
@@ -1331,6 +1330,80 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}
+ #[tokio::test]
+ async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
+ let c1: ArrayRef = Arc::new(StringArray::from(vec![
+ // Page 1
+ Some("Foo"),
+ Some("Bar"),
+ // Page 2
+ Some("Foo2"),
+ Some("Bar2"),
+ // Page 3
+ Some("Foo3"),
+ Some("Bar3"),
+ ]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![
+ // Page 1:
+ Some(1),
+ Some(2),
+ // Page 2: (pruned)
+ Some(3),
+ Some(4),
+ // Page 3: (pruned)
+ Some(5),
+ None,
+ ]));
+
+ // batch1: c1(string)
+ let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+ // batch2: c2(int64)
+ let batch2 = create_batch(vec![("c2", c2.clone())]);
+
+ // batch3 (has c2, c1) -- both columns, should still prune
+ let batch3 = create_batch(vec![("c1", c1.clone()), ("c2",
c2.clone())]);
+
+ // batch4 (has c2, c1) -- different column order, should still prune
+ let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
+
+ let filter = col("c2").eq(lit(1_i64));
+
+ // write the batches to files and read them back:
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_page_index_predicate()
+ .round_trip(vec![batch1, batch2, batch3, batch4])
+ .await;
+
+ let expected = vec![
+ "+------+----+",
+ "| c1 | c2 |",
+ "+------+----+",
+ "| | 1 |",
+ "| | 2 |",
+ "| Bar | |",
+ "| Bar | 2 |",
+ "| Bar | 2 |",
+ "| Bar2 | |",
+ "| Bar3 | |",
+ "| Foo | |",
+ "| Foo | 1 |",
+ "| Foo | 1 |",
+ "| Foo2 | |",
+ "| Foo3 | |",
+ "+------+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ let metrics = rt.parquet_exec.metrics().unwrap();
+
+ // There are 4 rows pruned in each of batch2, batch3, and
+ // batch4 for a total of 12. batch1 had no pruning as c2 was
+ // filled in as null
+ assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12);
+ }
+
#[tokio::test]
async fn multi_column_predicate_pushdown() {
let c1: ArrayRef =
@@ -1362,6 +1435,38 @@ mod tests {
assert_batches_sorted_eq!(expected, &read);
}
+ #[tokio::test]
+ async fn multi_column_predicate_pushdown_page_index_pushdown() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+ let batch1 = create_batch(vec![("c1", c1.clone()), ("c2",
c2.clone())]);
+
+ // Columns in different order to schema
+ let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+ // write the batches to files and read them back:
+ let read = RoundTrip::new()
+ .with_predicate(filter)
+ .with_page_index_predicate()
+ .round_trip_to_batches(vec![batch1])
+ .await
+ .unwrap();
+
+ let expected = vec![
+ "+-----+----+",
+ "| c1 | c2 |",
+ "+-----+----+",
+ "| | 2 |",
+ "| Foo | 1 |",
+ "| bar | |",
+ "+-----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
@@ -1635,27 +1740,38 @@ mod tests {
#[tokio::test]
async fn parquet_page_index_exec_metrics() {
- let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
- let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ Some(3),
+ Some(4),
+ Some(5),
+ ]));
let batch1 = create_batch(vec![("int", c1.clone())]);
- let batch2 = create_batch(vec![("int", c2.clone())]);
let filter = col("int").eq(lit(4_i32));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_page_index_predicate()
- .round_trip(vec![batch1, batch2])
+ .round_trip(vec![batch1])
.await;
let metrics = rt.parquet_exec.metrics().unwrap();
// assert the batches and some metrics
+ #[rustfmt::skip]
let expected = vec![
- "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |",
"+-----+",
+ "+-----+",
+ "| int |",
+ "+-----+",
+ "| 4 |",
+ "| 5 |",
+ "+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
- assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+ assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 3aaad0078..ebe59db9e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -139,6 +139,10 @@ impl PagePruningPredicate {
let page_index_predicates = &self.predicates;
let groups = file_metadata.row_groups();
+ if groups.is_empty() {
+ return Ok(None);
+ }
+
let file_offset_indexes = file_metadata.offset_indexes();
let file_page_indexes = file_metadata.page_indexes();
let (file_offset_indexes, file_page_indexes) =
@@ -155,30 +159,25 @@ impl PagePruningPredicate {
let mut row_selections =
Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
- // `extract_page_index_push_down_predicates` only return predicate with one col.
- // when building `PruningPredicate`, some single column filter like `abs(i) = 1`
- // will be rewrite to `lit(true)`, so may have an empty required_columns.
- let col_id =
- if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
- col_id
- } else {
- continue;
- };
+ // find column index by looking in the row group metadata.
+ let col_idx = find_column_index(predicate, &groups[0]);
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
+ let row_group_metadata = &groups[*r];
+
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
- if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
- (rg_page_indexes, rg_offset_indexes)
+ if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
+ (rg_page_indexes, rg_offset_indexes, col_idx)
{
selectors.extend(
prune_pages_in_one_row_group(
- &groups[*r],
+ row_group_metadata,
predicate,
- rg_offset_indexes.get(col_id),
- rg_page_indexes.get(col_id),
- groups[*r].column(col_id).column_descr(),
+ rg_offset_indexes.get(col_idx),
+ rg_page_indexes.get(col_idx),
+ groups[*r].column(col_idx).column_descr(),
file_metrics,
)
.map_err(|e| {
@@ -190,7 +189,7 @@ impl PagePruningPredicate {
} else {
trace!(
"Did not have enough metadata to prune with page
indexes, \
- falling back, falling back to all rows",
+ falling back to all rows",
);
// fallback select all rows
let all_selected =
@@ -223,6 +222,64 @@ impl PagePruningPredicate {
}
}
+/// Returns the column index in the row group metadata for the single
+/// column of a single column pruning predicate.
+///
+/// For example, given the predicate `y > 5` and columns in the
+/// RowGroupMetaData like `['x', 'y', 'z']`, this returns 1.
+///
+/// Returns `None` if the column is not found, or if there are no
+/// required columns, which is the case for predicates like
+/// `abs(i) = 1` that are rewritten to `lit(true)`.
+///
+/// Panics:
+///
+/// If the predicate contains more than one column reference (assumes
+/// that `extract_page_index_push_down_predicates` only returns
+/// predicates with a single column)
+///
+fn find_column_index(
+ predicate: &PruningPredicate,
+ row_group_metadata: &RowGroupMetaData,
+) -> Option<usize> {
+ let mut found_required_column: Option<&Column> = None;
+
+ for required_column_details in predicate.required_columns().iter() {
+ let column = &required_column_details.0;
+ if let Some(found_required_column) = found_required_column.as_ref() {
+ // make sure it is the same name we have seen previously
+ assert_eq!(
+ column.name, found_required_column.name,
+ "Unexpected multi column predicate"
+ );
+ } else {
+ found_required_column = Some(column);
+ }
+ }
+
+ let column = if let Some(found_required_column) = found_required_column.as_ref() {
+ found_required_column
+ } else {
+ trace!("No column references in pruning predicate");
+ return None;
+ };
+
+ let col_idx = row_group_metadata
+ .columns()
+ .iter()
+ .enumerate()
+ .find(|(_idx, c)| c.column_descr().name() == column.name)
+ .map(|(idx, _c)| idx);
+
+ if col_idx.is_none() {
+ trace!("Can not find column {} in row group meta", column.name);
+ }
+
+ col_idx
+}
+
/// Intersects the [`RowSelector`]s
///
/// For example, given: