This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c86db0b2c Avoid some copies, encapsulate the handling of child 
indicies in `OptimizeProjection` (#10216)
5c86db0b2c is described below

commit 5c86db0b2c5a459d54994b7c6ce6b461cc35d767
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Apr 25 15:24:09 2024 -0400

    Avoid some copies, encapsulate the handling of child indicies in 
`OptimizeProjection` (#10216)
---
 datafusion/common/src/dfschema.rs                  |  17 +-
 .../mod.rs}                                        | 304 +++++----------------
 .../src/optimize_projections/required_indices.rs   | 227 +++++++++++++++
 3 files changed, 311 insertions(+), 237 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index f1909f0dc8..64e40ea99e 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -347,9 +347,22 @@ impl DFSchema {
         matches.next()
     }
 
-    /// Find the index of the column with the given qualifier and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+    /// Find the index of the column with the given qualifier and name,
+    /// returning `None` if not found
+    ///
+    /// See [Self::index_of_column] for a version that returns an error if the
+    /// column is not found
+    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
         self.index_of_column_by_name(col.relation.as_ref(), &col.name)
+    }
+
+    /// Find the index of the column with the given qualifier and name,
+    /// returning `Err` if not found
+    ///
+    /// See [Self::maybe_index_of_column] for a version that returns `None` if
+    /// the column is not found
+    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+        self.maybe_index_of_column(col)
             .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, 
self))
     }
 
diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections/mod.rs
similarity index 87%
rename from datafusion/optimizer/src/optimize_projections.rs
rename to datafusion/optimizer/src/optimize_projections/mod.rs
index 70ffd8f244..0f2aaa6cbc 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -17,16 +17,16 @@
 
 //! [`OptimizeProjections`] identifies and eliminates unused columns
 
+mod required_indices;
+
 use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 
