Dandandan commented on a change in pull request #365:
URL: https://github.com/apache/arrow-datafusion/pull/365#discussion_r635554027



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -0,0 +1,769 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains code to rule out row groups / partitions /
+//! etc. based on statistics, ahead of time, in order to skip
+//! evaluating entire swaths of rows.
+//!
+//! This code is currently specific to Parquet, but soon (TM), via
+//! https://github.com/apache/arrow-datafusion/issues/363 it will
+//! be genericized.
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow::{
+    array::{
+        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
+        BooleanBufferBuilder,
+    },
+    buffer::MutableBuffer,
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+};
+
+use parquet::file::{
+    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
+};
+
+use crate::{
+    error::{DataFusionError, Result},
+    execution::context::ExecutionContextState,
+    logical_plan::{Expr, Operator},
+    optimizer::utils,
+    physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, 
PhysicalExpr},
+};
+
+#[derive(Debug, Clone)]
+/// Builder used for generating predicate functions that can be used
+/// to prune data based on statistics (e.g. parquet row group metadata)
+pub struct PruningPredicateBuilder {
+    schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PruningPredicateBuilder {
+    /// Try to create a new instance of [`PruningPredicateBuilder`]
+    ///
+    /// This will translate the filter expression into a statistics predicate 
expression
+    ///
+    /// For example, `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 
<= (column_max / 2)`
+    pub fn try_new(expr: &Expr, schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, 
Field)>::new();
+        let logical_predicate_expr =
+            build_predicate_expression(expr, &schema, &mut stat_column_req)?;
+        // println!(
+        //     "PruningPredicateBuilder::try_new, logical_predicate_expr: 
{:?}",
+        //     logical_predicate_expr
+        // );
+        // build physical predicate expression
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let execution_context_state = ExecutionContextState::new();
+        let predicate_expr = 
DefaultPhysicalPlanner::default().create_physical_expr(
+            &logical_predicate_expr,
+            &stat_schema,
+            &execution_context_state,
+        )?;
+        // println!(
+        //     "PruningPredicateBuilder::try_new, predicate_expr: {:?}",
+        //     predicate_expr
+        // );
+        Ok(Self {
+            schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter based on
+    /// statistics
+    ///
+    /// This function takes a slice of statistics as parameter, so
+    /// that DataFusion's physical expressions can be executed once
+    /// against a single RecordBatch, containing statistics arrays, on
+    /// which the physical predicate expression is executed to
+    /// generate a row group filter array.
+    ///
+    /// The generated filter array is then used in the returned
+    /// closure to filter row groups. NOTE: this is parquet specific at the 
moment.
+    pub fn build_pruning_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_statistics_record_batch(
+            row_group_metadata,
+            &self.schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            // row group filter array could not be built
+            // return a closure which will not filter out any row groups
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = 
predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                // when the result of the predicate expression for a row group 
is null / undefined,
+                // e.g. due to missing statistics, this row group can't be 
filtered out,
+                // so replace with true
+                let predicate_values =
+                    array.iter().map(|x| 
x.unwrap_or(true)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            // return a closure which will not filter out any row groups
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+/// Build a RecordBatch from a list of statistics (currently parquet
+/// [`RowGroupMetaData`] structs), creating arrays, one for each
+/// statistics column, as requested in the stat_column_req parameter.
+fn build_statistics_record_batch(
+    row_groups: &[RowGroupMetaData],
+    schema: &Schema,
+    stat_column_req: &[(String, StatisticsType, Field)],
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct StatisticsExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    field: &'a Field,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> StatisticsExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not 
supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                return Err(DataFusionError::Plan(
+                    "Field not found in schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            field,
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    // fn column_expr(&self) -> &Expr {
+    //     self.column_expr
+    // }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    // fn column_name(&self) -> &String {

Review comment:
       This commented-out code could be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to