alamb commented on code in PR #10484:
URL: https://github.com/apache/datafusion/pull/10484#discussion_r1626228488


##########
datafusion/physical-expr/src/aggregate/build_in.rs:
##########
@@ -61,14 +61,9 @@ pub fn create_aggregate_expr(
         .collect::<Result<Vec<_>>>()?;
     let input_phy_exprs = input_phy_exprs.to_vec();
     Ok(match (fun, distinct) {
-        (AggregateFunction::Count, false) => Arc::new(
-            expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, data_type),
-        ),
-        (AggregateFunction::Count, true) => Arc::new(expressions::DistinctCount::new(
-            data_type,
-            input_phy_exprs[0].clone(),
-            name,
-        )),
+        (AggregateFunction::Count, _) => {
+            todo!("Builtin Count will be removed")

Review Comment:
   Can we please make this an internal error rather than a panic?



##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1312,160 +1311,160 @@ mod tests {
         Arc::new(task_ctx)
     }
 
-    async fn check_grouping_sets(
-        input: Arc<dyn ExecutionPlan>,
-        spill: bool,
-    ) -> Result<()> {
-        let input_schema = input.schema();
-
-        let grouping_set = PhysicalGroupBy {
-            expr: vec![
-                (col("a", &input_schema)?, "a".to_string()),
-                (col("b", &input_schema)?, "b".to_string()),
-            ],
-            null_expr: vec![
-                (lit(ScalarValue::UInt32(None)), "a".to_string()),
-                (lit(ScalarValue::Float64(None)), "b".to_string()),
-            ],
-            groups: vec![
-                vec![false, true],  // (a, NULL)
-                vec![true, false],  // (NULL, b)
-                vec![false, false], // (a,b)
-            ],
-        };
-
-        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        ))];
-
-        let task_ctx = if spill {
-            new_spill_ctx(4, 1000)
-        } else {
-            Arc::new(TaskContext::default())
-        };
-
-        let partial_aggregate = Arc::new(AggregateExec::try_new(
-            AggregateMode::Partial,
-            grouping_set.clone(),
-            aggregates.clone(),
-            vec![None],
-            input,
-            input_schema.clone(),
-        )?);
-
-        let result =
-            common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
-
-        let expected = if spill {
-            vec![
-                "+---+-----+-----------------+",
-                "| a | b   | COUNT(1)[count] |",
-                "+---+-----+-----------------+",
-                "|   | 1.0 | 1               |",
-                "|   | 1.0 | 1               |",
-                "|   | 2.0 | 1               |",
-                "|   | 2.0 | 1               |",
-                "|   | 3.0 | 1               |",
-                "|   | 3.0 | 1               |",
-                "|   | 4.0 | 1               |",
-                "|   | 4.0 | 1               |",
-                "| 2 |     | 1               |",
-                "| 2 |     | 1               |",
-                "| 2 | 1.0 | 1               |",
-                "| 2 | 1.0 | 1               |",
-                "| 3 |     | 1               |",
-                "| 3 |     | 2               |",
-                "| 3 | 2.0 | 2               |",
-                "| 3 | 3.0 | 1               |",
-                "| 4 |     | 1               |",
-                "| 4 |     | 2               |",
-                "| 4 | 3.0 | 1               |",
-                "| 4 | 4.0 | 2               |",
-                "+---+-----+-----------------+",
-            ]
-        } else {
-            vec![
-                "+---+-----+-----------------+",
-                "| a | b   | COUNT(1)[count] |",
-                "+---+-----+-----------------+",
-                "|   | 1.0 | 2               |",
-                "|   | 2.0 | 2               |",
-                "|   | 3.0 | 2               |",
-                "|   | 4.0 | 2               |",
-                "| 2 |     | 2               |",
-                "| 2 | 1.0 | 2               |",
-                "| 3 |     | 3               |",
-                "| 3 | 2.0 | 2               |",
-                "| 3 | 3.0 | 1               |",
-                "| 4 |     | 3               |",
-                "| 4 | 3.0 | 1               |",
-                "| 4 | 4.0 | 2               |",
-                "+---+-----+-----------------+",
-            ]
-        };
-        assert_batches_sorted_eq!(expected, &result);
-
-        let groups = partial_aggregate.group_expr().expr().to_vec();
-
-        let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
-
-        let final_group: Vec<(Arc<dyn PhysicalExpr>, String)> = groups
-            .iter()
-            .map(|(_expr, name)| Ok((col(name, &input_schema)?, name.clone())))
-            .collect::<Result<_>>()?;
-
-        let final_grouping_set = PhysicalGroupBy::new_single(final_group);
-
-        let task_ctx = if spill {
-            new_spill_ctx(4, 3160)
-        } else {
-            task_ctx
-        };
-
-        let merged_aggregate = Arc::new(AggregateExec::try_new(
-            AggregateMode::Final,
-            final_grouping_set,
-            aggregates,
-            vec![None],
-            merge,
-            input_schema,
-        )?);
-
-        let result =
-            common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
-        let batch = concat_batches(&result[0].schema(), &result)?;
-        assert_eq!(batch.num_columns(), 3);
-        assert_eq!(batch.num_rows(), 12);
-
-        let expected = vec![
-            "+---+-----+----------+",
-            "| a | b   | COUNT(1) |",
-            "+---+-----+----------+",
-            "|   | 1.0 | 2        |",
-            "|   | 2.0 | 2        |",
-            "|   | 3.0 | 2        |",
-            "|   | 4.0 | 2        |",
-            "| 2 |     | 2        |",
-            "| 2 | 1.0 | 2        |",
-            "| 3 |     | 3        |",
-            "| 3 | 2.0 | 2        |",
-            "| 3 | 3.0 | 1        |",
-            "| 4 |     | 3        |",
-            "| 4 | 3.0 | 1        |",
-            "| 4 | 4.0 | 2        |",
-            "+---+-----+----------+",
-        ];
-
-        assert_batches_sorted_eq!(&expected, &result);
-
-        let metrics = merged_aggregate.metrics().unwrap();
-        let output_rows = metrics.output_rows().unwrap();
-        assert_eq!(12, output_rows);
-
-        Ok(())
-    }
+    // async fn check_grouping_sets(

Review Comment:
   I think we should either port or remove these tests -- probably port them, right?



##########
datafusion/physical-plan/src/windows/mod.rs:
##########
@@ -749,7 +749,7 @@ mod tests {
         let refs = blocking_exec.refs();
         let window_agg_exec = Arc::new(WindowAggExec::try_new(
             vec![create_window_expr(
-                &WindowFunctionDefinition::AggregateFunction(AggregateFunction::Count),
+                &WindowFunctionDefinition::AggregateUDF(count_udaf()),

Review Comment:
   nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to