This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push: new d37351aa61 fix: from_plan generate Agg/Window can be with different schema. (#6820) d37351aa61 is described below commit d37351aa610c2c68e4cc2a57a5c89d6f0cfcedf5 Author: jakevin <jakevin...@gmail.com> AuthorDate: Sun Jul 2 21:08:11 2023 +0800 fix: from_plan generate Agg/Window can be with different schema. (#6820) * fix: from_plan generate Agg can be with different schema. * fix: from_plan generate Window can be with different schema. --- datafusion/expr/src/logical_plan/builder.rs | 15 +++++---------- datafusion/expr/src/logical_plan/plan.rs | 20 +++++++++++++++++-- datafusion/expr/src/utils.rs | 30 +++++++++++++---------------- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 61d540049c..9ddf6231c5 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -825,17 +825,11 @@ impl LogicalPlanBuilder { window_expr: impl IntoIterator<Item = impl Into<Expr>>, ) -> Result<Self> { let window_expr = normalize_cols(window_expr, &self.plan)?; - let all_expr = window_expr.iter(); - validate_unique_names("Windows", all_expr.clone())?; - let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone(); - window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?); - let metadata = self.plan.schema().metadata().clone(); - - Ok(Self::from(LogicalPlan::Window(Window { - input: Arc::new(self.plan), + validate_unique_names("Windows", &window_expr)?; + Ok(Self::from(LogicalPlan::Window(Window::try_new( window_expr, - schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?), - }))) + Arc::new(self.plan), + )?))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -1229,6 +1223,7 @@ pub fn project( plan: LogicalPlan, expr: impl IntoIterator<Item = impl Into<Expr>>, ) -> Result<LogicalPlan> { + // TODO: move it into analyzer let input_schema = plan.schema(); let mut projected_expr = vec![]; for e in expr { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index cf94052252..e058708701 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeVisitor, VisitRecursion, }; use datafusion_common::{ - plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, - Result, ScalarValue, + plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, + OwnedTableReference, Result, ScalarValue, }; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; @@ -1400,6 +1400,22 @@ pub struct Window { pub schema: DFSchemaRef, } +impl Window { + /// Create a new window operator. + pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> { + let mut window_fields: Vec<DFField> = input.schema().fields().clone(); + window_fields + .extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?); + let metadata = input.schema().metadata().clone(); + + Ok(Window { + input, + window_expr, + schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?), + }) + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 61b0db53fb..069ce6df71 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -818,23 +818,19 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window(Window { - window_expr, - schema, - .. - }) => Ok(LogicalPlan::Window(Window { - input: Arc::new(inputs[0].clone()), - window_expr: expr[0..window_expr.len()].to_vec(), - schema: schema.clone(), - })), - LogicalPlan::Aggregate(Aggregate { - group_expr, schema, .. - }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( - Arc::new(inputs[0].clone()), - expr[0..group_expr.len()].to_vec(), - expr[group_expr.len()..].to_vec(), - schema.clone(), - )?)), + LogicalPlan::Window(Window { window_expr, .. }) => { + Ok(LogicalPlan::Window(Window::try_new( + expr[0..window_expr.len()].to_vec(), + Arc::new(inputs[0].clone()), + )?)) + } + LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { + Ok(LogicalPlan::Aggregate(Aggregate::try_new( + Arc::new(inputs[0].clone()), + expr[0..group_expr.len()].to_vec(), + expr[group_expr.len()..].to_vec(), + )?)) + } LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()),