adriangb commented on code in PR #18404:
URL: https://github.com/apache/datafusion/pull/18404#discussion_r2483027542
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1025,6 +1031,86 @@ impl ExecutionPlan for AggregateExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
+
+ fn gather_filters_for_pushdown(
+ &self,
+ _phase: FilterPushdownPhase,
+ parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterDescription> {
+ // It's safe to push down filters through aggregates when filters only
reference
+ // grouping columns, because such filters determine which groups to
compute, not
+ // *how* to compute them. Each group's aggregate values (SUM, COUNT,
etc.) are
+ // calculated from the same input rows regardless of whether we filter
before or
+ // after grouping - filtering before just eliminates entire groups
early.
+ // This optimization is NOT safe for filters on aggregated columns
(like filtering on
+ // the result of SUM or COUNT), as those require computing all groups
first.
+
+ let grouping_columns: HashSet<_> = self
+ .group_by
+ .expr()
+ .iter()
+ .flat_map(|(expr, _)| collect_columns(expr))
+ .collect();
+
+ // Analyze each filter separately to determine if it can be pushed down
+ let mut safe_filters = Vec::new();
+ let mut unsafe_filters = Vec::new();
Review Comment:
If this pattern (splitting the parent filters into pushable and non-pushable
lists) shows up elsewhere, we could add a helper to `FilterDescription` or a
related structure.
##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -1892,3 +1891,277 @@ fn col_lit_predicate(
Arc::new(Literal::new(scalar_value)),
))
}
+
+#[tokio::test]
+async fn test_aggregate_filter_pushdown() {
+ // Test that filters can pass through AggregateExec even with aggregate functions
+ // when the filter references only grouping columns.
+ // The FilterExec below is placed *above* the AggregateExec, so this simulates a
+ // post-aggregation filter on a grouping column:
+ //   SELECT a, COUNT(b) FROM table GROUP BY a HAVING a = 'x'
+ // which filter pushdown should turn into the equivalent of
+ //   SELECT a, COUNT(b) FROM table WHERE a = 'x' GROUP BY a
+
+ let batches =
+ vec![
+ record_batch!(("a", Utf8, ["x", "y"]), ("b", Utf8, ["foo",
"bar"])).unwrap(),
+ ];
+
+ let scan = TestScanBuilder::new(schema())
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+
+ // Create an aggregate: GROUP BY a with COUNT(b)
+ let group_by = PhysicalGroupBy::new_single(vec![(
+ col("a", &schema()).unwrap(),
+ "a".to_string(),
+ )]);
+
+ // Add COUNT aggregate
+ let count_expr =
+ AggregateExprBuilder::new(count_udaf(), vec![col("b",
&schema()).unwrap()])
+ .schema(schema())
+ .alias("count")
+ .build()
+ .unwrap();
+
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ vec![count_expr.into()], // Has aggregate function
+ vec![None], // No filter on the aggregate function
+ Arc::clone(&scan),
+ schema(),
+ )
+ .unwrap(),
+ );
+
+ // Add a filter on the grouping column 'a', on top of the aggregate's output
+ let predicate = col_lit_predicate("a", "x", &schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
+ as Arc<dyn ExecutionPlan>;
+
+ // Even with aggregate functions, a filter on the grouping column should be pushed
+ // through the aggregate into the scan: the FilterExec disappears and the scan gains
+ // `predicate=a@0 = x`. The optimized AggregateExec also reports ordering_mode=Sorted;
+ // presumably the pushed `a = x` equality makes `a` constant (hence trivially sorted)
+ // — NOTE(review): confirm.
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = x
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count],
ordering_mode=Sorted
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = x
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
+ // Test that filters on non-grouping columns (like aggregate results) are
NOT pushed through
+ // Simulates: SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt >
5
Review Comment:
Can we add a test for a query like:
```sql
SELECT a, b, count(*) as cnt FROM table GROUP BY a, b HAVING b > 5
```
The point is that the filter references only some of the grouping columns,
and not the first one.
We should make sure the behavior is correct here (I think we can still push
it down, but I'm not sure without thinking about it for a bit).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]