This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 137bf81a39 revert #6595 #6820 (#6827)
137bf81a39 is described below

commit 137bf81a39d4e4279a79f31aadee5bd75612017a
Author: jakevin <[email protected]>
AuthorDate: Tue Jul 4 14:59:04 2023 +0800

    revert #6595 #6820 (#6827)
    
    * revert: from_plan keep same schema Project in #6595
    
    * revert: from_plan keep same schema Agg/Window in #6820
    
    * revert type coercion
    
    * add comment
---
 datafusion/common/src/dfschema.rs                  |  8 +---
 datafusion/expr/src/utils.rs                       | 44 +++++++++++++---------
 datafusion/optimizer/src/analyzer/type_coercion.rs | 11 +++++-
 3 files changed, 38 insertions(+), 25 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index c490852c6e..cb07f15b9d 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -384,12 +384,8 @@ impl DFSchema {
         let self_fields = self.fields().iter();
         let other_fields = other.fields().iter();
         self_fields.zip(other_fields).all(|(f1, f2)| {
-            // TODO: resolve field when exist alias
-            // f1.qualifier() == f2.qualifier()
-            //     && f1.name() == f2.name()
-            // column(t1.a) field is "t1"."a"
-            // column(x) as t1.a field is ""."t1.a"
-            f1.qualified_name() == f2.qualified_name()
+            f1.qualifier() == f2.qualifier()
+                && f1.name() == f2.name()
                 && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
         })
     }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 069ce6df71..3111579246 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -724,16 +724,22 @@ where
 /// // create new plan using rewritten_exprs in same position
 /// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs);
 /// ```
+///
+/// Notice: sometimes [from_plan] will use schema of original plan, it don't change schema!
+/// Such as `Projection/Aggregate/Window`
 pub fn from_plan(
     plan: &LogicalPlan,
     expr: &[Expr],
     inputs: &[LogicalPlan],
 ) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
-            expr.to_vec(),
-            Arc::new(inputs[0].clone()),
-        )?)),
+        LogicalPlan::Projection(Projection { schema, .. }) => {
+            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                expr.to_vec(),
+                Arc::new(inputs[0].clone()),
+                schema.clone(),
+            )?))
+        }
         LogicalPlan::Dml(DmlStatement {
             table_name,
             table_schema,
@@ -818,19 +824,23 @@ pub fn from_plan(
                 input: Arc::new(inputs[0].clone()),
             })),
         },
-        LogicalPlan::Window(Window { window_expr, .. }) => {
-            Ok(LogicalPlan::Window(Window::try_new(
-                expr[0..window_expr.len()].to_vec(),
-                Arc::new(inputs[0].clone()),
-            )?))
-        }
-        LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
-            Ok(LogicalPlan::Aggregate(Aggregate::try_new(
-                Arc::new(inputs[0].clone()),
-                expr[0..group_expr.len()].to_vec(),
-                expr[group_expr.len()..].to_vec(),
-            )?))
-        }
+        LogicalPlan::Window(Window {
+            window_expr,
+            schema,
+            ..
+        }) => Ok(LogicalPlan::Window(Window {
+            input: Arc::new(inputs[0].clone()),
+            window_expr: expr[0..window_expr.len()].to_vec(),
+            schema: schema.clone(),
+        })),
+        LogicalPlan::Aggregate(Aggregate {
+            group_expr, schema, ..
+        }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+            Arc::new(inputs[0].clone()),
+            expr[0..group_expr.len()].to_vec(),
+            expr[group_expr.len()..].to_vec(),
+            schema.clone(),
+        )?)),
         LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan {
             expr: expr.to_vec(),
             input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 8edf734b47..5d1fef5352 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -43,7 +43,7 @@ use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
     type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator,
-    WindowFrame, WindowFrameBound, WindowFrameUnits,
+    Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
 use datafusion_expr::{ExprSchemable, Signature};
 
@@ -109,7 +109,14 @@ fn analyze_internal(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    from_plan(plan, &new_expr, &new_inputs)
+    // TODO: from_plan can't change the schema, so we need to do this here
+    match &plan {
+        LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
+            new_expr,
+            Arc::new(new_inputs[0].clone()),
+        )?)),
+        _ => from_plan(plan, &new_expr, &new_inputs),
+    }
 }
 
 pub(crate) struct TypeCoercionRewriter {

Reply via email to