alamb commented on code in PR #8358:
URL: https://github.com/apache/arrow-datafusion/pull/8358#discussion_r1409769682


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1705,105 +1713,105 @@ pub fn 
create_aggregate_expr_with_name_and_maybe_filter(
 ) -> Result<AggregateExprWithOptionalArgs> {
     match e {
         Expr::AggregateFunction(AggregateFunction {
-            fun,
+            func_def,
             distinct,
             args,
             filter,
             order_by,
-        }) => {
-            let args = args
-                .iter()
-                .map(|e| {
-                    create_physical_expr(
+        }) => match func_def {
+            AggregateFunctionDefinition::BuiltIn { fun, name: _ } => {
+                let args = args
+                    .iter()
+                    .map(|e| {
+                        create_physical_expr(
+                            e,
+                            logical_input_schema,
+                            physical_input_schema,
+                            execution_props,
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let filter = match filter {
+                    Some(e) => Some(create_physical_expr(
                         e,
                         logical_input_schema,
                         physical_input_schema,
                         execution_props,
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
-            let filter = match filter {
-                Some(e) => Some(create_physical_expr(
-                    e,
-                    logical_input_schema,
+                    )?),
+                    None => None,
+                };
+                let order_by = match order_by {
+                    Some(e) => Some(
+                        e.iter()
+                            .map(|expr| {
+                                create_physical_sort_expr(
+                                    expr,
+                                    logical_input_schema,
+                                    physical_input_schema,
+                                    execution_props,
+                                )
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                    ),
+                    None => None,
+                };
+                let ordering_reqs = order_by.clone().unwrap_or(vec![]);
+                let agg_expr = aggregates::create_aggregate_expr(
+                    fun,
+                    *distinct,
+                    &args,
+                    &ordering_reqs,
                     physical_input_schema,
-                    execution_props,
-                )?),
-                None => None,
-            };
-            let order_by = match order_by {
-                Some(e) => Some(
-                    e.iter()
-                        .map(|expr| {
-                            create_physical_sort_expr(
-                                expr,
-                                logical_input_schema,
-                                physical_input_schema,
-                                execution_props,
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?,
-                ),
-                None => None,
-            };
-            let ordering_reqs = order_by.clone().unwrap_or(vec![]);
-            let agg_expr = aggregates::create_aggregate_expr(
-                fun,
-                *distinct,
-                &args,
-                &ordering_reqs,
-                physical_input_schema,
-                name,
-            )?;
-            Ok((agg_expr, filter, order_by))
-        }
-        Expr::AggregateUDF(AggregateUDF {
-            fun,
-            args,
-            filter,
-            order_by,
-        }) => {
-            let args = args
-                .iter()
-                .map(|e| {
-                    create_physical_expr(
+                    name,
+                )?;
+                Ok((agg_expr, filter, order_by))
+            }
+            AggregateFunctionDefinition::UDF(fun) => {
+                let args = args
+                    .iter()
+                    .map(|e| {
+                        create_physical_expr(
+                            e,
+                            logical_input_schema,
+                            physical_input_schema,
+                            execution_props,
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let filter = match filter {
+                    Some(e) => Some(create_physical_expr(
                         e,
                         logical_input_schema,
                         physical_input_schema,
                         execution_props,
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            let filter = match filter {
-                Some(e) => Some(create_physical_expr(
-                    e,
-                    logical_input_schema,
-                    physical_input_schema,
-                    execution_props,
-                )?),
-                None => None,
-            };
-            let order_by = match order_by {
-                Some(e) => Some(
-                    e.iter()
-                        .map(|expr| {
-                            create_physical_sort_expr(
-                                expr,
-                                logical_input_schema,
-                                physical_input_schema,
-                                execution_props,
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?,
-                ),
-                None => None,
-            };
+                    )?),
+                    None => None,
+                };
+                let order_by = match order_by {

Review Comment:
   I think it would be possible to do the `order_by` and `filter` conversions 
once, before the `match`, rather than repeating them in both arms.



##########
datafusion/expr/src/expr.rs:
##########
@@ -477,11 +475,36 @@ impl Sort {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// Defines which implementation of a function for DataFusion to call.

Review Comment:
   ```suggestion
   /// Defines which implementation of an aggregate function DataFusion should call.
   ```



##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -123,19 +123,26 @@ impl ExprSchemable for Expr {
                     .collect::<Result<Vec<_>>>()?;
                 fun.return_type(&data_types)
             }
-            Expr::AggregateFunction(AggregateFunction { fun, args, .. }) => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                fun.return_type(&data_types)
-            }
-            Expr::AggregateUDF(AggregateUDF { fun, args, .. }) => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                fun.return_type(&data_types)
+            Expr::AggregateFunction(AggregateFunction { func_def, args, .. }) 
=> {
+                match func_def {
+                    AggregateFunctionDefinition::BuiltIn { fun, .. } => {
+                        let data_types = args

Review Comment:
   The `data_types` calculation could be hoisted above the `match` as well.



##########
datafusion/expr/src/expr.rs:
##########
@@ -477,11 +475,36 @@ impl Sort {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// Defines which implementation of a function for DataFusion to call.
+pub enum AggregateFunctionDefinition {
+    BuiltIn {
+        fun: aggregate_function::AggregateFunction,
+        name: Arc<str>,
+    },
+    /// Resolved to a user defined function

Review Comment:
   ```suggestion
       /// Resolved to a user defined aggregate function
   ```



##########
datafusion/expr/src/expr.rs:
##########
@@ -501,7 +524,27 @@ impl AggregateFunction {
         order_by: Option<Vec<Expr>>,
     ) -> Self {
         Self {
-            fun,
+            func_def: AggregateFunctionDefinition::BuiltIn {

Review Comment:
   While updating my project to use `ScalarFunctionDefinition`, I found that 
managing `name` was somewhat awkward.
   
   What do you think about making a function 
`AggregateFunctionDefinition::new_builtin()` that handles this name
   
   So this code would look like:
   
   ```rust
               func_def: AggregateFunctionDefinition::new_builtin(fun),
   ...
   ```
   ?



##########
datafusion/expr/src/expr.rs:
##########
@@ -1574,40 +1600,38 @@ fn create_name(e: &Expr) -> Result<String> {
             Ok(parts.join(" "))
         }
         Expr::AggregateFunction(AggregateFunction {
-            fun,
+            func_def,
             distinct,
             args,
             filter,
             order_by,
-        }) => {
-            let mut name = create_function_name(&fun.to_string(), *distinct, 
args)?;
-            if let Some(fe) = filter {
-                name = format!("{name} FILTER (WHERE {fe})");
-            };
-            if let Some(order_by) = order_by {
-                name = format!("{name} ORDER BY [{}]", 
expr_vec_fmt!(order_by));
-            };
-            Ok(name)
-        }
-        Expr::AggregateUDF(AggregateUDF {
-            fun,
-            args,
-            filter,
-            order_by,
-        }) => {
-            let mut names = Vec::with_capacity(args.len());
-            for e in args {
-                names.push(create_name(e)?);
-            }
-            let mut info = String::new();
-            if let Some(fe) = filter {
-                info += &format!(" FILTER (WHERE {fe})");
+        }) => match func_def {
+            AggregateFunctionDefinition::BuiltIn { fun: _, name: _ }
+            | AggregateFunctionDefinition::Name(_) => {
+                let mut name = create_function_name(func_def.name(), 
*distinct, args)?;

Review Comment:
   You might be able to combine the arms together here (the 
`AggregateFunctionDefinition::UDF` arm appears to do the same thing, though I 
realize it is not entirely the same).



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -229,30 +230,37 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> 
Result<String> {
             create_function_physical_name(&fun.to_string(), false, args)
         }
         Expr::AggregateFunction(AggregateFunction {
-            fun,
+            func_def,
             distinct,
             args,
-            ..
-        }) => create_function_physical_name(&fun.to_string(), *distinct, args),
-        Expr::AggregateUDF(AggregateUDF {
-            fun,
-            args,
             filter,
             order_by,
-        }) => {
-            // TODO: Add support for filter and order by in AggregateUDF
-            if filter.is_some() {
-                return exec_err!("aggregate expression with filter is not 
supported");
+        }) => match func_def {
+            AggregateFunctionDefinition::BuiltIn { fun: _, name: _ } => {
+                create_function_physical_name(func_def.name(), *distinct, args)
             }
-            if order_by.is_some() {
-                return exec_err!("aggregate expression with order_by is not 
supported");
+            AggregateFunctionDefinition::UDF(fun) => {
+                // TODO: Add support for filter and order by in AggregateUDF
+                if filter.is_some() {
+                    return exec_err!(
+                        "aggregate expression with filter is not supported"
+                    );
+                }
+                if order_by.is_some() {
+                    return exec_err!(
+                        "aggregate expression with order_by is not supported"
+                    );
+                }
+                let mut names = Vec::with_capacity(args.len());
+                for e in args {
+                    names.push(create_physical_name(e, false)?);
+                }

Review Comment:
   I know this is the same as what was here before, but I think you can write 
it more concisely, like this:
   
   ```suggestion
                   let names = args.iter()
                     .map(|e| create_physical_name(e, false))
                     .collect::<Result<Vec<_>>>()?;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to