-use arrow::datatypes::SchemaRef;
 use datafusion_common::{
-    get_required_group_by_exprs_indices, internal_err, Column, DFSchema, 
DFSchemaRef,
-    JoinType, Result,
+    get_required_group_by_exprs_indices, internal_err, Column, JoinType, 
Result,
 };
 use datafusion_expr::expr::{Alias, ScalarFunction};
 use datafusion_expr::{
@@ -34,9 +34,10 @@ use datafusion_expr::{
     Expr, Projection, TableScan, Window,
 };
 
+use crate::optimize_projections::required_indices::RequiredIndicies;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use hashbrown::HashMap;
-use itertools::{izip, Itertools};
+use itertools::izip;
 
 /// Optimizer rule to prune unnecessary columns from intermediate schemas
 /// inside the [`LogicalPlan`]. This rule:
@@ -70,8 +71,8 @@ impl OptimizerRule for OptimizeProjections {
         config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // All output fields are necessary:
-        let indices = (0..plan.schema().fields().len()).collect::<Vec<_>>();
-        optimize_projections(plan, config, &indices)
+        let indices = RequiredIndicies::new_for_all_exprs(plan);
+        optimize_projections(plan, config, indices)
     }
 
     fn name(&self) -> &str {
@@ -105,13 +106,9 @@ impl OptimizerRule for OptimizeProjections {
 fn optimize_projections(
     plan: &LogicalPlan,
     config: &dyn OptimizerConfig,
-    indices: &[usize],
+    indices: RequiredIndicies,
 ) -> Result<Option<LogicalPlan>> {
-    // `child_required_indices` stores
-    // - indices of the columns required for each child
-    // - a flag indicating whether putting a projection above children is 
beneficial for the parent.
-    // As an example LogicalPlan::Filter benefits from small tables. Hence for 
filter child this flag would be `true`.
-    let child_required_indices: Vec<(Vec<usize>, bool)> = match plan {
+    let child_required_indices: Vec<RequiredIndicies> = match plan {
         LogicalPlan::Sort(_)
         | LogicalPlan::Filter(_)
         | LogicalPlan::Repartition(_)
@@ -123,12 +120,13 @@ fn optimize_projections(
             // that appear in this plan's expressions to its child. All these
             // operators benefit from "small" inputs, so the 
projection_beneficial
             // flag is `true`.
-            let exprs = plan.expressions();
             plan.inputs()
                 .into_iter()
                 .map(|input| {
-                    get_all_required_indices(indices, input, exprs.iter())
-                        .map(|idxs| (idxs, true))
+                    indices
+                        .clone()
+                        .with_projection_beneficial()
+                        .with_plan_exprs(plan, input.schema())
                 })
                 .collect::<Result<_>>()?
         }
@@ -137,13 +135,9 @@ fn optimize_projections(
             // that appear in this plan's expressions to its child. These 
operators
             // do not benefit from "small" inputs, so the projection_beneficial
             // flag is `false`.
-            let exprs = plan.expressions();
             plan.inputs()
                 .into_iter()
-                .map(|input| {
-                    get_all_required_indices(indices, input, exprs.iter())
-                        .map(|idxs| (idxs, false))
-                })
+                .map(|input| indices.clone().with_plan_exprs(plan, 
input.schema()))
                 .collect::<Result<_>>()?
         }
         LogicalPlan::Copy(_)
@@ -159,16 +153,14 @@ fn optimize_projections(
             // TODO: For some subquery variants (e.g. a subquery arising from 
an
             //       EXISTS expression), we may not need to require all 
indices.
             plan.inputs()
-                .iter()
-                .map(|input| 
((0..input.schema().fields().len()).collect_vec(), false))
-                .collect::<Vec<_>>()
+                .into_iter()
+                .map(RequiredIndicies::new_for_all_exprs)
+                .collect()
         }
         LogicalPlan::Extension(extension) => {
-            let necessary_children_indices = if let 
Some(necessary_children_indices) =
-                extension.node.necessary_children_exprs(indices)
-            {
-                necessary_children_indices
-            } else {
+            let Some(necessary_children_indices) =
+                extension.node.necessary_children_exprs(indices.indices())
+            else {
                 // Requirements from parent cannot be routed down to user 
defined logical plan safely
                 return Ok(None);
             };
@@ -178,16 +170,12 @@ fn optimize_projections(
                 Make sure `.necessary_children_exprs` implementation of the 
`UserDefinedLogicalNode` is \
                 consistent with actual children length for the node.");
             }
-            // Expressions used by node.
-            let exprs = plan.expressions();
             children
                 .into_iter()
                 .zip(necessary_children_indices)
                 .map(|(child, necessary_indices)| {
-                    let child_schema = child.schema();
-                    let child_req_indices =
-                        indices_referred_by_exprs(child_schema, exprs.iter())?;
-                    Ok((merge_slices(&necessary_indices, &child_req_indices), 
false))
+                    RequiredIndicies::new_from_indices(necessary_indices)
+                        .with_plan_exprs(plan, child.schema())
                 })
                 .collect::<Result<Vec<_>>>()?
         }
@@ -213,13 +201,9 @@ fn optimize_projections(
         LogicalPlan::Aggregate(aggregate) => {
             // Split parent requirements to GROUP BY and aggregate sections:
             let n_group_exprs = aggregate.group_expr_len()?;
-            let (group_by_reqs, mut aggregate_reqs): (Vec<usize>, Vec<usize>) =
-                indices.iter().partition(|&&idx| idx < n_group_exprs);
             // Offset aggregate indices so that they point to valid indices at
             // `aggregate.aggr_expr`:
-            for idx in aggregate_reqs.iter_mut() {
-                *idx -= n_group_exprs;
-            }
+            let (group_by_reqs, aggregate_reqs) = 
indices.split_off(n_group_exprs);
 
             // Get absolutely necessary GROUP BY fields:
             let group_by_expr_existing = aggregate
@@ -235,16 +219,16 @@ fn optimize_projections(
                 // Some of the fields in the GROUP BY may be required by the
                 // parent even if these fields are unnecessary in terms of
                 // functional dependency.
-                let required_indices =
-                    merge_slices(&simplest_groupby_indices, &group_by_reqs);
-                get_at_indices(&aggregate.group_expr, &required_indices)
+                group_by_reqs
+                    .append(&simplest_groupby_indices)
+                    .get_at_indices(&aggregate.group_expr)
             } else {
                 aggregate.group_expr.clone()
             };
 
             // Only use the absolutely necessary aggregate expressions required
             // by the parent:
-            let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr, 
&aggregate_reqs);
+            let mut new_aggr_expr = 
aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
 
             // Aggregations always need at least one aggregate expression.
             // With a nested count, we don't require any column as input, but
@@ -263,10 +247,12 @@ fn optimize_projections(
 
             let all_exprs_iter = 
new_group_bys.iter().chain(new_aggr_expr.iter());
             let schema = aggregate.input.schema();
-            let necessary_indices = indices_referred_by_exprs(schema, 
all_exprs_iter)?;
+            let necessary_indices =
+                RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
+            let necessary_exprs = necessary_indices.get_required_exprs(schema);
 
             let aggregate_input = if let Some(input) =
-                optimize_projections(&aggregate.input, config, 
&necessary_indices)?
+                optimize_projections(&aggregate.input, config, 
necessary_indices)?
             {
                 input
             } else {
@@ -277,7 +263,6 @@ fn optimize_projections(
             // that its input only contains absolutely necessary columns for
             // the aggregate expressions. Note that necessary_indices refer to
             // fields in `aggregate.input.schema()`.
-            let necessary_exprs = get_required_exprs(schema, 
&necessary_indices);
             let (aggregate_input, _) =
                 add_projection_on_top_if_helpful(aggregate_input, 
necessary_exprs)?;
 
@@ -291,29 +276,24 @@ fn optimize_projections(
             .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate)));
         }
         LogicalPlan::Window(window) => {
+            let input_schema = window.input.schema();
             // Split parent requirements to child and window expression 
sections:
-            let n_input_fields = window.input.schema().fields().len();
-            let (child_reqs, mut window_reqs): (Vec<usize>, Vec<usize>) =
-                indices.iter().partition(|&&idx| idx < n_input_fields);
+            let n_input_fields = input_schema.fields().len();
             // Offset window expression indices so that they point to valid
             // indices at `window.window_expr`:
-            for idx in window_reqs.iter_mut() {
-                *idx -= n_input_fields;
-            }
+            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
 
             // Only use window expressions that are absolutely necessary 
according
             // to parent requirements:
-            let new_window_expr = get_at_indices(&window.window_expr, 
&window_reqs);
+            let new_window_expr = 
window_reqs.get_at_indices(&window.window_expr);
 
             // Get all the required column indices at the input, either by the
             // parent or window expression requirements.
-            let required_indices = get_all_required_indices(
-                &child_reqs,
-                &window.input,
-                new_window_expr.iter(),
-            )?;
+            let required_indices =
+                child_reqs.with_exprs(input_schema, &new_window_expr)?;
+
             let window_child = if let Some(new_window_child) =
-                optimize_projections(&window.input, config, &required_indices)?
+                optimize_projections(&window.input, config, 
required_indices.clone())?
             {
                 new_window_child
             } else {
@@ -327,8 +307,7 @@ fn optimize_projections(
                 // Calculate required expressions at the input of the window.
                 // Please note that we use `old_child`, because 
`required_indices`
                 // refers to `old_child`.
-                let required_exprs =
-                    get_required_exprs(window.input.schema(), 
&required_indices);
+                let required_exprs = 
required_indices.get_required_exprs(input_schema);
                 let (window_child, _) =
                     add_projection_on_top_if_helpful(window_child, 
required_exprs)?;
                 Window::try_new(new_window_expr, Arc::new(window_child))
@@ -339,31 +318,35 @@ fn optimize_projections(
             let left_len = join.left.schema().fields().len();
             let (left_req_indices, right_req_indices) =
                 split_join_requirements(left_len, indices, &join.join_type);
-            let exprs = plan.expressions();
             let left_indices =
-                get_all_required_indices(&left_req_indices, &join.left, 
exprs.iter())?;
+                left_req_indices.with_plan_exprs(plan, join.left.schema())?;
             let right_indices =
-                get_all_required_indices(&right_req_indices, &join.right, 
exprs.iter())?;
+                right_req_indices.with_plan_exprs(plan, join.right.schema())?;
             // Joins benefit from "small" input tables (lower memory usage).
             // Therefore, each child benefits from projection:
-            vec![(left_indices, true), (right_indices, true)]
+            vec![
+                left_indices.with_projection_beneficial(),
+                right_indices.with_projection_beneficial(),
+            ]
         }
         LogicalPlan::CrossJoin(cross_join) => {
             let left_len = cross_join.left.schema().fields().len();
-            let (left_child_indices, right_child_indices) =
+            let (left_indices, right_indices) =
                 split_join_requirements(left_len, indices, &JoinType::Inner);
             // Joins benefit from "small" input tables (lower memory usage).
             // Therefore, each child benefits from projection:
-            vec![(left_child_indices, true), (right_child_indices, true)]
+            vec![
+                left_indices.with_projection_beneficial(),
+                right_indices.with_projection_beneficial(),
+            ]
         }
         LogicalPlan::TableScan(table_scan) => {
-            let schema = table_scan.source.schema();
             // Get indices referred to in the original (schema with all fields)
             // given projected indices.
-            let projection = with_indices(&table_scan.projection, schema, 
|map| {
-                indices.iter().map(|&idx| map[idx]).collect()
-            });
-
+            let projection = match &table_scan.projection {
+                Some(projection) => indices.into_mapped_indices(|idx| 
projection[idx]),
+                None => indices.into_inner(),
+            };
             return TableScan::try_new(
                 table_scan.table_name.clone(),
                 table_scan.source.clone(),
@@ -376,15 +359,16 @@ fn optimize_projections(
     };
 
     let new_inputs = izip!(child_required_indices, plan.inputs().into_iter())
-        .map(|((required_indices, projection_beneficial), child)| {
+        .map(|(required_indices, child)| {
+            let projection_beneficial = 
required_indices.projection_beneficial();
+            let project_exprs = 
required_indices.get_required_exprs(child.schema());
             let (input, is_changed) = if let Some(new_input) =
-                optimize_projections(child, config, &required_indices)?
+                optimize_projections(child, config, required_indices)?
             {
                 (new_input, true)
             } else {
                 (child.clone(), false)
             };
-            let project_exprs = get_required_exprs(child.schema(), 
&required_indices);
             let (input, proj_added) = if projection_beneficial {
                 add_projection_on_top_if_helpful(input, project_exprs)?
             } else {
@@ -408,26 +392,6 @@ fn optimize_projections(
     }
 }
 
-/// This function applies the given function `f` to the projection indices
-/// `proj_indices` if they exist. Otherwise, applies `f` to a default set
-/// of indices according to `schema`.
-fn with_indices<F>(
-    proj_indices: &Option<Vec<usize>>,
-    schema: SchemaRef,
-    mut f: F,
-) -> Vec<usize>
-where
-    F: FnMut(&[usize]) -> Vec<usize>,
-{
-    match proj_indices {
-        Some(indices) => f(indices.as_slice()),
-        None => {
-            let range: Vec<usize> = (0..schema.fields.len()).collect();
-            f(range.as_slice())
-        }
-    }
-}
-
 /// Merges consecutive projections.
 ///
 /// Given a projection `proj`, this function attempts to merge it with a 
previous
@@ -653,132 +617,6 @@ fn outer_columns_helper_multi<'a>(
     exprs.into_iter().for_each(|e| outer_columns(e, columns));
 }
 
-/// Generates the required expressions (columns) that reside at `indices` of
-/// the given `input_schema`.
-///
-/// # Arguments
-///
-/// * `input_schema` - A reference to the input schema.
-/// * `indices` - A slice of `usize` indices specifying required columns.
-///
-/// # Returns
-///
-/// A vector of `Expr::Column` expressions residing at `indices` of the 
`input_schema`.
-fn get_required_exprs(input_schema: &Arc<DFSchema>, indices: &[usize]) -> 
Vec<Expr> {
-    indices
-        .iter()
-        .map(|&idx| 
Expr::Column(Column::from(input_schema.qualified_field(idx))))
-        .collect()
-}
-
-/// Get indices of the fields referred to by any expression in `exprs` within
-/// the given schema (`input_schema`).
-///
-/// # Arguments
-///
-/// * `input_schema`: The input schema to analyze for index requirements.
-/// * `exprs`: An iterator of expressions for which we want to find necessary
-///   field indices.
-///
-/// # Returns
-///
-/// A [`Result`] object containing the indices of all required fields in
-/// `input_schema` to calculate all `exprs` successfully.
-fn indices_referred_by_exprs<'a>(
-    input_schema: &DFSchemaRef,
-    exprs: impl Iterator<Item = &'a Expr>,
-) -> Result<Vec<usize>> {
-    let indices = exprs
-        .map(|expr| indices_referred_by_expr(input_schema, expr))
-        .collect::<Result<Vec<_>>>()?;
-    Ok(indices
-        .into_iter()
-        .flatten()
-        // Make sure no duplicate entries exist and indices are ordered:
-        .sorted()
-        .dedup()
-        .collect())
-}
-
-/// Get indices of the fields referred to by the given expression `expr` within
-/// the given schema (`input_schema`).
-///
-/// # Parameters
-///
-/// * `input_schema`: The input schema to analyze for index requirements.
-/// * `expr`: An expression for which we want to find necessary field indices.
-///
-/// # Returns
-///
-/// A [`Result`] object containing the indices of all required fields in
-/// `input_schema` to calculate `expr` successfully.
-fn indices_referred_by_expr(
-    input_schema: &DFSchemaRef,
-    expr: &Expr,
-) -> Result<Vec<usize>> {
-    let mut cols = expr.to_columns()?;
-    // Get outer-referenced (subquery) columns:
-    outer_columns(expr, &mut cols);
-    Ok(cols
-        .iter()
-        .flat_map(|col| input_schema.index_of_column(col))
-        .collect())
-}
-
-/// Gets all required indices for the input; i.e. those required by the parent
-/// and those referred to by `exprs`.
-///
-/// # Parameters
-///
-/// * `parent_required_indices` - A slice of indices required by the parent 
plan.
-/// * `input` - The input logical plan to analyze for index requirements.
-/// * `exprs` - An iterator of expressions used to determine required indices.
-///
-/// # Returns
-///
-/// A `Result` containing a vector of `usize` indices containing all the 
required
-/// indices.
-fn get_all_required_indices<'a>(
-    parent_required_indices: &[usize],
-    input: &LogicalPlan,
-    exprs: impl Iterator<Item = &'a Expr>,
-) -> Result<Vec<usize>> {
-    indices_referred_by_exprs(input.schema(), exprs)
-        .map(|indices| merge_slices(parent_required_indices, &indices))
-}
-
-/// Retrieves the expressions at specified indices within the given slice. 
Ignores
-/// any invalid indices.
-///
-/// # Parameters
-///
-/// * `exprs` - A slice of expressions to index into.
-/// * `indices` - A slice of indices specifying the positions of expressions 
sought.
-///
-/// # Returns
-///
-/// A vector of expressions corresponding to specified indices.
-fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec<Expr> {
-    indices
-        .iter()
-        // Indices may point to further places than `exprs` len.
-        .filter_map(|&idx| exprs.get(idx).cloned())
-        .collect()
-}
-
-/// Merges two slices into a single vector with sorted (ascending) and
-/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]`
-/// will produce `[1, 2, 3, 6]`.
-fn merge_slices<T: Clone + Ord>(left: &[T], right: &[T]) -> Vec<T> {
-    // Make sure to sort before deduping, which removes the duplicates:
-    left.iter()
-        .cloned()
-        .chain(right.iter().cloned())
-        .sorted()
-        .dedup()
-        .collect()
-}
-
 /// Splits requirement indices for a join into left and right children based on
 /// the join type.
 ///
@@ -810,26 +648,21 @@ fn merge_slices<T: Clone + Ord>(left: &[T], right: &[T]) 
-> Vec<T> {
 /// adjusted based on the join type.
 fn split_join_requirements(
     left_len: usize,
-    indices: &[usize],
+    indices: RequiredIndicies,
     join_type: &JoinType,
-) -> (Vec<usize>, Vec<usize>) {
+) -> (RequiredIndicies, RequiredIndicies) {
     match join_type {
         // In these cases requirements are split between left/right children:
         JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
-            let (left_reqs, mut right_reqs): (Vec<usize>, Vec<usize>) =
-                indices.iter().partition(|&&idx| idx < left_len);
             // Decrease right side indices by `left_len` so that they point to 
valid
             // positions within the right child:
-            for idx in right_reqs.iter_mut() {
-                *idx -= left_len;
-            }
-            (left_reqs, right_reqs)
+            indices.split_off(left_len)
         }
         // All requirements can be re-routed to left child directly.
-        JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]),
+        JoinType::LeftAnti | JoinType::LeftSemi => (indices, 
RequiredIndicies::new()),
         // All requirements can be re-routed to right side directly.
         // No need to change index, join schema is right child schema.
-        JoinType::RightSemi | JoinType::RightAnti => (vec![], 
indices.to_vec()),
+        JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(), 
indices),
     }
 }
 
@@ -885,13 +718,14 @@ fn add_projection_on_top_if_helpful(
 fn rewrite_projection_given_requirements(
     proj: &Projection,
     config: &dyn OptimizerConfig,
-    indices: &[usize],
+    indices: RequiredIndicies,
 ) -> Result<Option<LogicalPlan>> {
-    let exprs_used = get_at_indices(&proj.expr, indices);
+    let exprs_used = indices.get_at_indices(&proj.expr);
+
     let required_indices =
-        indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?;
+        RequiredIndicies::new().with_exprs(proj.input.schema(), 
exprs_used.iter())?;
     return if let Some(input) =
-        optimize_projections(&proj.input, config, &required_indices)?
+        optimize_projections(&proj.input, config, required_indices)?
     {
         if is_projection_unnecessary(&input, &exprs_used)? {
             Ok(Some(input))
diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs 
b/datafusion/optimizer/src/optimize_projections/required_indices.rs
new file mode 100644
index 0000000000..113c100bbd
--- /dev/null
+++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`RequiredIndicies`] helper for OptimizeProjection
+
+use crate::optimize_projections::outer_columns;
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Column, DFSchemaRef, Result};
+use datafusion_expr::{Expr, LogicalPlan};
+
+/// Represents columns in a schema which are required (used) by a plan node
+///
+/// Also carries a flag indicating if putting a projection above children is
+/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from
+/// small tables. Hence for filter child this flag would be `true`. Defaults to
+/// `false`
+///
+/// # Invariant
+///
+/// Indices are always in order and without duplicates. For example, if these
+/// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented
+/// by `[1, 2, 3, 4, 6]`.
+#[derive(Debug, Clone, Default)]
+pub(super) struct RequiredIndicies {
+    /// The indices of the required columns in the schema
+    indices: Vec<usize>,
+    /// If putting a projection above children is beneficial for the parent.
+    /// Defaults to false.
+    projection_beneficial: bool,
+}
+
+impl RequiredIndicies {
+    /// Create a new, empty instance
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create a new instance that requires all columns from the specified plan
+    pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self {
+        Self {
+            indices: (0..plan.schema().fields().len()).collect(),
+            projection_beneficial: false,
+        }
+    }
+
+    /// Create a new instance with the specified indices as required
+    pub fn new_from_indices(indices: Vec<usize>) -> Self {
+        Self {
+            indices,
+            projection_beneficial: false,
+        }
+        .compact()
+    }
+
+    /// Convert the instance to its inner indices
+    pub fn into_inner(self) -> Vec<usize> {
+        self.indices
+    }
+
+    /// Set the projection beneficial flag
+    pub fn with_projection_beneficial(mut self) -> Self {
+        self.projection_beneficial = true;
+        self
+    }
+
+    /// Return the value of projection beneficial flag
+    pub fn projection_beneficial(&self) -> bool {
+        self.projection_beneficial
+    }
+
+    /// Return a reference to the underlying indices
+    pub fn indices(&self) -> &[usize] {
+        &self.indices
+    }
+
+    /// Add required indices for all `exprs` used in plan
+    pub fn with_plan_exprs(
+        mut self,
+        plan: &LogicalPlan,
+        schema: &DFSchemaRef,
+    ) -> Result<Self> {
+        // Add indices of the child fields referred to by the expressions in 
the
+        // parent
+        plan.apply_expressions(|e| {
+            self.add_expr(schema, e)?;
+            Ok(TreeNodeRecursion::Continue)
+        })?;
+        Ok(self.compact())
+    }
+
+    /// Adds the indices of the fields referred to by the given expression
+    /// `expr` within the given schema (`input_schema`).
+    ///
+    /// Self is NOT compacted (and thus this method is not pub)
+    ///
+    /// # Parameters
+    ///
+    /// * `input_schema`: The input schema to analyze for index requirements.
+    /// * `expr`: An expression for which we want to find necessary field 
indices.
+    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> 
Result<()> {
+        // TODO could remove these clones (and visit the expression directly)
+        let mut cols = expr.to_columns()?;
+        // Get outer-referenced (subquery) columns:
+        outer_columns(expr, &mut cols);
+        self.indices.reserve(cols.len());
+        for col in cols {
+            if let Some(idx) = input_schema.maybe_index_of_column(&col) {
+                self.indices.push(idx);
+            }
+        }
+        Ok(())
+    }
+
+    /// Adds the indices of the fields referred to by the given expressions
+    /// within the given schema.
+    ///
+    /// # Parameters
+    ///
+    /// * `schema`: The input schema to analyze for index requirements.
+    /// * `exprs`: the expressions for which we want to find field indices.
+    pub fn with_exprs<'a>(
+        self,
+        schema: &DFSchemaRef,
+        exprs: impl IntoIterator<Item = &'a Expr>,
+    ) -> Result<Self> {
+        exprs
+            .into_iter()
+            .try_fold(self, |mut acc, expr| {
+                acc.add_expr(schema, expr)?;
+                Ok(acc)
+            })
+            .map(|acc| acc.compact())
+    }
+
+    /// Adds all `indices` into this instance.
+    pub fn append(mut self, indices: &[usize]) -> Self {
+        self.indices.extend_from_slice(indices);
+        self.compact()
+    }
+
+    /// Splits this instance into a tuple with two instances:
+    /// * The indices that are less than `n`
+    /// * The remaining indices, adjusted down by `n`
+    pub fn split_off(self, n: usize) -> (Self, Self) {
+        let (l, r) = self.partition(|idx| idx < n);
+        (l, r.map_indices(|idx| idx - n))
+    }
+
+    /// Partitions the indices in this instance into two groups based on the
+    /// given predicate function `f`.
+    fn partition<F>(&self, f: F) -> (Self, Self)
+    where
+        F: Fn(usize) -> bool,
+    {
+        let (l, r): (Vec<usize>, Vec<usize>) =
+            self.indices.iter().partition(|&&idx| f(idx));
+        let projection_beneficial = self.projection_beneficial;
+
+        (
+            Self {
+                indices: l,
+                projection_beneficial,
+            },
+            Self {
+                indices: r,
+                projection_beneficial,
+            },
+        )
+    }
+
+    /// Map the indices in this instance to a new set of indices based on the
+    /// given function `f`, returning the mapped indices
+    ///
+    /// Not `pub` as it might not preserve the invariant of compacted indices
+    fn map_indices<F>(mut self, f: F) -> Self
+    where
+        F: Fn(usize) -> usize,
+    {
+        self.indices.iter_mut().for_each(|idx| *idx = f(*idx));
+        self
+    }
+
+    /// Apply the given function `f` to each index in this instance, returning
+    /// the mapped indices
+    pub fn into_mapped_indices<F>(self, f: F) -> Vec<usize>
+    where
+        F: Fn(usize) -> usize,
+    {
+        self.map_indices(f).into_inner()
+    }
+
+    /// Returns the `Expr`s from `exprs` that are at the indices in this 
instance
+    pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec<Expr> {
+        self.indices.iter().map(|&idx| exprs[idx].clone()).collect()
+    }
+
+    /// Generates the required expressions (columns) that reside at `indices` 
of
+    /// the given `input_schema`.
+    pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec<Expr> {
+        self.indices
+            .iter()
+            .map(|&idx| 
Expr::from(Column::from(input_schema.qualified_field(idx))))
+            .collect()
+    }
+
+    /// Compacts the indices of this instance so they are sorted
+    /// (ascending) and deduplicated.
+    fn compact(mut self) -> Self {
+        self.indices.sort_unstable();
+        self.indices.dedup();
+        self
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to