cj-zhukov commented on code in PR #21021:
URL: https://github.com/apache/datafusion/pull/21021#discussion_r2959903311


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -410,21 +411,76 @@ impl DataFrame {
         expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
     ) -> Result<DataFrame> {
         let expr_list: Vec<SelectExpr> =
-            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
+            expr_list.into_iter().map(|e| e.into()).collect();
 
+        // Extract plain expressions
         let expressions = expr_list.iter().filter_map(|e| match e {
             SelectExpr::Expression(expr) => Some(expr),
             _ => None,
         });
 
-        let window_func_exprs = find_window_exprs(expressions);
-        let plan = if window_func_exprs.is_empty() {
+        // Apply window functions first
+        let window_func_exprs = find_window_exprs(expressions.clone());
+
+        let mut plan = if window_func_exprs.is_empty() {
             self.plan
         } else {
             LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
         };
 
-        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+        // Collect aggregate expressions
+        let aggr_exprs = find_aggregate_exprs(expressions.clone());
+
+        // Check if any expression is non-aggregate
+        let has_non_aggregate_expr = expressions
+            .clone()
+            .any(|expr| find_aggregate_exprs(std::iter::once(expr)).is_empty());
+
+        // Fallback to projection:
+        // - already aggregated
+        // - contains non-aggregate expressions
+        // - no aggregates at all
+        if matches!(plan, LogicalPlan::Aggregate(_))
+            || has_non_aggregate_expr
+            || aggr_exprs.is_empty()
+        {
+            let project_plan =
+                LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+            return Ok(DataFrame {
+                session_state: self.session_state,
+                plan: project_plan,
+                projection_requires_validation: false,
+            });
+        }
+
+        // Build Aggregate node
+        let aggr_exprs: Vec<Expr> = aggr_exprs
+            .into_iter()
+            .enumerate()
+            .map(|(i, expr)| expr.alias(format!("__agg_{i}")))
+            .collect();
+
+        plan = LogicalPlanBuilder::from(plan)
+            .aggregate(Vec::<Expr>::new(), aggr_exprs)?
+            .build()?;
+
+        // Replace aggregates with their aliases
+        let mut rewritten_exprs = Vec::with_capacity(expr_list.len());
+        for (i, select_expr) in expr_list.into_iter().enumerate() {
+            match select_expr {
+                SelectExpr::Expression(expr) => {
+                    let column = Expr::Column(Column::from_name(format!("__agg_{i}")));

Review Comment:
   I’ll look into a safer way to generate unique aggregate aliases and update
   the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to