This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5c86db0b2c Avoid some copies, encapsulate the handling of child indices in `OptimizeProjections` (#10216)
5c86db0b2c is described below
commit 5c86db0b2c5a459d54994b7c6ce6b461cc35d767
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Apr 25 15:24:09 2024 -0400
Avoid some copies, encapsulate the handling of child indices in `OptimizeProjections` (#10216)
---
datafusion/common/src/dfschema.rs | 17 +-
.../mod.rs} | 304 +++++----------------
.../src/optimize_projections/required_indices.rs | 227 +++++++++++++++
3 files changed, 311 insertions(+), 237 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index f1909f0dc8..64e40ea99e 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -347,9 +347,22 @@ impl DFSchema {
matches.next()
}
- /// Find the index of the column with the given qualifier and name
- pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+ /// Find the index of the column with the given qualifier and name,
+ /// returning `None` if not found
+ ///
+ /// See [Self::index_of_column] for a version that returns an error if the
+ /// column is not found
+ pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
+ }
+
+ /// Find the index of the column with the given qualifier and name,
+ /// returning `Err` if not found
+ ///
+ /// See [Self::maybe_index_of_column] for a version that returns `None` if
+ /// the column is not found
+ pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+ self.maybe_index_of_column(col)
.ok_or_else(|| field_not_found(col.relation.clone(), &col.name,
self))
}
diff --git a/datafusion/optimizer/src/optimize_projections.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
similarity index 87%
rename from datafusion/optimizer/src/optimize_projections.rs
rename to datafusion/optimizer/src/optimize_projections/mod.rs
index 70ffd8f244..0f2aaa6cbc 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -17,16 +17,16 @@
//! [`OptimizeProjections`] identifies and eliminates unused columns
+mod required_indices;
+
use std::collections::HashSet;
use std::sync::Arc;
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::SchemaRef;
use datafusion_common::{
- get_required_group_by_exprs_indices, internal_err, Column, DFSchema,
DFSchemaRef,
- JoinType, Result,
+ get_required_group_by_exprs_indices, internal_err, Column, JoinType,
Result,
};
use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::{
@@ -34,9 +34,10 @@ use datafusion_expr::{
Expr, Projection, TableScan, Window,
};
+use crate::optimize_projections::required_indices::RequiredIndicies;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use hashbrown::HashMap;
-use itertools::{izip, Itertools};
+use itertools::izip;
/// Optimizer rule to prune unnecessary columns from intermediate schemas
/// inside the [`LogicalPlan`]. This rule:
@@ -70,8 +71,8 @@ impl OptimizerRule for OptimizeProjections {
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// All output fields are necessary:
- let indices = (0..plan.schema().fields().len()).collect::<Vec<_>>();
- optimize_projections(plan, config, &indices)
+ let indices = RequiredIndicies::new_for_all_exprs(plan);
+ optimize_projections(plan, config, indices)
}
fn name(&self) -> &str {
@@ -105,13 +106,9 @@ impl OptimizerRule for OptimizeProjections {
fn optimize_projections(
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
- indices: &[usize],
+ indices: RequiredIndicies,
) -> Result<Option<LogicalPlan>> {
- // `child_required_indices` stores
- // - indices of the columns required for each child
- // - a flag indicating whether putting a projection above children is
beneficial for the parent.
- // As an example LogicalPlan::Filter benefits from small tables. Hence for
filter child this flag would be `true`.
- let child_required_indices: Vec<(Vec<usize>, bool)> = match plan {
+ let child_required_indices: Vec<RequiredIndicies> = match plan {
LogicalPlan::Sort(_)
| LogicalPlan::Filter(_)
| LogicalPlan::Repartition(_)
@@ -123,12 +120,13 @@ fn optimize_projections(
// that appear in this plan's expressions to its child. All these
// operators benefit from "small" inputs, so the
projection_beneficial
// flag is `true`.
- let exprs = plan.expressions();
plan.inputs()
.into_iter()
.map(|input| {
- get_all_required_indices(indices, input, exprs.iter())
- .map(|idxs| (idxs, true))
+ indices
+ .clone()
+ .with_projection_beneficial()
+ .with_plan_exprs(plan, input.schema())
})
.collect::<Result<_>>()?
}
@@ -137,13 +135,9 @@ fn optimize_projections(
// that appear in this plan's expressions to its child. These
operators
// do not benefit from "small" inputs, so the projection_beneficial
// flag is `false`.
- let exprs = plan.expressions();
plan.inputs()
.into_iter()
- .map(|input| {
- get_all_required_indices(indices, input, exprs.iter())
- .map(|idxs| (idxs, false))
- })
+ .map(|input| indices.clone().with_plan_exprs(plan,
input.schema()))
.collect::<Result<_>>()?
}
LogicalPlan::Copy(_)
@@ -159,16 +153,14 @@ fn optimize_projections(
// TODO: For some subquery variants (e.g. a subquery arising from
an
// EXISTS expression), we may not need to require all
indices.
plan.inputs()
- .iter()
- .map(|input|
((0..input.schema().fields().len()).collect_vec(), false))
- .collect::<Vec<_>>()
+ .into_iter()
+ .map(RequiredIndicies::new_for_all_exprs)
+ .collect()
}
LogicalPlan::Extension(extension) => {
- let necessary_children_indices = if let
Some(necessary_children_indices) =
- extension.node.necessary_children_exprs(indices)
- {
- necessary_children_indices
- } else {
+ let Some(necessary_children_indices) =
+ extension.node.necessary_children_exprs(indices.indices())
+ else {
// Requirements from parent cannot be routed down to user
defined logical plan safely
return Ok(None);
};
@@ -178,16 +170,12 @@ fn optimize_projections(
Make sure `.necessary_children_exprs` implementation of the
`UserDefinedLogicalNode` is \
consistent with actual children length for the node.");
}
- // Expressions used by node.
- let exprs = plan.expressions();
children
.into_iter()
.zip(necessary_children_indices)
.map(|(child, necessary_indices)| {
- let child_schema = child.schema();
- let child_req_indices =
- indices_referred_by_exprs(child_schema, exprs.iter())?;
- Ok((merge_slices(&necessary_indices, &child_req_indices),
false))
+ RequiredIndicies::new_from_indices(necessary_indices)
+ .with_plan_exprs(plan, child.schema())
})
.collect::<Result<Vec<_>>>()?
}
@@ -213,13 +201,9 @@ fn optimize_projections(
LogicalPlan::Aggregate(aggregate) => {
// Split parent requirements to GROUP BY and aggregate sections:
let n_group_exprs = aggregate.group_expr_len()?;
- let (group_by_reqs, mut aggregate_reqs): (Vec<usize>, Vec<usize>) =
- indices.iter().partition(|&&idx| idx < n_group_exprs);
// Offset aggregate indices so that they point to valid indices at
// `aggregate.aggr_expr`:
- for idx in aggregate_reqs.iter_mut() {
- *idx -= n_group_exprs;
- }
+ let (group_by_reqs, aggregate_reqs) =
indices.split_off(n_group_exprs);
// Get absolutely necessary GROUP BY fields:
let group_by_expr_existing = aggregate
@@ -235,16 +219,16 @@ fn optimize_projections(
// Some of the fields in the GROUP BY may be required by the
// parent even if these fields are unnecessary in terms of
// functional dependency.
- let required_indices =
- merge_slices(&simplest_groupby_indices, &group_by_reqs);
- get_at_indices(&aggregate.group_expr, &required_indices)
+ group_by_reqs
+ .append(&simplest_groupby_indices)
+ .get_at_indices(&aggregate.group_expr)
} else {
aggregate.group_expr.clone()
};
// Only use the absolutely necessary aggregate expressions required
// by the parent:
- let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr,
&aggregate_reqs);
+ let mut new_aggr_expr =
aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
// Aggregations always need at least one aggregate expression.
// With a nested count, we don't require any column as input, but
@@ -263,10 +247,12 @@ fn optimize_projections(
let all_exprs_iter =
new_group_bys.iter().chain(new_aggr_expr.iter());
let schema = aggregate.input.schema();
- let necessary_indices = indices_referred_by_exprs(schema,
all_exprs_iter)?;
+ let necessary_indices =
+ RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
+ let necessary_exprs = necessary_indices.get_required_exprs(schema);
let aggregate_input = if let Some(input) =
- optimize_projections(&aggregate.input, config,
&necessary_indices)?
+ optimize_projections(&aggregate.input, config,
necessary_indices)?
{
input
} else {
@@ -277,7 +263,6 @@ fn optimize_projections(
// that its input only contains absolutely necessary columns for
// the aggregate expressions. Note that necessary_indices refer to
// fields in `aggregate.input.schema()`.
- let necessary_exprs = get_required_exprs(schema,
&necessary_indices);
let (aggregate_input, _) =
add_projection_on_top_if_helpful(aggregate_input,
necessary_exprs)?;
@@ -291,29 +276,24 @@ fn optimize_projections(
.map(|aggregate| Some(LogicalPlan::Aggregate(aggregate)));
}
LogicalPlan::Window(window) => {
+ let input_schema = window.input.schema();
// Split parent requirements to child and window expression
sections:
- let n_input_fields = window.input.schema().fields().len();
- let (child_reqs, mut window_reqs): (Vec<usize>, Vec<usize>) =
- indices.iter().partition(|&&idx| idx < n_input_fields);
+ let n_input_fields = input_schema.fields().len();
// Offset window expression indices so that they point to valid
// indices at `window.window_expr`:
- for idx in window_reqs.iter_mut() {
- *idx -= n_input_fields;
- }
+ let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
// Only use window expressions that are absolutely necessary
according
// to parent requirements:
- let new_window_expr = get_at_indices(&window.window_expr,
&window_reqs);
+ let new_window_expr =
window_reqs.get_at_indices(&window.window_expr);
// Get all the required column indices at the input, either by the
// parent or window expression requirements.
- let required_indices = get_all_required_indices(
- &child_reqs,
- &window.input,
- new_window_expr.iter(),
- )?;
+ let required_indices =
+ child_reqs.with_exprs(input_schema, &new_window_expr)?;
+
let window_child = if let Some(new_window_child) =
- optimize_projections(&window.input, config, &required_indices)?
+ optimize_projections(&window.input, config,
required_indices.clone())?
{
new_window_child
} else {
@@ -327,8 +307,7 @@ fn optimize_projections(
// Calculate required expressions at the input of the window.
// Please note that we use `old_child`, because
`required_indices`
// refers to `old_child`.
- let required_exprs =
- get_required_exprs(window.input.schema(),
&required_indices);
+ let required_exprs =
required_indices.get_required_exprs(input_schema);
let (window_child, _) =
add_projection_on_top_if_helpful(window_child,
required_exprs)?;
Window::try_new(new_window_expr, Arc::new(window_child))
@@ -339,31 +318,35 @@ fn optimize_projections(
let left_len = join.left.schema().fields().len();
let (left_req_indices, right_req_indices) =
split_join_requirements(left_len, indices, &join.join_type);
- let exprs = plan.expressions();
let left_indices =
- get_all_required_indices(&left_req_indices, &join.left,
exprs.iter())?;
+ left_req_indices.with_plan_exprs(plan, join.left.schema())?;
let right_indices =
- get_all_required_indices(&right_req_indices, &join.right,
exprs.iter())?;
+ right_req_indices.with_plan_exprs(plan, join.right.schema())?;
// Joins benefit from "small" input tables (lower memory usage).
// Therefore, each child benefits from projection:
- vec![(left_indices, true), (right_indices, true)]
+ vec![
+ left_indices.with_projection_beneficial(),
+ right_indices.with_projection_beneficial(),
+ ]
}
LogicalPlan::CrossJoin(cross_join) => {
let left_len = cross_join.left.schema().fields().len();
- let (left_child_indices, right_child_indices) =
+ let (left_indices, right_indices) =
split_join_requirements(left_len, indices, &JoinType::Inner);
// Joins benefit from "small" input tables (lower memory usage).
// Therefore, each child benefits from projection:
- vec![(left_child_indices, true), (right_child_indices, true)]
+ vec![
+ left_indices.with_projection_beneficial(),
+ right_indices.with_projection_beneficial(),
+ ]
}
LogicalPlan::TableScan(table_scan) => {
- let schema = table_scan.source.schema();
// Get indices referred to in the original (schema with all fields)
// given projected indices.
- let projection = with_indices(&table_scan.projection, schema,
|map| {
- indices.iter().map(|&idx| map[idx]).collect()
- });
-
+ let projection = match &table_scan.projection {
+ Some(projection) => indices.into_mapped_indices(|idx|
projection[idx]),
+ None => indices.into_inner(),
+ };
return TableScan::try_new(
table_scan.table_name.clone(),
table_scan.source.clone(),
@@ -376,15 +359,16 @@ fn optimize_projections(
};
let new_inputs = izip!(child_required_indices, plan.inputs().into_iter())
- .map(|((required_indices, projection_beneficial), child)| {
+ .map(|(required_indices, child)| {
+ let projection_beneficial =
required_indices.projection_beneficial();
+ let project_exprs =
required_indices.get_required_exprs(child.schema());
let (input, is_changed) = if let Some(new_input) =
- optimize_projections(child, config, &required_indices)?
+ optimize_projections(child, config, required_indices)?
{
(new_input, true)
} else {
(child.clone(), false)
};
- let project_exprs = get_required_exprs(child.schema(),
&required_indices);
let (input, proj_added) = if projection_beneficial {
add_projection_on_top_if_helpful(input, project_exprs)?
} else {
@@ -408,26 +392,6 @@ fn optimize_projections(
}
}
-/// This function applies the given function `f` to the projection indices
-/// `proj_indices` if they exist. Otherwise, applies `f` to a default set
-/// of indices according to `schema`.
-fn with_indices<F>(
- proj_indices: &Option<Vec<usize>>,
- schema: SchemaRef,
- mut f: F,
-) -> Vec<usize>
-where
- F: FnMut(&[usize]) -> Vec<usize>,
-{
- match proj_indices {
- Some(indices) => f(indices.as_slice()),
- None => {
- let range: Vec<usize> = (0..schema.fields.len()).collect();
- f(range.as_slice())
- }
- }
-}
-
/// Merges consecutive projections.
///
/// Given a projection `proj`, this function attempts to merge it with a
previous
@@ -653,132 +617,6 @@ fn outer_columns_helper_multi<'a>(
exprs.into_iter().for_each(|e| outer_columns(e, columns));
}
-/// Generates the required expressions (columns) that reside at `indices` of
-/// the given `input_schema`.
-///
-/// # Arguments
-///
-/// * `input_schema` - A reference to the input schema.
-/// * `indices` - A slice of `usize` indices specifying required columns.
-///
-/// # Returns
-///
-/// A vector of `Expr::Column` expressions residing at `indices` of the
`input_schema`.
-fn get_required_exprs(input_schema: &Arc<DFSchema>, indices: &[usize]) ->
Vec<Expr> {
- indices
- .iter()
- .map(|&idx|
Expr::Column(Column::from(input_schema.qualified_field(idx))))
- .collect()
-}
-
-/// Get indices of the fields referred to by any expression in `exprs` within
-/// the given schema (`input_schema`).
-///
-/// # Arguments
-///
-/// * `input_schema`: The input schema to analyze for index requirements.
-/// * `exprs`: An iterator of expressions for which we want to find necessary
-/// field indices.
-///
-/// # Returns
-///
-/// A [`Result`] object containing the indices of all required fields in
-/// `input_schema` to calculate all `exprs` successfully.
-fn indices_referred_by_exprs<'a>(
- input_schema: &DFSchemaRef,
- exprs: impl Iterator<Item = &'a Expr>,
-) -> Result<Vec<usize>> {
- let indices = exprs
- .map(|expr| indices_referred_by_expr(input_schema, expr))
- .collect::<Result<Vec<_>>>()?;
- Ok(indices
- .into_iter()
- .flatten()
- // Make sure no duplicate entries exist and indices are ordered:
- .sorted()
- .dedup()
- .collect())
-}
-
-/// Get indices of the fields referred to by the given expression `expr` within
-/// the given schema (`input_schema`).
-///
-/// # Parameters
-///
-/// * `input_schema`: The input schema to analyze for index requirements.
-/// * `expr`: An expression for which we want to find necessary field indices.
-///
-/// # Returns
-///
-/// A [`Result`] object containing the indices of all required fields in
-/// `input_schema` to calculate `expr` successfully.
-fn indices_referred_by_expr(
- input_schema: &DFSchemaRef,
- expr: &Expr,
-) -> Result<Vec<usize>> {
- let mut cols = expr.to_columns()?;
- // Get outer-referenced (subquery) columns:
- outer_columns(expr, &mut cols);
- Ok(cols
- .iter()
- .flat_map(|col| input_schema.index_of_column(col))
- .collect())
-}
-
-/// Gets all required indices for the input; i.e. those required by the parent
-/// and those referred to by `exprs`.
-///
-/// # Parameters
-///
-/// * `parent_required_indices` - A slice of indices required by the parent
plan.
-/// * `input` - The input logical plan to analyze for index requirements.
-/// * `exprs` - An iterator of expressions used to determine required indices.
-///
-/// # Returns
-///
-/// A `Result` containing a vector of `usize` indices containing all the
required
-/// indices.
-fn get_all_required_indices<'a>(
- parent_required_indices: &[usize],
- input: &LogicalPlan,
- exprs: impl Iterator<Item = &'a Expr>,
-) -> Result<Vec<usize>> {
- indices_referred_by_exprs(input.schema(), exprs)
- .map(|indices| merge_slices(parent_required_indices, &indices))
-}
-
-/// Retrieves the expressions at specified indices within the given slice.
Ignores
-/// any invalid indices.
-///
-/// # Parameters
-///
-/// * `exprs` - A slice of expressions to index into.
-/// * `indices` - A slice of indices specifying the positions of expressions
sought.
-///
-/// # Returns
-///
-/// A vector of expressions corresponding to specified indices.
-fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec<Expr> {
- indices
- .iter()
- // Indices may point to further places than `exprs` len.
- .filter_map(|&idx| exprs.get(idx).cloned())
- .collect()
-}
-
-/// Merges two slices into a single vector with sorted (ascending) and
-/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]`
-/// will produce `[1, 2, 3, 6]`.
-fn merge_slices<T: Clone + Ord>(left: &[T], right: &[T]) -> Vec<T> {
- // Make sure to sort before deduping, which removes the duplicates:
- left.iter()
- .cloned()
- .chain(right.iter().cloned())
- .sorted()
- .dedup()
- .collect()
-}
-
/// Splits requirement indices for a join into left and right children based on
/// the join type.
///
@@ -810,26 +648,21 @@ fn merge_slices<T: Clone + Ord>(left: &[T], right: &[T])
-> Vec<T> {
/// adjusted based on the join type.
fn split_join_requirements(
left_len: usize,
- indices: &[usize],
+ indices: RequiredIndicies,
join_type: &JoinType,
-) -> (Vec<usize>, Vec<usize>) {
+) -> (RequiredIndicies, RequiredIndicies) {
match join_type {
// In these cases requirements are split between left/right children:
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full =>
{
- let (left_reqs, mut right_reqs): (Vec<usize>, Vec<usize>) =
- indices.iter().partition(|&&idx| idx < left_len);
// Decrease right side indices by `left_len` so that they point to
valid
// positions within the right child:
- for idx in right_reqs.iter_mut() {
- *idx -= left_len;
- }
- (left_reqs, right_reqs)
+ indices.split_off(left_len)
}
// All requirements can be re-routed to left child directly.
- JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]),
+ JoinType::LeftAnti | JoinType::LeftSemi => (indices,
RequiredIndicies::new()),
// All requirements can be re-routed to right side directly.
// No need to change index, join schema is right child schema.
- JoinType::RightSemi | JoinType::RightAnti => (vec![],
indices.to_vec()),
+ JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(),
indices),
}
}
@@ -885,13 +718,14 @@ fn add_projection_on_top_if_helpful(
fn rewrite_projection_given_requirements(
proj: &Projection,
config: &dyn OptimizerConfig,
- indices: &[usize],
+ indices: RequiredIndicies,
) -> Result<Option<LogicalPlan>> {
- let exprs_used = get_at_indices(&proj.expr, indices);
+ let exprs_used = indices.get_at_indices(&proj.expr);
+
let required_indices =
- indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?;
+ RequiredIndicies::new().with_exprs(proj.input.schema(),
exprs_used.iter())?;
return if let Some(input) =
- optimize_projections(&proj.input, config, &required_indices)?
+ optimize_projections(&proj.input, config, required_indices)?
{
if is_projection_unnecessary(&input, &exprs_used)? {
Ok(Some(input))
diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs
b/datafusion/optimizer/src/optimize_projections/required_indices.rs
new file mode 100644
index 0000000000..113c100bbd
--- /dev/null
+++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`RequiredIndicies`] helper for OptimizeProjection
+
+use crate::optimize_projections::outer_columns;
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Column, DFSchemaRef, Result};
+use datafusion_expr::{Expr, LogicalPlan};
+
+/// Represents columns in a schema which are required (used) by a plan node
+///
+/// Also carries a flag indicating if putting a projection above children is
+/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from
+/// small tables. Hence for filter child this flag would be `true`. Defaults to
+/// `false`
+///
+/// # Invariant
+///
+/// Indices are always in order and without duplicates. For example, if these
+/// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented
+/// by `[1, 2, 3, 4, 6]`.
+#[derive(Debug, Clone, Default)]
+pub(super) struct RequiredIndicies {
+    /// The indices of the required columns in the schema this instance
+    /// describes (kept sorted and deduplicated, per the invariant above)
+    indices: Vec<usize>,
+    /// If putting a projection above children is beneficial for the parent.
+    /// Defaults to false.
+    projection_beneficial: bool,
+}
+
+impl RequiredIndicies {
+    /// Create a new, empty instance
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create a new instance that requires every column of the output schema
+    /// of `plan`, i.e. the indices `0..plan.schema().fields().len()`
+    pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self {
+        Self {
+            indices: (0..plan.schema().fields().len()).collect(),
+            projection_beneficial: false,
+        }
+    }
+
+    /// Create a new instance with the specified indices as required. The
+    /// indices are sorted and deduplicated.
+    pub fn new_from_indices(indices: Vec<usize>) -> Self {
+        Self {
+            indices,
+            projection_beneficial: false,
+        }
+        .compact()
+    }
+
+    /// Convert the instance to its inner (sorted, deduplicated) indices
+    pub fn into_inner(self) -> Vec<usize> {
+        self.indices
+    }
+
+    /// Set the projection beneficial flag to `true`
+    pub fn with_projection_beneficial(mut self) -> Self {
+        self.projection_beneficial = true;
+        self
+    }
+
+    /// Return the value of projection beneficial flag
+    pub fn projection_beneficial(&self) -> bool {
+        self.projection_beneficial
+    }
+
+    /// Return a reference to the underlying (sorted, deduplicated) indices
+    pub fn indices(&self) -> &[usize] {
+        &self.indices
+    }
+
+    /// Add the indices of the fields in `schema` referred to by any of the
+    /// expressions of `plan` (as visited by `LogicalPlan::apply_expressions`).
+    ///
+    /// Columns that cannot be found in `schema` are ignored. The returned
+    /// instance is compacted (sorted and deduplicated).
+    pub fn with_plan_exprs(
+        mut self,
+        plan: &LogicalPlan,
+        schema: &DFSchemaRef,
+    ) -> Result<Self> {
+        // Add indices of the child fields referred to by the expressions in
+        // the parent
+        plan.apply_expressions(|e| {
+            self.add_expr(schema, e)?;
+            Ok(TreeNodeRecursion::Continue)
+        })?;
+        Ok(self.compact())
+    }
+
+    /// Adds the indices of the fields referred to by the given expression
+    /// `expr` within the given schema (`input_schema`), including columns
+    /// referenced from subqueries (outer references).
+    ///
+    /// Columns not present in `input_schema` are skipped. Self is NOT
+    /// compacted (and thus this method is not pub).
+    ///
+    /// # Parameters
+    ///
+    /// * `input_schema`: The input schema to analyze for index requirements.
+    /// * `expr`: An expression for which we want to find necessary field indices.
+    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> {
+        // TODO could remove these clones (and visit the expression directly)
+        let mut cols = expr.to_columns()?;
+        // Get outer-referenced (subquery) columns:
+        outer_columns(expr, &mut cols);
+        self.indices.reserve(cols.len());
+        for col in cols {
+            if let Some(idx) = input_schema.maybe_index_of_column(&col) {
+                self.indices.push(idx);
+            }
+        }
+        Ok(())
+    }
+
+    /// Adds the indices of the fields referred to by the given expressions
+    /// within `schema`, returning the compacted result.
+    ///
+    /// # Parameters
+    ///
+    /// * `schema`: The input schema to analyze for index requirements.
+    /// * `exprs`: the expressions for which we want to find field indices.
+    pub fn with_exprs<'a>(
+        self,
+        schema: &DFSchemaRef,
+        exprs: impl IntoIterator<Item = &'a Expr>,
+    ) -> Result<Self> {
+        exprs
+            .into_iter()
+            .try_fold(self, |mut acc, expr| {
+                acc.add_expr(schema, expr)?;
+                Ok(acc)
+            })
+            .map(|acc| acc.compact())
+    }
+
+    /// Adds all `indices` into this instance, returning the compacted result.
+    pub fn append(mut self, indices: &[usize]) -> Self {
+        self.indices.extend_from_slice(indices);
+        self.compact()
+    }
+
+    /// Splits this instance into a tuple with two instances:
+    /// * The indices that are less than `n`
+    /// * The indices that are greater than or equal to `n`, each reduced by `n`
+    ///
+    /// Both halves keep the `projection_beneficial` flag of `self`.
+    pub fn split_off(self, n: usize) -> (Self, Self) {
+        let (l, r) = self.partition(|idx| idx < n);
+        // Every index in `r` is >= n, so the subtraction cannot underflow
+        (l, r.map_indices(|idx| idx - n))
+    }
+
+    /// Partitions the indices in this instance into two groups based on the
+    /// given predicate function `f` (matching indices go to the first group).
+    ///
+    /// Consumes `self` so the index vector is moved rather than copied.
+    fn partition<F>(self, f: F) -> (Self, Self)
+    where
+        F: Fn(usize) -> bool,
+    {
+        let projection_beneficial = self.projection_beneficial;
+        let (l, r): (Vec<usize>, Vec<usize>) =
+            self.indices.into_iter().partition(|&idx| f(idx));
+
+        (
+            Self {
+                indices: l,
+                projection_beneficial,
+            },
+            Self {
+                indices: r,
+                projection_beneficial,
+            },
+        )
+    }
+
+    /// Map the indices in this instance to a new set of indices based on the
+    /// given function `f`, returning the mapped indices
+    ///
+    /// Not `pub` as it might not preserve the invariant of compacted indices
+    fn map_indices<F>(mut self, f: F) -> Self
+    where
+        F: Fn(usize) -> usize,
+    {
+        self.indices.iter_mut().for_each(|idx| *idx = f(*idx));
+        self
+    }
+
+    /// Apply the given function `f` to each index in this instance, returning
+    /// the mapped indices. The result is NOT re-sorted or deduplicated; it
+    /// follows the (ascending) order of the indices in this instance.
+    pub fn into_mapped_indices<F>(self, f: F) -> Vec<usize>
+    where
+        F: Fn(usize) -> usize,
+    {
+        self.map_indices(f).into_inner()
+    }
+
+    /// Returns the `Expr`s from `exprs` that are at the indices in this
+    /// instance.
+    ///
+    /// Indices that are out of bounds for `exprs` are skipped rather than
+    /// causing a panic.
+    pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec<Expr> {
+        self.indices
+            .iter()
+            .filter_map(|&idx| exprs.get(idx).cloned())
+            .collect()
+    }
+
+    /// Generates the required expressions (columns) that reside at the
+    /// indices of this instance in the given `input_schema`.
+    pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec<Expr> {
+        self.indices
+            .iter()
+            .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx))))
+            .collect()
+    }
+
+    /// Compacts the indices of this instance so they are sorted
+    /// (ascending) and deduplicated.
+    fn compact(mut self) -> Self {
+        self.indices.sort_unstable();
+        self.indices.dedup();
+        self
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]