This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cf0cba7220 Stop copying LogicalPlan and Exprs in `OptimizeProjections`
(2% faster planning) (#10405)
cf0cba7220 is described below
commit cf0cba72204e54c8eb05156d372dc5bab248e4ea
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri May 10 11:07:17 2024 -0400
Stop copying LogicalPlan and Exprs in `OptimizeProjections` (2% faster
planning) (#10405)
* Add `LogicalPlan::recompute_schema` for handling rewrite passes
* Stop copying LogicalPlan and Exprs in `OptimizeProjections`
---
datafusion/expr/src/expr.rs | 28 +
datafusion/expr/src/logical_plan/plan.rs | 195 ++++++
.../optimizer/src/optimize_projections/mod.rs | 720 +++++++++++----------
datafusion/optimizer/src/utils.rs | 7 +
datafusion/sqllogictest/test_files/cte.slt | 60 +-
datafusion/sqllogictest/test_files/joins.slt | 42 +-
6 files changed, 666 insertions(+), 386 deletions(-)
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 01ef2571ea..c531d7af17 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1079,6 +1079,34 @@ impl Expr {
}
}
+ /// Recursively removes potentially multiple aliases from an expression.
+ ///
+ /// If the expression is not an alias, the expression is returned
unchanged.
+ /// This method removes directly nested aliases (an alias of an alias),
+ /// but not aliases nested inside other expressions.
+ ///
+ /// # Example
+ /// ```
+ /// # use datafusion_expr::col;
+ /// // `foo as "bar"` is unaliased to `foo`
+ /// let expr = col("foo").alias("bar");
+ /// assert_eq!(expr.unalias_nested(), col("foo"));
+ ///
+ /// // `foo as "bar" + baz` is not unaliased
+ /// let expr = col("foo").alias("bar") + col("baz");
+ /// assert_eq!(expr.clone().unalias_nested(), expr);
+ ///
+ /// // `foo as "bar" as "baz"` is unaliased to `foo`
+ /// let expr = col("foo").alias("bar").alias("baz");
+ /// assert_eq!(expr.unalias_nested(), col("foo"));
+ /// ```
+ pub fn unalias_nested(self) -> Expr {
+ match self {
+ Expr::Alias(alias) => alias.expr.unalias_nested(),
+ _ => self,
+ }
+ }
+
/// Return `self IN <list>` if `negated` is false, otherwise
/// return `self NOT IN <list>`.a
pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 1b54e76a17..7dca12f793 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -52,6 +52,7 @@ use datafusion_common::{
// backwards compatibility
use crate::display::PgJsonVisitor;
+use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan,
ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
@@ -467,6 +468,200 @@ impl LogicalPlan {
self.with_new_exprs(self.expressions(), inputs.to_vec())
}
+ /// Recomputes schema and type information for this LogicalPlan if needed.
+ ///
+ /// Some `LogicalPlan`s may need to recompute their schema if the number or
+ /// type of expressions have been changed (for example due to type
+ /// coercion). For example, a [`LogicalPlan::Projection`]'s schema depends on
+ /// its expressions.
+ ///
+ /// Other `LogicalPlan`s have schemas unaffected by any changes to their
+ /// expressions. For example, a [`LogicalPlan::Filter`]'s schema is always the
+ /// same as its input schema.
+ ///
+ /// # Return value
+ /// Returns the rebuilt plan, or an error if there is some issue recomputing the schema.
+ ///
+ /// # Notes
+ ///
+ /// * Does not recursively recompute schema for input (child) plans.
+ /// * `Filter` predicates and `Join` equi-join keys additionally have
+ /// aliases removed while being rebuilt.
+ pub fn recompute_schema(self) -> Result<Self> {
+ match self {
+ // Since expr may be different than the previous expr, schema of
the projection
+ // may change. We need to use try_new method instead of
try_new_with_schema method.
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema: _,
+ }) => Projection::try_new(expr,
input).map(LogicalPlan::Projection),
+ LogicalPlan::Dml(_) => Ok(self),
+ LogicalPlan::Copy(_) => Ok(self),
+ LogicalPlan::Values(Values { schema, values }) => {
+ // todo it isn't clear why the schema is not recomputed here
+ Ok(LogicalPlan::Values(Values { schema, values }))
+ }
+ LogicalPlan::Filter(Filter { predicate, input }) => {
+ // todo: should this logic be moved to Filter::try_new?
+
+ // filter predicates should not contain aliased expressions, so
we remove any aliases.
+ // Before this logic was added, we would have aliases within
filters, such as for
+ // benchmark q6:
+ //
+ // lineitem.l_shipdate >= Date32(\"8766\")
+ // AND lineitem.l_shipdate < Date32(\"9131\")
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount >=
+ // Decimal128(Some(49999999999999),30,15)
+ // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS
lineitem.l_discount <=
+ // Decimal128(Some(69999999999999),30,15)
+ // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
+
+ let predicate = predicate
+ .transform_down(|expr| {
+ match expr {
+ Expr::Exists { .. }
+ | Expr::ScalarSubquery(_)
+ | Expr::InSubquery(_) => {
+ // subqueries could contain aliases so we
don't recurse into those
+ Ok(Transformed::new(expr, false,
TreeNodeRecursion::Jump))
+ }
+ // NOTE(review): `unalias` appears to strip a single alias layer and `Jump` skips the children of the result, so `x AS a AS b` or aliases beneath an alias may survive; consider `unalias_nested` — confirm
+ Expr::Alias(_) => Ok(Transformed::new(
+ expr.unalias(),
+ true,
+ TreeNodeRecursion::Jump,
+ )),
+ _ => Ok(Transformed::no(expr)),
+ }
+ })
+ .data()?;
+
+ Filter::try_new(predicate, input).map(LogicalPlan::Filter)
+ }
+ LogicalPlan::Repartition(_) => Ok(self),
+ LogicalPlan::Window(Window {
+ input,
+ window_expr,
+ schema: _,
+ }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema: _,
+ }) => Aggregate::try_new(input, group_expr, aggr_expr)
+ .map(LogicalPlan::Aggregate),
+ LogicalPlan::Sort(_) => Ok(self),
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ filter,
+ join_type,
+ join_constraint,
+ on,
+ schema: _,
+ null_equals_null,
+ }) => {
+ let schema =
+ build_join_schema(left.schema(), right.schema(),
&join_type)?;
+
+ let new_on: Vec<_> = on
+ .into_iter()
+ .map(|equi_expr| {
+ // SimplifyExpression rule may add an alias to the
equi_expr; `unalias` removes the outermost one.
+ (equi_expr.0.unalias(), equi_expr.1.unalias())
+ })
+ .collect();
+
+ Ok(LogicalPlan::Join(Join {
+ left,
+ right,
+ join_type,
+ join_constraint,
+ on: new_on,
+ filter,
+ schema: DFSchemaRef::new(schema),
+ null_equals_null,
+ }))
+ }
+ LogicalPlan::CrossJoin(CrossJoin {
+ left,
+ right,
+ schema: _,
+ }) => {
+ let join_schema =
+ build_join_schema(left.schema(), right.schema(),
&JoinType::Inner)?;
+
+ Ok(LogicalPlan::CrossJoin(CrossJoin {
+ left,
+ right,
+ schema: join_schema.into(),
+ }))
+ }
+ LogicalPlan::Subquery(_) => Ok(self),
+ LogicalPlan::SubqueryAlias(SubqueryAlias {
+ input,
+ alias,
+ schema: _,
+ }) => SubqueryAlias::try_new(input,
alias).map(LogicalPlan::SubqueryAlias),
+ LogicalPlan::Limit(_) => Ok(self),
+ LogicalPlan::Ddl(_) => Ok(self),
+ LogicalPlan::Extension(Extension { node }) => {
+ // todo make an API that does not require cloning
+ // This requires a copy of the extension node's expressions and
inputs
+ let expr = node.expressions();
+ let inputs: Vec<_> =
node.inputs().into_iter().cloned().collect();
+ Ok(LogicalPlan::Extension(Extension {
+ node: node.from_template(&expr, &inputs),
+ }))
+ }
+ LogicalPlan::Union(Union { inputs, schema }) => {
+ // NOTE(review): indexing `inputs[0]` assumes the union has at least one input
+ let input_schema = inputs[0].schema();
+ // If the inputs were not pruned (same field count), keep the existing schema
+ // TODO this seems wrong (shouldn't we always use the schema
of the input?)
+ let schema = if schema.fields().len() ==
input_schema.fields().len() {
+ schema.clone()
+ } else {
+ input_schema.clone()
+ };
+ Ok(LogicalPlan::Union(Union { inputs, schema }))
+ }
+ LogicalPlan::Distinct(distinct) => {
+ let distinct = match distinct {
+ Distinct::All(input) => Distinct::All(input),
+ Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ input,
+ schema: _,
+ }) => Distinct::On(DistinctOn::try_new(
+ on_expr,
+ select_expr,
+ sort_expr,
+ input,
+ )?),
+ };
+ Ok(LogicalPlan::Distinct(distinct))
+ }
+ LogicalPlan::RecursiveQuery(_) => Ok(self),
+ LogicalPlan::Analyze(_) => Ok(self),
+ LogicalPlan::Explain(_) => Ok(self),
+ LogicalPlan::Prepare(_) => Ok(self),
+ LogicalPlan::TableScan(_) => Ok(self),
+ LogicalPlan::EmptyRelation(_) => Ok(self),
+ LogicalPlan::Statement(_) => Ok(self),
+ LogicalPlan::DescribeTable(_) => Ok(self),
+ LogicalPlan::Unnest(Unnest {
+ input,
+ columns,
+ schema: _,
+ options,
+ }) => {
+ // Update schema with unnested column type.
+ unnest_with_options(unwrap_arc(input), columns, options)
+ }
+ }
+ }
+
/// Returns a new `LogicalPlan` based on `self` with inputs and
/// expressions replaced.
///
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index aa2d005379..5a9705381d 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -26,18 +26,22 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{
- get_required_group_by_exprs_indices, internal_err, Column, JoinType,
Result,
+ get_required_group_by_exprs_indices, internal_datafusion_err,
internal_err, Column,
+ JoinType, Result,
};
-use datafusion_expr::expr::{Alias, ScalarFunction};
+use datafusion_expr::expr::Alias;
use datafusion_expr::{
- logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast,
Distinct,
- Expr, Projection, TableScan, Window,
+ logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr,
Projection,
+ TableScan, Window,
};
use crate::optimize_projections::required_indices::RequiredIndicies;
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+use crate::utils::NamePreserver;
+use datafusion_common::tree_node::{
+ Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
+};
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use hashbrown::HashMap;
-use itertools::izip;
/// Optimizer rule to prune unnecessary columns from intermediate schemas
/// inside the [`LogicalPlan`]. This rule:
@@ -67,12 +71,10 @@ impl OptimizeProjections {
impl OptimizerRule for OptimizeProjections {
fn try_optimize(
&self,
- plan: &LogicalPlan,
- config: &dyn OptimizerConfig,
+ _plan: &LogicalPlan,
+ _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
- // All output fields are necessary:
- let indices = RequiredIndicies::new_for_all_exprs(plan);
- optimize_projections(plan, config, indices)
+ // This rule reports `supports_rewrite() == true`, so callers are expected to
+ // use the owned-plan `rewrite` entry point; reaching here is an internal error.
+ internal_err!("Should have called OptimizeProjections::rewrite")
}
fn name(&self) -> &str {
@@ -82,6 +84,20 @@ impl OptimizerRule for OptimizeProjections {
fn apply_order(&self) -> Option<ApplyOrder> {
None
}
+
+ /// Opts in to the owned `rewrite` API so the plan is rewritten in place
+ /// instead of going through the borrowing `try_optimize` path.
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
+
+ /// Entry point of the rule: takes ownership of `plan` and removes columns
+ /// that no parent needs, starting at the root, where every output field is
+ /// required. Returns `Transformed` to signal whether anything changed.
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ // All output fields are necessary:
+ let indices = RequiredIndicies::new_for_all_exprs(&plan);
+ optimize_projections(plan, config, indices)
+ }
}
/// Removes unnecessary columns (e.g. columns that do not appear in the output
@@ -93,7 +109,7 @@ impl OptimizerRule for OptimizeProjections {
/// - `plan`: A reference to the input `LogicalPlan` to optimize.
/// - `config`: A reference to the optimizer configuration.
/// - `indices`: A slice of column indices that represent the necessary column
-/// indices for downstream operations.
+/// indices for downstream (parent) plan nodes.
///
/// # Returns
///
@@ -102,101 +118,19 @@ impl OptimizerRule for OptimizeProjections {
/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` without unnecessary
/// columns.
/// - `Ok(None)`: Signal that the given logical plan did not require any
change.
-/// - `Err(error)`: An error occured during the optimization process.
+/// - `Err(error)`: An error occurred during the optimization process.
fn optimize_projections(
- plan: &LogicalPlan,
+ plan: LogicalPlan,
config: &dyn OptimizerConfig,
indices: RequiredIndicies,
-) -> Result<Option<LogicalPlan>> {
- let child_required_indices: Vec<RequiredIndicies> = match plan {
- LogicalPlan::Sort(_)
- | LogicalPlan::Filter(_)
- | LogicalPlan::Repartition(_)
- | LogicalPlan::Unnest(_)
- | LogicalPlan::Union(_)
- | LogicalPlan::SubqueryAlias(_)
- | LogicalPlan::Distinct(Distinct::On(_)) => {
- // Pass index requirements from the parent as well as column
indices
- // that appear in this plan's expressions to its child. All these
- // operators benefit from "small" inputs, so the
projection_beneficial
- // flag is `true`.
- plan.inputs()
- .into_iter()
- .map(|input| {
- indices
- .clone()
- .with_projection_beneficial()
- .with_plan_exprs(plan, input.schema())
- })
- .collect::<Result<_>>()?
- }
- LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => {
- // Pass index requirements from the parent as well as column
indices
- // that appear in this plan's expressions to its child. These
operators
- // do not benefit from "small" inputs, so the projection_beneficial
- // flag is `false`.
- plan.inputs()
- .into_iter()
- .map(|input| indices.clone().with_plan_exprs(plan,
input.schema()))
- .collect::<Result<_>>()?
- }
- LogicalPlan::Copy(_)
- | LogicalPlan::Ddl(_)
- | LogicalPlan::Dml(_)
- | LogicalPlan::Explain(_)
- | LogicalPlan::Analyze(_)
- | LogicalPlan::Subquery(_)
- | LogicalPlan::Distinct(Distinct::All(_)) => {
- // These plans require all their fields, and their children should
- // be treated as final plans -- otherwise, we may have schema a
- // mismatch.
- // TODO: For some subquery variants (e.g. a subquery arising from
an
- // EXISTS expression), we may not need to require all
indices.
- plan.inputs()
- .into_iter()
- .map(RequiredIndicies::new_for_all_exprs)
- .collect()
- }
- LogicalPlan::Extension(extension) => {
- let Some(necessary_children_indices) =
- extension.node.necessary_children_exprs(indices.indices())
- else {
- // Requirements from parent cannot be routed down to user
defined logical plan safely
- return Ok(None);
- };
- let children = extension.node.inputs();
- if children.len() != necessary_children_indices.len() {
- return internal_err!("Inconsistent length between children and
necessary children indices. \
- Make sure `.necessary_children_exprs` implementation of the
`UserDefinedLogicalNode` is \
- consistent with actual children length for the node.");
- }
- children
- .into_iter()
- .zip(necessary_children_indices)
- .map(|(child, necessary_indices)| {
- RequiredIndicies::new_from_indices(necessary_indices)
- .with_plan_exprs(plan, child.schema())
- })
- .collect::<Result<Vec<_>>>()?
- }
- LogicalPlan::EmptyRelation(_)
- | LogicalPlan::RecursiveQuery(_)
- | LogicalPlan::Statement(_)
- | LogicalPlan::Values(_)
- | LogicalPlan::DescribeTable(_) => {
- // These operators have no inputs, so stop the optimization
process.
- return Ok(None);
- }
+) -> Result<Transformed<LogicalPlan>> {
+ // Recursively rewrite any nodes that may be able to avoid computation
given
+ // their parents' required indices.
+ match plan {
LogicalPlan::Projection(proj) => {
- return if let Some(proj) = merge_consecutive_projections(proj)? {
- Ok(Some(
- rewrite_projection_given_requirements(&proj, config,
indices)?
- // Even if we cannot optimize the projection, merge if
possible:
- .unwrap_or_else(|| LogicalPlan::Projection(proj)),
- ))
- } else {
+ return merge_consecutive_projections(proj)?.transform_data(|proj| {
rewrite_projection_given_requirements(proj, config, indices)
- };
+ })
}
LogicalPlan::Aggregate(aggregate) => {
// Split parent requirements to GROUP BY and aggregate sections:
@@ -211,6 +145,7 @@ fn optimize_projections(
.iter()
.map(|group_by_expr| group_by_expr.display_name())
.collect::<Result<Vec<_>>>()?;
+
let new_group_bys = if let Some(simplest_groupby_indices) =
get_required_group_by_exprs_indices(
aggregate.input.schema(),
@@ -223,7 +158,7 @@ fn optimize_projections(
.append(&simplest_groupby_indices)
.get_at_indices(&aggregate.group_expr)
} else {
- aggregate.group_expr.clone()
+ aggregate.group_expr
};
// Only use the absolutely necessary aggregate expressions required
@@ -242,7 +177,9 @@ fn optimize_projections(
&& new_group_bys.is_empty()
&& !aggregate.aggr_expr.is_empty()
{
- new_aggr_expr = vec![aggregate.aggr_expr[0].clone()];
+ // take the old, first aggregate expression
+ new_aggr_expr = aggregate.aggr_expr;
+ new_aggr_expr.resize_with(1, || unreachable!());
}
let all_exprs_iter =
new_group_bys.iter().chain(new_aggr_expr.iter());
@@ -251,32 +188,31 @@ fn optimize_projections(
RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
let necessary_exprs = necessary_indices.get_required_exprs(schema);
- let aggregate_input = if let Some(input) =
- optimize_projections(&aggregate.input, config,
necessary_indices)?
- {
- input
- } else {
- aggregate.input.as_ref().clone()
- };
-
- // Simplify the input of the aggregation by adding a projection so
- // that its input only contains absolutely necessary columns for
- // the aggregate expressions. Note that necessary_indices refer to
- // fields in `aggregate.input.schema()`.
- let (aggregate_input, _) =
- add_projection_on_top_if_helpful(aggregate_input,
necessary_exprs)?;
-
- // Create a new aggregate plan with the updated input and only the
- // absolutely necessary fields:
- return Aggregate::try_new(
- Arc::new(aggregate_input),
- new_group_bys,
- new_aggr_expr,
- )
- .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate)));
+ return optimize_projections(
+ unwrap_arc(aggregate.input),
+ config,
+ necessary_indices,
+ )?
+ .transform_data(|aggregate_input| {
+ // Simplify the input of the aggregation by adding a
projection so
+ // that its input only contains absolutely necessary columns
for
+ // the aggregate expressions. Note that necessary_indices
refer to
+ // fields in `aggregate.input.schema()`.
+ add_projection_on_top_if_helpful(aggregate_input,
necessary_exprs)
+ })?
+ .map_data(|aggregate_input| {
+ // Create a new aggregate plan with the updated input and only
the
+ // absolutely necessary fields:
+ Aggregate::try_new(
+ Arc::new(aggregate_input),
+ new_group_bys,
+ new_aggr_expr,
+ )
+ .map(LogicalPlan::Aggregate)
+ });
}
LogicalPlan::Window(window) => {
- let input_schema = window.input.schema();
+ let input_schema = window.input.schema().clone();
// Split parent requirements to child and window expression
sections:
let n_input_fields = input_schema.fields().len();
// Offset window expression indices so that they point to valid
@@ -290,38 +226,152 @@ fn optimize_projections(
// Get all the required column indices at the input, either by the
// parent or window expression requirements.
let required_indices =
- child_reqs.with_exprs(input_schema, &new_window_expr)?;
+ child_reqs.with_exprs(&input_schema, &new_window_expr)?;
- let window_child = if let Some(new_window_child) =
- optimize_projections(&window.input, config,
required_indices.clone())?
- {
- new_window_child
- } else {
- window.input.as_ref().clone()
+ return optimize_projections(
+ unwrap_arc(window.input),
+ config,
+ required_indices.clone(),
+ )?
+ .transform_data(|window_child| {
+ if new_window_expr.is_empty() {
+ // When no window expression is necessary, use the input
directly:
+ Ok(Transformed::no(window_child))
+ } else {
+ // Calculate required expressions at the input of the
window.
+ // Please note that we use `input_schema`, because
`required_indices`
+ // refers to that schema
+ let required_exprs =
+ required_indices.get_required_exprs(&input_schema);
+ let window_child =
+ add_projection_on_top_if_helpful(window_child,
required_exprs)?
+ .data;
+ Window::try_new(new_window_expr, Arc::new(window_child))
+ .map(LogicalPlan::Window)
+ .map(Transformed::yes)
+ }
+ });
+ }
+ LogicalPlan::TableScan(table_scan) => {
+ let TableScan {
+ table_name,
+ source,
+ projection,
+ filters,
+ fetch,
+ projected_schema: _,
+ } = table_scan;
+
+ // Get indices referred to in the original (schema with all fields)
+ // given projected indices.
+ let projection = match &projection {
+ Some(projection) => indices.into_mapped_indices(|idx|
projection[idx]),
+ None => indices.into_inner(),
};
+ return TableScan::try_new(
+ table_name,
+ source,
+ Some(projection),
+ filters,
+ fetch,
+ )
+ .map(LogicalPlan::TableScan)
+ .map(Transformed::yes);
+ }
- return if new_window_expr.is_empty() {
- // When no window expression is necessary, use the input
directly:
- Ok(Some(window_child))
- } else {
- // Calculate required expressions at the input of the window.
- // Please note that we use `old_child`, because
`required_indices`
- // refers to `old_child`.
- let required_exprs =
required_indices.get_required_exprs(input_schema);
- let (window_child, _) =
- add_projection_on_top_if_helpful(window_child,
required_exprs)?;
- Window::try_new(new_window_expr, Arc::new(window_child))
- .map(|window| Some(LogicalPlan::Window(window)))
+ // Other node types are handled below
+ _ => {}
+ };
+
+ // For other plan node types, calculate indices for columns they use and
+ // try to rewrite their children
+ let mut child_required_indices: Vec<RequiredIndicies> = match &plan {
+ LogicalPlan::Sort(_)
+ | LogicalPlan::Filter(_)
+ | LogicalPlan::Repartition(_)
+ | LogicalPlan::Unnest(_)
+ | LogicalPlan::Union(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Distinct(Distinct::On(_)) => {
+ // Pass index requirements from the parent as well as column
indices
+ // that appear in this plan's expressions to its child. All these
+ // operators benefit from "small" inputs, so the
projection_beneficial
+ // flag is `true`.
+ plan.inputs()
+ .into_iter()
+ .map(|input| {
+ indices
+ .clone()
+ .with_projection_beneficial()
+ .with_plan_exprs(&plan, input.schema())
+ })
+ .collect::<Result<_>>()?
+ }
+ LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => {
+ // Pass index requirements from the parent as well as column
indices
+ // that appear in this plan's expressions to its child. These
operators
+ // do not benefit from "small" inputs, so the projection_beneficial
+ // flag is `false`.
+ plan.inputs()
+ .into_iter()
+ .map(|input| indices.clone().with_plan_exprs(&plan,
input.schema()))
+ .collect::<Result<_>>()?
+ }
+ LogicalPlan::Copy(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Dml(_)
+ | LogicalPlan::Explain(_)
+ | LogicalPlan::Analyze(_)
+ | LogicalPlan::Subquery(_)
+ | LogicalPlan::Distinct(Distinct::All(_)) => {
+ // These plans require all their fields, and their children should
+ // be treated as final plans -- otherwise, we may have schema a
+ // mismatch.
+ // TODO: For some subquery variants (e.g. a subquery arising from
an
+ // EXISTS expression), we may not need to require all
indices.
+ plan.inputs()
+ .into_iter()
+ .map(RequiredIndicies::new_for_all_exprs)
+ .collect()
+ }
+ LogicalPlan::Extension(extension) => {
+ let Some(necessary_children_indices) =
+ extension.node.necessary_children_exprs(indices.indices())
+ else {
+ // Requirements from parent cannot be routed down to user
defined logical plan safely
+ return Ok(Transformed::no(plan));
};
+ let children = extension.node.inputs();
+ if children.len() != necessary_children_indices.len() {
+ return internal_err!("Inconsistent length between children and
necessary children indices. \
+ Make sure `.necessary_children_exprs` implementation of the
`UserDefinedLogicalNode` is \
+ consistent with actual children length for the node.");
+ }
+ children
+ .into_iter()
+ .zip(necessary_children_indices)
+ .map(|(child, necessary_indices)| {
+ RequiredIndicies::new_from_indices(necessary_indices)
+ .with_plan_exprs(&plan, child.schema())
+ })
+ .collect::<Result<Vec<_>>>()?
+ }
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::RecursiveQuery(_)
+ | LogicalPlan::Statement(_)
+ | LogicalPlan::Values(_)
+ | LogicalPlan::DescribeTable(_) => {
+ // These operators have no inputs, so stop the optimization
process.
+ return Ok(Transformed::no(plan));
}
LogicalPlan::Join(join) => {
let left_len = join.left.schema().fields().len();
let (left_req_indices, right_req_indices) =
split_join_requirements(left_len, indices, &join.join_type);
let left_indices =
- left_req_indices.with_plan_exprs(plan, join.left.schema())?;
+ left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
let right_indices =
- right_req_indices.with_plan_exprs(plan, join.right.schema())?;
+ right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
// Joins benefit from "small" input tables (lower memory usage).
// Therefore, each child benefits from projection:
vec![
@@ -340,55 +390,53 @@ fn optimize_projections(
right_indices.with_projection_beneficial(),
]
}
- LogicalPlan::TableScan(table_scan) => {
- // Get indices referred to in the original (schema with all fields)
- // given projected indices.
- let projection = match &table_scan.projection {
- Some(projection) => indices.into_mapped_indices(|idx|
projection[idx]),
- None => indices.into_inner(),
- };
- return TableScan::try_new(
- table_scan.table_name.clone(),
- table_scan.source.clone(),
- Some(projection),
- table_scan.filters.clone(),
- table_scan.fetch,
- )
- .map(|table| Some(LogicalPlan::TableScan(table)));
+ // these nodes are explicitly rewritten in the match statement above
+ LogicalPlan::Projection(_)
+ | LogicalPlan::Aggregate(_)
+ | LogicalPlan::Window(_)
+ | LogicalPlan::TableScan(_) => {
+ return internal_err!(
+ "OptimizeProjection: should have handled in the match
statement above"
+ );
}
};
- let new_inputs = izip!(child_required_indices, plan.inputs().into_iter())
- .map(|(required_indices, child)| {
- let projection_beneficial =
required_indices.projection_beneficial();
- let project_exprs =
required_indices.get_required_exprs(child.schema());
- let (input, is_changed) = if let Some(new_input) =
- optimize_projections(child, config, required_indices)?
- {
- (new_input, true)
- } else {
- (child.clone(), false)
- };
- let (input, proj_added) = if projection_beneficial {
- add_projection_on_top_if_helpful(input, project_exprs)?
- } else {
- (input, false)
- };
- Ok((is_changed || proj_added).then_some(input))
- })
- .collect::<Result<Vec<_>>>()?;
- if new_inputs.iter().all(|child| child.is_none()) {
- // All children are the same in this case, no need to change the plan:
- Ok(None)
+ // Required indices are currently ordered (child0, child1, ...)
+ // but the loop pops off the last element, so we need to reverse the order
+ child_required_indices.reverse();
+ if child_required_indices.len() != plan.inputs().len() {
+ return internal_err!(
+ "OptimizeProjection: child_required_indices length mismatch with
plan inputs"
+ );
+ }
+
+ // Rewrite children of the plan
+ let transformed_plan = plan.map_children(|child| {
+ let required_indices = child_required_indices.pop().ok_or_else(|| {
+ internal_datafusion_err!(
+ "Unexpected number of required_indices in OptimizeProjections
rule"
+ )
+ })?;
+
+ let projection_beneficial = required_indices.projection_beneficial();
+ let project_exprs =
required_indices.get_required_exprs(child.schema());
+
+ optimize_projections(child, config, required_indices)?.transform_data(
+ |new_input| {
+ if projection_beneficial {
+ add_projection_on_top_if_helpful(new_input, project_exprs)
+ } else {
+ Ok(Transformed::no(new_input))
+ }
+ },
+ )
+ })?;
+
+ // If any of the children are transformed, we need to potentially update
the plan's schema
+ if transformed_plan.transformed {
+ transformed_plan.map_data(|plan| plan.recompute_schema())
} else {
- // At least one of the children is changed:
- let new_inputs = izip!(new_inputs, plan.inputs())
- // If new_input is `None`, this means child is not changed, so use
- // `old_child` during construction:
- .map(|(new_input, old_child)| new_input.unwrap_or_else(||
old_child.clone()))
- .collect();
- let exprs = plan.expressions();
- plan.with_new_exprs(exprs, new_inputs).map(Some)
+ Ok(transformed_plan)
}
}
@@ -412,22 +460,28 @@ fn optimize_projections(
/// merged projection.
/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken
place).
/// - `Err(error)`: An error occured during the function call.
-fn merge_consecutive_projections(proj: &Projection) ->
Result<Option<Projection>> {
- let LogicalPlan::Projection(prev_projection) = proj.input.as_ref() else {
- return Ok(None);
+fn merge_consecutive_projections(proj: Projection) ->
Result<Transformed<Projection>> {
+ let Projection {
+ expr,
+ input,
+ schema,
+ ..
+ } = proj;
+ let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
+ return Projection::try_new_with_schema(expr, input,
schema).map(Transformed::no);
};
// Count usages (referrals) of each projection expression in its input
fields:
let mut column_referral_map = HashMap::<Column, usize>::new();
- for columns in proj.expr.iter().flat_map(|expr| expr.to_columns()) {
+ for columns in expr.iter().flat_map(|expr| expr.to_columns()) {
for col in columns.into_iter() {
*column_referral_map.entry(col.clone()).or_default() += 1;
}
}
- // If an expression is non-trivial and appears more than once, consecutive
- // projections will benefit from a compute-once approach. For details, see:
- // https://github.com/apache/datafusion/issues/8296
+ // If an expression is non-trivial and appears more than once, do not merge
+ // them as consecutive projections will benefit from a compute-once
approach.
+ // For details, see: https://github.com/apache/datafusion/issues/8296
if column_referral_map.into_iter().any(|(col, usage)| {
usage > 1
&& !is_expr_trivial(
@@ -435,33 +489,78 @@ fn merge_consecutive_projections(proj: &Projection) ->
Result<Option<Projection>
[prev_projection.schema.index_of_column(&col).unwrap()],
)
}) {
- return Ok(None);
+ // no change
+ return Projection::try_new_with_schema(expr, input,
schema).map(Transformed::no);
}
- // If all the expression of the top projection can be rewritten, do so and
- // create a new projection:
- let new_exprs = proj
- .expr
- .iter()
- .map(|expr| rewrite_expr(expr, prev_projection))
- .collect::<Result<Option<Vec<_>>>>()?;
- if let Some(new_exprs) = new_exprs {
+ let LogicalPlan::Projection(prev_projection) = unwrap_arc(input) else {
+ // We know it is a `LogicalPlan::Projection` from check above
+ unreachable!();
+ };
+
+ // Try to rewrite the expressions in the current projection using the
+ // previous projection as input:
+ let name_preserver = NamePreserver::new_for_projection();
+ let mut original_names = vec![];
+ let new_exprs = expr.into_iter().map_until_stop_and_collect(|expr| {
+ original_names.push(name_preserver.save(&expr)?);
+
+ // do not rewrite top level Aliases (rewriter will remove all aliases
within exprs)
+ match expr {
+ Expr::Alias(Alias {
+ expr,
+ relation,
+ name,
+ }) => rewrite_expr(*expr, &prev_projection).map(|result| {
+ result.update_data(|expr| Expr::Alias(Alias::new(expr,
relation, name)))
+ }),
+ e => rewrite_expr(e, &prev_projection),
+ }
+ })?;
+
+ // if the expressions could be rewritten, create a new projection with the
+ // new expressions
+ if new_exprs.transformed {
+ // Add any needed aliases back to the expressions
let new_exprs = new_exprs
+ .data
.into_iter()
- .zip(proj.expr.iter())
- .map(|(new_expr, old_expr)| {
- new_expr.alias_if_changed(old_expr.name_for_alias()?)
- })
+ .zip(original_names.into_iter())
+ .map(|(expr, original_name)| original_name.restore(expr))
.collect::<Result<Vec<_>>>()?;
- Projection::try_new(new_exprs, prev_projection.input.clone()).map(Some)
+ Projection::try_new(new_exprs,
prev_projection.input).map(Transformed::yes)
} else {
- Ok(None)
+ // not rewritten, so put the projection back together
+ let input = Arc::new(LogicalPlan::Projection(prev_projection));
+ Projection::try_new_with_schema(new_exprs.data, input, schema)
+ .map(Transformed::no)
}
}
-/// Trim the given expression by removing any unnecessary layers of aliasing.
-/// If the expression is an alias, the function returns the underlying
expression.
-/// Otherwise, it returns the given expression as is.
+// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
+fn is_expr_trivial(expr: &Expr) -> bool {
+ matches!(expr, Expr::Column(_) | Expr::Literal(_))
+}
+
+/// Rewrites a projection expression using the projection before it (i.e. its
+/// input). This is a subroutine to the `merge_consecutive_projections` function.
+///
+/// # Parameters
+///
+/// * `expr` - The expression to rewrite (taken by value).
+/// * `input` - A reference to the input of the projection expression (itself
+/// a projection).
+///
+/// # Returns
+///
+/// A `Result` object with the following semantics:
+///
+/// - `Ok(Transformed::yes(expr))`: At least one column or alias was rewritten.
+/// - `Ok(Transformed::no(expr))`: Nothing was rewritten; `expr` is unchanged.
+/// - `Err(error)`: An error occurred, e.g. a column is not in `input`'s schema.
+///
+/// # Notes
+/// This rewrite also removes any unnecessary layers of aliasing.
///
/// Without trimming, we can end up with unnecessary indirections inside
expressions
/// during projection merges.
@@ -487,84 +586,28 @@ fn merge_consecutive_projections(proj: &Projection) ->
Result<Option<Projection>
/// Projection((a as a1 + b as b1) as sum1)
/// --Source(a, b)
/// ```
-fn trim_expr(expr: Expr) -> Expr {
- match expr {
- Expr::Alias(alias) => trim_expr(*alias.expr),
- _ => expr,
- }
-}
-
-// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
-fn is_expr_trivial(expr: &Expr) -> bool {
- matches!(expr, Expr::Column(_) | Expr::Literal(_))
-}
-
-// Exit early when there is no rewrite to do.
-macro_rules! rewrite_expr_with_check {
- ($expr:expr, $input:expr) => {
- if let Some(value) = rewrite_expr($expr, $input)? {
- value
- } else {
- return Ok(None);
- }
- };
-}
-
-/// Rewrites a projection expression using the projection before it (i.e. its
input)
-/// This is a subroutine to the `merge_consecutive_projections` function.
-///
-/// # Parameters
-///
-/// * `expr` - A reference to the expression to rewrite.
-/// * `input` - A reference to the input of the projection expression (itself
-/// a projection).
-///
-/// # Returns
-///
-/// A `Result` object with the following semantics:
-///
-/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
-/// - `Ok(None)`: Signals that `expr` can not be rewritten.
-/// - `Err(error)`: An error occurred during the function call.
-fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
- let result = match expr {
- Expr::Column(col) => {
- // Find index of column:
- let idx = input.schema.index_of_column(col)?;
- input.expr[idx].clone()
- }
- Expr::BinaryExpr(binary) => Expr::BinaryExpr(BinaryExpr::new(
- Box::new(trim_expr(rewrite_expr_with_check!(&binary.left, input))),
- binary.op,
- Box::new(trim_expr(rewrite_expr_with_check!(&binary.right,
input))),
- )),
- Expr::Alias(alias) => Expr::Alias(Alias::new(
- trim_expr(rewrite_expr_with_check!(&alias.expr, input)),
- alias.relation.clone(),
- alias.name.clone(),
- )),
- Expr::Literal(_) => expr.clone(),
- Expr::Cast(cast) => {
- let new_expr = rewrite_expr_with_check!(&cast.expr, input);
- Expr::Cast(Cast::new(Box::new(new_expr), cast.data_type.clone()))
- }
- Expr::ScalarFunction(scalar_fn) => {
- return Ok(scalar_fn
- .args
- .iter()
- .map(|expr| rewrite_expr(expr, input))
- .collect::<Result<Option<_>>>()?
- .map(|new_args| {
- Expr::ScalarFunction(ScalarFunction::new_udf(
- scalar_fn.func.clone(),
- new_args,
- ))
- }));
+fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
+ expr.transform_up(|expr| {
+ match expr {
+ // remove any intermediate aliases
+ Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
+ Expr::Column(col) => {
+ // Find index of column:
+ let idx = input.schema.index_of_column(&col)?;
+ // get the corresponding unaliased input expression
+ //
+ // For example:
+ // * the input projection is [`a + b` as c, `d + e` as f]
+ // * the current column is an expression "f"
+ //
+ // return the expression `d + e` (not `d + e` as f)
+ let input_expr = input.expr[idx].clone().unalias_nested();
+ Ok(Transformed::yes(input_expr))
+ }
+ // Any other expression is kept as is; its children were already visited.
+ _ => Ok(Transformed::no(expr)),
}
- // Unsupported type for consecutive projection merge analysis.
- _ => return Ok(None),
- };
- Ok(Some(result))
+ })
}
/// Accumulates outer-referenced columns by the
@@ -682,19 +725,18 @@ fn split_join_requirements(
///
/// # Returns
///
-/// A `Result` containing a tuple with two values: The resulting `LogicalPlan`
-/// (with or without the added projection) and a `bool` flag indicating if a
-/// projection was added (`true`) or not (`false`).
+/// A `Transformed` plan whose `transformed` flag is set if a projection was added.
fn add_projection_on_top_if_helpful(
plan: LogicalPlan,
project_exprs: Vec<Expr>,
-) -> Result<(LogicalPlan, bool)> {
+) -> Result<Transformed<LogicalPlan>> {
// Make sure projection decreases the number of columns, otherwise it is
unnecessary.
if project_exprs.len() >= plan.schema().fields().len() {
- Ok((plan, false))
+ Ok(Transformed::no(plan))
} else {
Projection::try_new(project_exprs, Arc::new(plan))
- .map(|proj| (LogicalPlan::Projection(proj), true))
+ .map(LogicalPlan::Projection)
+ .map(Transformed::yes)
}
}
@@ -716,37 +758,30 @@ fn add_projection_on_top_if_helpful(
/// - `Ok(None)`: No rewrite necessary.
/// - `Err(error)`: An error occured during the function call.
fn rewrite_projection_given_requirements(
- proj: &Projection,
+ proj: Projection,
config: &dyn OptimizerConfig,
indices: RequiredIndicies,
-) -> Result<Option<LogicalPlan>> {
- let exprs_used = indices.get_at_indices(&proj.expr);
+) -> Result<Transformed<LogicalPlan>> {
+ let Projection { expr, input, .. } = proj;
+
+ let exprs_used = indices.get_at_indices(&expr);
let required_indices =
- RequiredIndicies::new().with_exprs(proj.input.schema(),
exprs_used.iter())?;
- return if let Some(input) =
- optimize_projections(&proj.input, config, required_indices)?
- {
- if is_projection_unnecessary(&input, &exprs_used)? {
- Ok(Some(input))
- } else {
- Projection::try_new(exprs_used, Arc::new(input))
- .map(|proj| Some(LogicalPlan::Projection(proj)))
- }
- } else if exprs_used.len() < proj.expr.len() {
- // Projection expression used is different than the existing
projection.
- // In this case, even if the child doesn't change, we should update the
- // projection to use fewer columns:
- if is_projection_unnecessary(&proj.input, &exprs_used)? {
- Ok(Some(proj.input.as_ref().clone()))
- } else {
- Projection::try_new(exprs_used, proj.input.clone())
- .map(|proj| Some(LogicalPlan::Projection(proj)))
- }
- } else {
- // Projection doesn't change.
- Ok(None)
- };
+ RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?;
+
+ // Optimize the child first, then always rebuild this projection on top of
+ // it (or drop it when unnecessary), reporting the result as transformed.
+ optimize_projections(unwrap_arc(input), config,
required_indices)?.transform_data(
+ |input| {
+ if is_projection_unnecessary(&input, &exprs_used)? {
+ Ok(Transformed::yes(input))
+ } else {
+ Projection::try_new(exprs_used, Arc::new(input))
+ .map(LogicalPlan::Projection)
+ .map(Transformed::yes)
+ }
+ },
+ )
}
/// Projection is unnecessary, when
@@ -761,6 +796,7 @@ fn is_projection_unnecessary(input: &LogicalPlan,
proj_exprs: &[Expr]) -> Result
mod tests {
use std::collections::HashMap;
use std::fmt::Formatter;
+ use std::ops::Add;
use std::sync::Arc;
use std::vec;
@@ -1184,13 +1220,32 @@ mod tests {
assert_optimized_plan_equal(plan, expected)
}
+ // Test that a CASE expression referencing an inner projection column is merged
+ #[test]
+ fn test_case_merged() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a"), lit(0).alias("d")])?
+ .project(vec![
+ col("a"),
+ when(col("a").eq(lit(1)), lit(10))
+ .otherwise(col("d"))?
+ .alias("d"),
+ ])?
+ .build()?;
+
+ let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN
Int32(10) ELSE Int32(0) END AS d\
+ \n TableScan: test projection=[a]";
+ assert_optimized_plan_equal(plan, expected)
+ }
+
// Test outer projection isn't discarded despite the same schema as inner
// https://github.com/apache/datafusion/issues/8942
#[test]
fn test_derived_column() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("a"), lit(0).alias("d")])?
+ .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
.project(vec![
col("a"),
when(col("a").eq(lit(1)), lit(10))
@@ -1199,8 +1254,9 @@ mod tests {
])?
.build()?;
- let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN
Int32(10) ELSE d END AS d\
- \n Projection: test.a, Int32(0) AS d\
+ let expected =
+ "Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END
AS d\
+ \n Projection: test.a + Int32(1) AS a, Int32(0) AS d\
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(plan, expected)
}
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index 1c20501da5..fd47cb23b1 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -295,6 +295,13 @@ impl NamePreserver {
}
}
+ /// Create a new `NamePreserver` for rewriting the `expr`s of a `Projection`.
+ ///
+ /// Names are saved via `Expr::name_for_alias` so rewritten exprs can be re-aliased.
+ pub fn new_for_projection() -> Self {
+ Self { use_alias: true }
+ }
+
pub fn save(&self, expr: &Expr) -> Result<SavedName> {
let original_name = if self.use_alias {
Some(expr.name_for_alias()?)
diff --git a/datafusion/sqllogictest/test_files/cte.slt
b/datafusion/sqllogictest/test_files/cte.slt
index e9c508cb27..0549177299 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -31,10 +31,9 @@ query TT
EXPLAIN WITH "NUMBERS" AS (SELECT 1 as a, 2 as b, 3 as c) SELECT "NUMBERS".*
FROM "NUMBERS"
----
logical_plan
-01)Projection: NUMBERS.a, NUMBERS.b, NUMBERS.c
-02)--SubqueryAlias: NUMBERS
-03)----Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c
-04)------EmptyRelation
+01)SubqueryAlias: NUMBERS
+02)--Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c
+03)----EmptyRelation
physical_plan
01)ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
02)--PlaceholderRowExec
@@ -105,14 +104,13 @@ EXPLAIN WITH RECURSIVE nodes AS (
SELECT * FROM nodes
----
logical_plan
-01)Projection: nodes.id
-02)--SubqueryAlias: nodes
-03)----RecursiveQuery: is_distinct=false
-04)------Projection: Int64(1) AS id
-05)--------EmptyRelation
-06)------Projection: nodes.id + Int64(1) AS id
-07)--------Filter: nodes.id < Int64(10)
-08)----------TableScan: nodes
+01)SubqueryAlias: nodes
+02)--RecursiveQuery: is_distinct=false
+03)----Projection: Int64(1) AS id
+04)------EmptyRelation
+05)----Projection: nodes.id + Int64(1) AS id
+06)------Filter: nodes.id < Int64(10)
+07)--------TableScan: nodes
physical_plan
01)RecursiveQueryExec: name=nodes, is_distinct=false
02)--ProjectionExec: expr=[1 as id]
@@ -152,14 +150,13 @@ ORDER BY time, name, account_balance
----
logical_plan
01)Sort: balances.time ASC NULLS LAST, balances.name ASC NULLS LAST,
balances.account_balance ASC NULLS LAST
-02)--Projection: balances.time, balances.name, balances.account_balance
-03)----SubqueryAlias: balances
-04)------RecursiveQuery: is_distinct=false
-05)--------Projection: balance.time, balance.name, balance.account_balance
-06)----------TableScan: balance
-07)--------Projection: balances.time + Int64(1) AS time, balances.name,
balances.account_balance + Int64(10) AS account_balance
-08)----------Filter: balances.time < Int64(10)
-09)------------TableScan: balances
+02)--SubqueryAlias: balances
+03)----RecursiveQuery: is_distinct=false
+04)------Projection: balance.time, balance.name, balance.account_balance
+05)--------TableScan: balance
+06)------Projection: balances.time + Int64(1) AS time, balances.name,
balances.account_balance + Int64(10) AS account_balance
+07)--------Filter: balances.time < Int64(10)
+08)----------TableScan: balances
physical_plan
01)SortExec: expr=[time@0 ASC NULLS LAST,name@1 ASC NULLS
LAST,account_balance@2 ASC NULLS LAST], preserve_partitioning=[false]
02)--RecursiveQueryExec: name=balances, is_distinct=false
@@ -720,18 +717,17 @@ explain WITH RECURSIVE recursive_cte AS (
SELECT * FROM recursive_cte;
----
logical_plan
-01)Projection: recursive_cte.val
-02)--SubqueryAlias: recursive_cte
-03)----RecursiveQuery: is_distinct=false
-04)------Projection: Int64(1) AS val
-05)--------EmptyRelation
-06)------Projection: Int64(2) AS val
-07)--------CrossJoin:
-08)----------Filter: recursive_cte.val < Int64(2)
-09)------------TableScan: recursive_cte
-10)----------SubqueryAlias: sub_cte
-11)------------Projection: Int64(2) AS val
-12)--------------EmptyRelation
+01)SubqueryAlias: recursive_cte
+02)--RecursiveQuery: is_distinct=false
+03)----Projection: Int64(1) AS val
+04)------EmptyRelation
+05)----Projection: Int64(2) AS val
+06)------CrossJoin:
+07)--------Filter: recursive_cte.val < Int64(2)
+08)----------TableScan: recursive_cte
+09)--------SubqueryAlias: sub_cte
+10)----------Projection: Int64(2) AS val
+11)------------EmptyRelation
physical_plan
01)RecursiveQueryExec: name=recursive_cte, is_distinct=false
02)--ProjectionExec: expr=[1 as val]
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index f4a86fd027..e2c77552f9 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3602,17 +3602,16 @@ EXPLAIN SELECT * FROM (
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
logical_plan
-01)Projection: a.c, a.d, rhs.e, rhs.f
-02)--Full Join: a.c = rhs.e
-03)----SubqueryAlias: a
-04)------Union
-05)--------Projection: Int64(1) AS c, Int64(2) AS d
-06)----------EmptyRelation
-07)--------Projection: Int64(1) AS c, Int64(3) AS d
-08)----------EmptyRelation
-09)----SubqueryAlias: rhs
-10)------Projection: Int64(1) AS e, Int64(3) AS f
-11)--------EmptyRelation
+01)Full Join: a.c = rhs.e
+02)--SubqueryAlias: a
+03)----Union
+04)------Projection: Int64(1) AS c, Int64(2) AS d
+05)--------EmptyRelation
+06)------Projection: Int64(1) AS c, Int64(3) AS d
+07)--------EmptyRelation
+08)--SubqueryAlias: rhs
+09)----Projection: Int64(1) AS e, Int64(3) AS f
+10)------EmptyRelation
physical_plan
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
02)--CoalesceBatchesExec: target_batch_size=2
@@ -3650,17 +3649,16 @@ EXPLAIN SELECT * FROM (
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
logical_plan
-01)Projection: a.c, a.d, rhs.e, rhs.f
-02)--Full Join: a.c = rhs.e
-03)----SubqueryAlias: a
-04)------Union
-05)--------Projection: Int64(1) AS c, Int64(2) AS d
-06)----------EmptyRelation
-07)--------Projection: Int64(1) AS c, Int64(3) AS d
-08)----------EmptyRelation
-09)----SubqueryAlias: rhs
-10)------Projection: Int64(1) AS e, Int64(3) AS f
-11)--------EmptyRelation
+01)Full Join: a.c = rhs.e
+02)--SubqueryAlias: a
+03)----Union
+04)------Projection: Int64(1) AS c, Int64(2) AS d
+05)--------EmptyRelation
+06)------Projection: Int64(1) AS c, Int64(3) AS d
+07)--------EmptyRelation
+08)--SubqueryAlias: rhs
+09)----Projection: Int64(1) AS e, Int64(3) AS f
+10)------EmptyRelation
physical_plan
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
02)--CoalesceBatchesExec: target_batch_size=2
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]