This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d37351aa61 fix: from_plan generate Agg/Window can be with different schema. (#6820)
d37351aa61 is described below

commit d37351aa610c2c68e4cc2a57a5c89d6f0cfcedf5
Author: jakevin <jakevin...@gmail.com>
AuthorDate: Sun Jul 2 21:08:11 2023 +0800

    fix: from_plan generate Agg/Window can be with different schema. (#6820)
    
    * fix: from_plan generate Agg can be with different schema.
    
    * fix: from_plan generate Window can be with different schema.
---
 datafusion/expr/src/logical_plan/builder.rs | 15 +++++----------
 datafusion/expr/src/logical_plan/plan.rs    | 20 +++++++++++++++++--
 datafusion/expr/src/utils.rs                | 30 +++++++++++++----------------
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 61d540049c..9ddf6231c5 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -825,17 +825,11 @@ impl LogicalPlanBuilder {
         window_expr: impl IntoIterator<Item = impl Into<Expr>>,
     ) -> Result<Self> {
         let window_expr = normalize_cols(window_expr, &self.plan)?;
-        let all_expr = window_expr.iter();
-        validate_unique_names("Windows", all_expr.clone())?;
-        let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone();
-        window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?);
-        let metadata = self.plan.schema().metadata().clone();
-
-        Ok(Self::from(LogicalPlan::Window(Window {
-            input: Arc::new(self.plan),
+        validate_unique_names("Windows", &window_expr)?;
+        Ok(Self::from(LogicalPlan::Window(Window::try_new(
             window_expr,
-            schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
-        })))
+            Arc::new(self.plan),
+        )?)))
     }
 
     /// Apply an aggregate: grouping on the `group_expr` expressions
@@ -1229,6 +1223,7 @@ pub fn project(
     plan: LogicalPlan,
     expr: impl IntoIterator<Item = impl Into<Expr>>,
 ) -> Result<LogicalPlan> {
+    // TODO: move it into analyzer
     let input_schema = plan.schema();
     let mut projected_expr = vec![];
     for e in expr {
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index cf94052252..e058708701 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
 };
 use datafusion_common::{
-    plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
-    Result, ScalarValue,
+    plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
+    OwnedTableReference, Result, ScalarValue,
 };
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug, Display, Formatter};
@@ -1400,6 +1400,22 @@ pub struct Window {
     pub schema: DFSchemaRef,
 }
 
+impl Window {
+    /// Create a new window operator.
+    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
+        let mut window_fields: Vec<DFField> = input.schema().fields().clone();
+        window_fields
+            .extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
+        let metadata = input.schema().metadata().clone();
+
+        Ok(Window {
+            input,
+            window_expr,
+            schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
+        })
+    }
+}
+
 /// Produces rows from a table provider by reference or from the context
 #[derive(Clone)]
 pub struct TableScan {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 61b0db53fb..069ce6df71 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -818,23 +818,19 @@ pub fn from_plan(
                 input: Arc::new(inputs[0].clone()),
             })),
         },
-        LogicalPlan::Window(Window {
-            window_expr,
-            schema,
-            ..
-        }) => Ok(LogicalPlan::Window(Window {
-            input: Arc::new(inputs[0].clone()),
-            window_expr: expr[0..window_expr.len()].to_vec(),
-            schema: schema.clone(),
-        })),
-        LogicalPlan::Aggregate(Aggregate {
-            group_expr, schema, ..
-        }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
-            Arc::new(inputs[0].clone()),
-            expr[0..group_expr.len()].to_vec(),
-            expr[group_expr.len()..].to_vec(),
-            schema.clone(),
-        )?)),
+        LogicalPlan::Window(Window { window_expr, .. }) => {
+            Ok(LogicalPlan::Window(Window::try_new(
+                expr[0..window_expr.len()].to_vec(),
+                Arc::new(inputs[0].clone()),
+            )?))
+        }
+        LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
+            Ok(LogicalPlan::Aggregate(Aggregate::try_new(
+                Arc::new(inputs[0].clone()),
+                expr[0..group_expr.len()].to_vec(),
+                expr[group_expr.len()..].to_vec(),
+            )?))
+        }
         LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan {
             expr: expr.to_vec(),
             input: Arc::new(inputs[0].clone()),

Reply via email to