This is an automated email from the ASF dual-hosted git repository.
linwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 244d8a1bc2 Move `CombinePartialFinalAggregate` rule into
physical-optimizer crate (#12167)
244d8a1bc2 is described below
commit 244d8a1bc2213163d110b40cb959649511086df6
Author: 张林伟 <[email protected]>
AuthorDate: Mon Sep 2 20:48:49 2024 +0800
Move `CombinePartialFinalAggregate` rule into physical-optimizer crate
(#12167)
---
.../combine_partial_final_agg.rs | 435 ---------------------
datafusion/core/src/physical_optimizer/mod.rs | 1 -
.../combine_partial_final_agg.rs | 290 ++++++++++++++
datafusion/core/tests/physical_optimizer/mod.rs | 1 +
.../src/combine_partial_final_agg.rs | 164 ++++++++
datafusion/physical-optimizer/src/lib.rs | 1 +
6 files changed, 456 insertions(+), 436 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
deleted file mode 100644
index 1a12fc7de8..0000000000
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ /dev/null
@@ -1,435 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
-//! and try to combine them if necessary
-
-use std::sync::Arc;
-
-use crate::error::Result;
-use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use crate::physical_plan::ExecutionPlan;
-
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
-use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
-
-/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
-/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
-///
-/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
-///
-#[derive(Default)]
-pub struct CombinePartialFinalAggregate {}
-
-impl CombinePartialFinalAggregate {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
- fn optimize(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_down(|plan| {
- // Check if the plan is AggregateExec
- let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
- return Ok(Transformed::no(plan));
- };
-
- if !matches!(
- agg_exec.mode(),
- AggregateMode::Final | AggregateMode::FinalPartitioned
- ) {
- return Ok(Transformed::no(plan));
- }
-
- // Check if the input is AggregateExec
- let Some(input_agg_exec) =
- agg_exec.input().as_any().downcast_ref::<AggregateExec>()
- else {
- return Ok(Transformed::no(plan));
- };
-
- let transformed = if matches!(input_agg_exec.mode(), AggregateMode::Partial)
- && can_combine(
- (
- agg_exec.group_expr(),
- agg_exec.aggr_expr(),
- agg_exec.filter_expr(),
- ),
- (
- input_agg_exec.group_expr(),
- input_agg_exec.aggr_expr(),
- input_agg_exec.filter_expr(),
- ),
- ) {
- let mode = if agg_exec.mode() == &AggregateMode::Final {
- AggregateMode::Single
- } else {
- AggregateMode::SinglePartitioned
- };
- AggregateExec::try_new(
- mode,
- input_agg_exec.group_expr().clone(),
- input_agg_exec.aggr_expr().to_vec(),
- input_agg_exec.filter_expr().to_vec(),
- input_agg_exec.input().clone(),
- input_agg_exec.input_schema(),
- )
- .map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
- .ok()
- .map(Arc::new)
- } else {
- None
- };
- Ok(if let Some(transformed) = transformed {
- Transformed::yes(transformed)
- } else {
- Transformed::no(plan)
- })
- })
- .data()
- }
-
- fn name(&self) -> &str {
- "CombinePartialFinalAggregate"
- }
-
- fn schema_check(&self) -> bool {
- true
- }
-}
-
-type GroupExprsRef<'a> = (
- &'a PhysicalGroupBy,
- &'a [Arc<AggregateFunctionExpr>],
- &'a [Option<Arc<dyn PhysicalExpr>>],
-);
-
-fn can_combine(final_agg: GroupExprsRef, partial_agg: GroupExprsRef) -> bool {
- let (final_group_by, final_aggr_expr, final_filter_expr) = final_agg;
- let (input_group_by, input_aggr_expr, input_filter_expr) = partial_agg;
-
- // Compare output expressions of the partial, and input expressions of the final operator.
- physical_exprs_equal(
- &input_group_by.output_exprs(),
- &final_group_by.input_exprs(),
- ) && input_group_by.groups() == final_group_by.groups()
- && input_group_by.null_expr().len() == final_group_by.null_expr().len()
- && input_group_by
- .null_expr()
- .iter()
- .zip(final_group_by.null_expr().iter())
- .all(|((lhs_expr, lhs_str), (rhs_expr, rhs_str))| {
- lhs_expr.eq(rhs_expr) && lhs_str == rhs_str
- })
- && final_aggr_expr.len() == input_aggr_expr.len()
- && final_aggr_expr
- .iter()
- .zip(input_aggr_expr.iter())
- .all(|(final_expr, partial_expr)| final_expr.eq(partial_expr))
- && final_filter_expr.len() == input_filter_expr.len()
- && final_filter_expr.iter().zip(input_filter_expr.iter()).all(
- |(final_expr, partial_expr)| match (final_expr, partial_expr) {
- (Some(l), Some(r)) => l.eq(r),
- (None, None) => true,
- _ => false,
- },
- )
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::object_store::ObjectStoreUrl;
- use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
- use crate::physical_plan::expressions::lit;
- use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::{displayable, Partitioning};
-
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_functions_aggregate::count::count_udaf;
- use datafusion_functions_aggregate::sum::sum_udaf;
- use datafusion_physical_expr::aggregate::AggregateExprBuilder;
- use datafusion_physical_expr::expressions::col;
-
- /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
- macro_rules! assert_optimized {
- ($EXPECTED_LINES: expr, $PLAN: expr) => {
- let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
-
- // run optimizer
- let optimizer = CombinePartialFinalAggregate {};
- let config = ConfigOptions::new();
- let optimized = optimizer.optimize($PLAN, &config)?;
- // Now format correctly
- let plan = displayable(optimized.as_ref()).indent(true).to_string();
- let actual_lines = trim_plan_display(&plan);
-
- assert_eq!(
- &expected_lines, &actual_lines,
- "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
- expected_lines, actual_lines
- );
- };
- }
-
- fn trim_plan_display(plan: &str) -> Vec<&str> {
- plan.split('\n')
- .map(|s| s.trim())
- .filter(|s| !s.is_empty())
- .collect()
- }
-
- fn schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int64, true),
- Field::new("b", DataType::Int64, true),
- Field::new("c", DataType::Int64, true),
- ]))
- }
-
- fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- ParquetExec::builder(
- FileScanConfig::new(
- ObjectStoreUrl::parse("test:///").unwrap(),
- schema.clone(),
- )
- .with_file(PartitionedFile::new("x".to_string(), 100)),
- )
- .build_arc()
- }
-
- fn partial_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Partial,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn final_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Final,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
- Arc::new(
- RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(),
- )
- }
-
- // Return appropriate expr depending if COUNT is for col or table (*)
- fn count_expr(
- expr: Arc<dyn PhysicalExpr>,
- name: &str,
- schema: &Schema,
- ) -> Arc<AggregateFunctionExpr> {
- AggregateExprBuilder::new(count_udaf(), vec![expr])
- .schema(Arc::new(schema.clone()))
- .alias(name)
- .build()
- .unwrap()
- }
-
- #[test]
- fn aggregations_not_combined() -> Result<()> {
- let schema = schema();
-
- let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
-
- let plan = final_aggregate_exec(
- repartition_exec(partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- )),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
- "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
- assert_optimized!(expected, plan);
-
- let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
- let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr1,
- ),
- PhysicalGroupBy::default(),
- aggr_expr2,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
-
- Ok(())
- }
-
- #[test]
- fn aggregations_combined() -> Result<()> {
- let schema = schema();
- let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- ),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should combine the Partial/Final AggregateExecs to the Single AggregateExec
- let expected = &[
- "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
- Ok(())
- }
-
- #[test]
- fn aggregations_with_group_combined() -> Result<()> {
- let schema = schema();
- let aggr_expr =
- vec![
- AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
- .schema(Arc::clone(&schema))
- .alias("Sum(b)")
- .build()
- .unwrap(),
- ];
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("c", &schema)?, "c".to_string())];
-
- let partial_group_by = PhysicalGroupBy::new_single(groups);
- let partial_agg = partial_aggregate_exec(
- parquet_exec(&schema),
- partial_group_by,
- aggr_expr.clone(),
- );
-
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("c", &partial_agg.schema())?, "c".to_string())];
- let final_group_by = PhysicalGroupBy::new_single(groups);
-
- let plan = final_aggregate_exec(partial_agg, final_group_by, aggr_expr);
- // should combine the Partial/Final AggregateExecs to the Single AggregateExec
- let expected = &[
- "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
- Ok(())
- }
-
- #[test]
- fn aggregations_with_limit_combined() -> Result<()> {
- let schema = schema();
- let aggr_expr = vec![];
-
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("c", &schema)?, "c".to_string())];
-
- let partial_group_by = PhysicalGroupBy::new_single(groups);
- let partial_agg = partial_aggregate_exec(
- parquet_exec(&schema),
- partial_group_by,
- aggr_expr.clone(),
- );
-
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("c", &partial_agg.schema())?, "c".to_string())];
- let final_group_by = PhysicalGroupBy::new_single(groups);
-
- let schema = partial_agg.schema();
- let final_agg = Arc::new(
- AggregateExec::try_new(
- AggregateMode::Final,
- final_group_by,
- aggr_expr,
- vec![],
- partial_agg,
- schema,
- )
- .unwrap()
- .with_limit(Some(5)),
- );
- let plan: Arc<dyn ExecutionPlan> = final_agg;
- // should combine the Partial/Final AggregateExecs to a Single AggregateExec
- // with the final limit preserved
- let expected = &[
- "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[], lim=[5]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
- Ok(())
- }
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index c32c77043f..46d86ead18 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -22,7 +22,6 @@
//!
//! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
pub mod coalesce_batches;
-pub mod combine_partial_final_agg;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
new file mode 100644
index 0000000000..5152afa6c0
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
+use datafusion_common::config::ConfigOptions;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_functions_aggregate::sum::sum_udaf;
+use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
+use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::aggregates::{
+ AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::displayable;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
+macro_rules! assert_optimized {
+ ($EXPECTED_LINES: expr, $PLAN: expr) => {
+ let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
+
+ // run optimizer
+ let optimizer = CombinePartialFinalAggregate {};
+ let config = ConfigOptions::new();
+ let optimized = optimizer.optimize($PLAN, &config)?;
+ // Now format correctly
+ let plan = displayable(optimized.as_ref()).indent(true).to_string();
+ let actual_lines = trim_plan_display(&plan);
+
+ assert_eq!(
+ &expected_lines, &actual_lines,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected_lines, actual_lines
+ );
+ };
+}
+
+fn trim_plan_display(plan: &str) -> Vec<&str> {
+ plan.split('\n')
+ .map(|s| s.trim())
+ .filter(|s| !s.is_empty())
+ .collect()
+}
+
+fn schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, true),
+ Field::new("b", DataType::Int64, true),
+ Field::new("c", DataType::Int64, true),
+ ]))
+}
+
+fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
+ ParquetExec::builder(
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+ .with_file(PartitionedFile::new("x".to_string(), 100)),
+ )
+ .build_arc()
+}
+
+fn partial_aggregate_exec(
+ input: Arc<dyn ExecutionPlan>,
+ group_by: PhysicalGroupBy,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
+) -> Arc<dyn ExecutionPlan> {
+ let schema = input.schema();
+ let n_aggr = aggr_expr.len();
+ Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ aggr_expr,
+ vec![None; n_aggr],
+ input,
+ schema,
+ )
+ .unwrap(),
+ )
+}
+
+fn final_aggregate_exec(
+ input: Arc<dyn ExecutionPlan>,
+ group_by: PhysicalGroupBy,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
+) -> Arc<dyn ExecutionPlan> {
+ let schema = input.schema();
+ let n_aggr = aggr_expr.len();
+ Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggr_expr,
+ vec![None; n_aggr],
+ input,
+ schema,
+ )
+ .unwrap(),
+ )
+}
+
+fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+ Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
+}
+
+// Return appropriate expr depending if COUNT is for col or table (*)
+fn count_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ name: &str,
+ schema: &Schema,
+) -> Arc<AggregateFunctionExpr> {
+ AggregateExprBuilder::new(count_udaf(), vec![expr])
+ .schema(Arc::new(schema.clone()))
+ .alias(name)
+ .build()
+ .unwrap()
+}
+
+#[test]
+fn aggregations_not_combined() -> datafusion_common::Result<()> {
+ let schema = schema();
+
+ let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
+
+ let plan = final_aggregate_exec(
+ repartition_exec(partial_aggregate_exec(
+ parquet_exec(&schema),
+ PhysicalGroupBy::default(),
+ aggr_expr.clone(),
+ )),
+ PhysicalGroupBy::default(),
+ aggr_expr,
+ );
+ // should not combine the Partial/Final AggregateExecs
+ let expected = &[
+ "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+ "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
+ ];
+ assert_optimized!(expected, plan);
+
+ let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
+ let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)];
+
+ let plan = final_aggregate_exec(
+ partial_aggregate_exec(
+ parquet_exec(&schema),
+ PhysicalGroupBy::default(),
+ aggr_expr1,
+ ),
+ PhysicalGroupBy::default(),
+ aggr_expr2,
+ );
+ // should not combine the Partial/Final AggregateExecs
+ let expected = &[
+ "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
+ "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
+ ];
+
+ assert_optimized!(expected, plan);
+
+ Ok(())
+}
+
+#[test]
+fn aggregations_combined() -> datafusion_common::Result<()> {
+ let schema = schema();
+ let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
+
+ let plan = final_aggregate_exec(
+ partial_aggregate_exec(
+ parquet_exec(&schema),
+ PhysicalGroupBy::default(),
+ aggr_expr.clone(),
+ ),
+ PhysicalGroupBy::default(),
+ aggr_expr,
+ );
+ // should combine the Partial/Final AggregateExecs to the Single AggregateExec
+ let expected = &[
+ "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+}
+
+#[test]
+fn aggregations_with_group_combined() -> datafusion_common::Result<()> {
+ let schema = schema();
+ let aggr_expr = vec![
+ AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("Sum(b)")
+ .build()
+ .unwrap(),
+ ];
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("c", &schema)?, "c".to_string())];
+
+ let partial_group_by = PhysicalGroupBy::new_single(groups);
+ let partial_agg = partial_aggregate_exec(
+ parquet_exec(&schema),
+ partial_group_by,
+ aggr_expr.clone(),
+ );
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("c", &partial_agg.schema())?, "c".to_string())];
+ let final_group_by = PhysicalGroupBy::new_single(groups);
+
+ let plan = final_aggregate_exec(partial_agg, final_group_by, aggr_expr);
+ // should combine the Partial/Final AggregateExecs to the Single AggregateExec
+ let expected = &[
+ "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+}
+
+#[test]
+fn aggregations_with_limit_combined() -> datafusion_common::Result<()> {
+ let schema = schema();
+ let aggr_expr = vec![];
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("c", &schema)?, "c".to_string())];
+
+ let partial_group_by = PhysicalGroupBy::new_single(groups);
+ let partial_agg = partial_aggregate_exec(
+ parquet_exec(&schema),
+ partial_group_by,
+ aggr_expr.clone(),
+ );
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("c", &partial_agg.schema())?, "c".to_string())];
+ let final_group_by = PhysicalGroupBy::new_single(groups);
+
+ let schema = partial_agg.schema();
+ let final_agg = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ final_group_by,
+ aggr_expr,
+ vec![],
+ partial_agg,
+ schema,
+ )
+ .unwrap()
+ .with_limit(Some(5)),
+ );
+ let plan: Arc<dyn ExecutionPlan> = final_agg;
+ // should combine the Partial/Final AggregateExecs to a Single AggregateExec
+ // with the final limit preserved
+ let expected = &[
+ "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[], lim=[5]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+}
diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs
index 149103cf34..4ec981bf2a 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -16,6 +16,7 @@
// under the License.
mod aggregate_statistics;
+mod combine_partial_final_agg;
mod limit_pushdown;
mod limited_distinct_aggregation;
mod test_util;
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
new file mode 100644
index 0000000000..12ff13f8f6
--- /dev/null
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
+//! and try to combine them if necessary
+
+use std::sync::Arc;
+
+use datafusion_common::error::Result;
+use datafusion_physical_plan::aggregates::{
+ AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::ExecutionPlan;
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
+use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
+
+/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
+/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
+///
+/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
+///
+#[derive(Default)]
+pub struct CombinePartialFinalAggregate {}
+
+impl CombinePartialFinalAggregate {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ plan.transform_down(|plan| {
+ // Check if the plan is AggregateExec
+ let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
+ return Ok(Transformed::no(plan));
+ };
+
+ if !matches!(
+ agg_exec.mode(),
+ AggregateMode::Final | AggregateMode::FinalPartitioned
+ ) {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Check if the input is AggregateExec
+ let Some(input_agg_exec) =
+ agg_exec.input().as_any().downcast_ref::<AggregateExec>()
+ else {
+ return Ok(Transformed::no(plan));
+ };
+
+ let transformed = if matches!(input_agg_exec.mode(), AggregateMode::Partial)
+ && can_combine(
+ (
+ agg_exec.group_expr(),
+ agg_exec.aggr_expr(),
+ agg_exec.filter_expr(),
+ ),
+ (
+ input_agg_exec.group_expr(),
+ input_agg_exec.aggr_expr(),
+ input_agg_exec.filter_expr(),
+ ),
+ ) {
+ let mode = if agg_exec.mode() == &AggregateMode::Final {
+ AggregateMode::Single
+ } else {
+ AggregateMode::SinglePartitioned
+ };
+ AggregateExec::try_new(
+ mode,
+ input_agg_exec.group_expr().clone(),
+ input_agg_exec.aggr_expr().to_vec(),
+ input_agg_exec.filter_expr().to_vec(),
+ Arc::clone(input_agg_exec.input()),
+ input_agg_exec.input_schema(),
+ )
+ .map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
+ .ok()
+ .map(Arc::new)
+ } else {
+ None
+ };
+ Ok(if let Some(transformed) = transformed {
+ Transformed::yes(transformed)
+ } else {
+ Transformed::no(plan)
+ })
+ })
+ .data()
+ }
+
+ fn name(&self) -> &str {
+ "CombinePartialFinalAggregate"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+type GroupExprsRef<'a> = (
+ &'a PhysicalGroupBy,
+ &'a [Arc<AggregateFunctionExpr>],
+ &'a [Option<Arc<dyn PhysicalExpr>>],
+);
+
+fn can_combine(final_agg: GroupExprsRef, partial_agg: GroupExprsRef) -> bool {
+ let (final_group_by, final_aggr_expr, final_filter_expr) = final_agg;
+ let (input_group_by, input_aggr_expr, input_filter_expr) = partial_agg;
+
+ // Compare output expressions of the partial, and input expressions of the final operator.
+ physical_exprs_equal(
+ &input_group_by.output_exprs(),
+ &final_group_by.input_exprs(),
+ ) && input_group_by.groups() == final_group_by.groups()
+ && input_group_by.null_expr().len() == final_group_by.null_expr().len()
+ && input_group_by
+ .null_expr()
+ .iter()
+ .zip(final_group_by.null_expr().iter())
+ .all(|((lhs_expr, lhs_str), (rhs_expr, rhs_str))| {
+ lhs_expr.eq(rhs_expr) && lhs_str == rhs_str
+ })
+ && final_aggr_expr.len() == input_aggr_expr.len()
+ && final_aggr_expr
+ .iter()
+ .zip(input_aggr_expr.iter())
+ .all(|(final_expr, partial_expr)| final_expr.eq(partial_expr))
+ && final_filter_expr.len() == input_filter_expr.len()
+ && final_filter_expr.iter().zip(input_filter_expr.iter()).all(
+ |(final_expr, partial_expr)| match (final_expr, partial_expr) {
+ (Some(l), Some(r)) => l.eq(r),
+ (None, None) => true,
+ _ => false,
+ },
+ )
+}
+
+// See tests in datafusion/core/tests/physical_optimizer
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index caebdcc927..41dfdb84a9 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -18,6 +18,7 @@
#![deny(clippy::clone_on_ref_ptr)]
pub mod aggregate_statistics;
+pub mod combine_partial_final_agg;
pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]