2010YOUY01 commented on code in PR #18404:
URL: https://github.com/apache/datafusion/pull/18404#discussion_r2485205339
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1025,6 +1031,76 @@ impl ExecutionPlan for AggregateExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
+
Review Comment:
> Good point, I noticed that `FilterExec` calls
[split_conjunction](https://github.com/apache/datafusion/blob/e65dafe7330f0324b28d9037f4e5a73cf12e99ce/datafusion/expr/src/utils.rs#L953)
which already does this work (for non dynamic filters), but dynamic filters
come wrapped in a single physical expression with (potentially) multiple
columns depending on how many `on` key expressions there are, it looks something
like: `DynamicFilterPhysicalExpr([a, b])` <- we should split the conjunctions
for this expression too.
>
> I can make this improvement in this same PR, to make the filter(s)
pushdown more granular.
This discussion is really informative; I suggest including it in the
comment.
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1025,6 +1031,86 @@ impl ExecutionPlan for AggregateExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
+
+ fn gather_filters_for_pushdown(
Review Comment:
```suggestion
/// Push down parent filters when possible (see implementation comment
for details),
/// but do not introduce any new self filters.
fn gather_filters_for_pushdown(
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -1892,3 +1891,277 @@ fn col_lit_predicate(
Arc::new(Literal::new(scalar_value)),
))
}
+
+#[tokio::test]
+async fn test_aggregate_filter_pushdown() {
+ // Test that filters can pass through AggregateExec even with aggregate
functions
+ // when the filter references grouping columns
+ // Simulates: SELECT a, COUNT(b) FROM table WHERE a = 'x' GROUP BY a
+
+ let batches =
+ vec![
+ record_batch!(("a", Utf8, ["x", "y"]), ("b", Utf8, ["foo",
"bar"])).unwrap(),
+ ];
+
+ let scan = TestScanBuilder::new(schema())
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+
+ // Create an aggregate: GROUP BY a with COUNT(b)
+ let group_by = PhysicalGroupBy::new_single(vec![(
+ col("a", &schema()).unwrap(),
+ "a".to_string(),
+ )]);
+
+ // Add COUNT aggregate
+ let count_expr =
+ AggregateExprBuilder::new(count_udaf(), vec![col("b",
&schema()).unwrap()])
+ .schema(schema())
+ .alias("count")
+ .build()
+ .unwrap();
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ vec![count_expr.into()], // Has aggregate function
+ vec![None], // No filter on the aggregate function
+ Arc::clone(&scan),
+ schema(),
+ )
+ .unwrap(),
+ );
+
+ // Add a filter on the grouping column 'a'
+ let predicate = col_lit_predicate("a", "x", &schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
+ as Arc<dyn ExecutionPlan>;
+
+ // Even with aggregate functions, filter on grouping column should be
pushed through
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = x
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count],
ordering_mode=Sorted
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = x
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
+ // Test that filters on non-grouping columns (like aggregate results) are
NOT pushed through
+ // Simulates: SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt >
5
+ // The filter on 'cnt' cannot be pushed down because it's an aggregate
result, not a grouping column
+
+ let batches =
+ vec![
+ record_batch!(("a", Utf8, ["x", "y"]), ("b", Utf8, ["foo",
"bar"])).unwrap(),
+ ];
+
+ let scan = TestScanBuilder::new(schema())
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+
+ // Create an aggregate: GROUP BY a with COUNT(b)
+ let group_by = PhysicalGroupBy::new_single(vec![(
+ col("a", &schema()).unwrap(),
+ "a".to_string(),
+ )]);
+
+ // Add COUNT aggregate
+ let count_expr =
+ AggregateExprBuilder::new(count_udaf(), vec![col("b",
&schema()).unwrap()])
+ .schema(schema())
+ .alias("count")
+ .build()
+ .unwrap();
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ vec![count_expr.into()],
+ vec![None],
+ Arc::clone(&scan),
+ schema(),
+ )
+ .unwrap(),
+ );
+
+ // Add a filter on the aggregate output column
+ // This simulates filtering on COUNT result, which should NOT be pushed
through
+ let agg_schema = aggregate.schema();
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new_with_schema("count[count]",
&agg_schema).unwrap()),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
+ ));
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
+ as Arc<dyn ExecutionPlan>;
+
+ // The filter should NOT be pushed through the aggregate since it
references a non-grouping column
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: count[count]@1 > 5
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: count[count]@1 > 5
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ "
+ );
+}
+
Review Comment:
Can we also add some coverage for computed grouping keys like `group by
(a+1)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]