gene-bordegaray opened a new issue, #18777:
URL: https://github.com/apache/datafusion/issues/18777
### Is your feature request related to a problem or challenge?
When data files are non-overlapping on the `GROUP BY` key (e.g. file 1
contains only id='A' and file 2 contains only id='B'), DataFusion should use
`AggregateMode::SinglePartitioned` instead of its current `Partial ->
Merge -> Final` pattern.
This would let the aggregation run in parallel without an unnecessary merge
step.
## Current Behavior / Minimal Reproducer
To reproduce this behavior, run the following:
```sql
-- Config to prevent repartitioning
SET datafusion.execution.target_partitions = 3;
SET datafusion.optimizer.repartition_aggregations=false;
SET datafusion.optimizer.enable_round_robin_repartition = false;
SET datafusion.explain.format = 'indent';
-- Create 3 files, each with DISTINCT f_dkey values (non-overlapping)
COPY (
SELECT 'A' as f_dkey, TIMESTAMP '2024-01-01 00:00:01' as ts, 10.5 as value
UNION ALL SELECT 'A', TIMESTAMP '2024-01-01 00:00:05', 15.2
UNION ALL SELECT 'A', TIMESTAMP '2024-01-01 00:00:35', 20.1
) TO 'fact_A.parquet';
COPY (
SELECT 'B' as f_dkey, TIMESTAMP '2024-01-01 00:00:02' as ts, 100.5 as value
UNION ALL SELECT 'B', TIMESTAMP '2024-01-01 00:00:08', 150.2
UNION ALL SELECT 'B', TIMESTAMP '2024-01-01 00:00:38', 200.1
) TO 'fact_B.parquet';
COPY (
SELECT 'C' as f_dkey, TIMESTAMP '2024-01-01 00:00:03' as ts, 1000.5 as value
UNION ALL SELECT 'C', TIMESTAMP '2024-01-01 00:00:12', 1500.2
UNION ALL SELECT 'C', TIMESTAMP '2024-01-01 00:00:42', 2000.1
) TO 'fact_C.parquet';
-- Create table reading all 3 files
CREATE EXTERNAL TABLE facts
STORED AS PARQUET
LOCATION 'fact_*.parquet';
-- Query that should use SinglePartitioned mode
EXPLAIN
SELECT
f_dkey,
date_bin(INTERVAL '30 seconds', ts) AS time_bin,
MAX(value) as max_value
FROM facts
GROUP BY f_dkey, time_bin;
```
This will produce the plan:
```text
logical_plan:
Projection: facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts) AS time_bin, max(facts.value) AS max_value
  Aggregate: groupBy=[[facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), facts.ts)]], aggr=[[max(facts.value)]]
    TableScan: facts projection=[f_dkey, ts, value]

physical_plan:
ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as time_bin, max(facts.value)@2 as max_value]
  AggregateExec: mode=Final, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, ts@1) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
        DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_A.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_B.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_C.parquet]]}, projection=[f_dkey, ts, value], file_type=parquet
```
This `Aggregate Partial -> CoalescePartitionsExec -> Aggregate Final` pattern is
inefficient because the final aggregation is forced to run on a single
partition. Since the query groups by `[f_dkey, date_bin(ts)]` and no group key
spans multiple files, each file (partition) can compute complete aggregate
results independently.
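As a sanity check of this reasoning, the following self-contained Rust sketch (plain `std`, not DataFusion code) computes `MAX(value)` per `(f_dkey, time_bin)` separately for each partition and verifies that concatenating the per-partition results equals one global aggregation. It assumes the three files from the reproducer, with `ts` reduced to seconds after `2024-01-01 00:00:00`; the helper `time_bin` is a hypothetical stand-in for `date_bin(INTERVAL '30 seconds', ts)` that assumes 30-second bins aligned to midnight.

```rust
use std::collections::BTreeMap;

/// One row: (f_dkey, seconds after 2024-01-01 00:00:00, value).
type Row = (&'static str, u32, f64);

/// Group key: (f_dkey, start of the 30-second bin in seconds).
type Key = (&'static str, u32);

/// Hypothetical stand-in for date_bin(INTERVAL '30 seconds', ts),
/// assuming bins aligned to midnight.
fn time_bin(secs: u32) -> u32 {
    secs / 30 * 30
}

/// MAX(value) grouped by (f_dkey, time_bin) over one set of rows.
fn max_by_group(rows: &[Row]) -> BTreeMap<Key, f64> {
    let mut out: BTreeMap<Key, f64> = BTreeMap::new();
    for &(dkey, secs, value) in rows {
        let current = out.entry((dkey, time_bin(secs))).or_insert(value);
        if value > *current {
            *current = value;
        }
    }
    out
}

fn main() {
    // One partition per file, mirroring fact_A/B/C.parquet.
    let partitions: Vec<Vec<Row>> = vec![
        vec![("A", 1, 10.5), ("A", 5, 15.2), ("A", 35, 20.1)],
        vec![("B", 2, 100.5), ("B", 8, 150.2), ("B", 38, 200.1)],
        vec![("C", 3, 1000.5), ("C", 12, 1500.2), ("C", 42, 2000.1)],
    ];

    // Global aggregation over all rows: what Partial -> Coalesce -> Final produces.
    let all_rows: Vec<Row> = partitions.iter().flatten().copied().collect();
    let global = max_by_group(&all_rows);

    // SinglePartitioned-style: aggregate each partition on its own and simply
    // concatenate the outputs; the assert fails if a key shows up twice.
    let mut concatenated: BTreeMap<Key, f64> = BTreeMap::new();
    for part in &partitions {
        for (key, max) in max_by_group(part) {
            let previous = concatenated.insert(key, max);
            assert!(previous.is_none(), "group key {:?} found in two partitions", key);
        }
    }

    assert_eq!(global, concatenated);
    for ((dkey, bin), max) in &global {
        println!("{dkey} bin={bin}s max={max}");
    }
}
```

Because every group key lives in exactly one partition, the per-partition outputs never need to be merged, which is the property `SinglePartitioned` relies on.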
### Describe the solution you'd like
For the scenario above, DataFusion should instead produce a plan like this:
```text
logical_plan:
Projection: facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts) AS time_bin, max(facts.value) AS max_value
  Aggregate: groupBy=[[facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), facts.ts)]], aggr=[[max(facts.value)]]
    TableScan: facts projection=[f_dkey, ts, value]

physical_plan:
ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as time_bin, max(facts.value)@2 as max_value]
  AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
    DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_A.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_B.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_C.parquet]]}, projection=[f_dkey, ts, value], file_type=parquet
```
This eliminates the merge overhead of the `CoalescePartitionsExec` and
increases parallelism, because each partition computes its aggregates
independently.
## Implementation (in `combine_partial_final_agg.rs`)
- Detect the pattern `Aggregate Final -> Coalesce or SPM -> Aggregate Partial`
  (SPM being `SortPreservingMergeExec`)
- Extract the `GROUP BY` column expressions from the aggregate
- For each input partition, check the min/max ranges of those columns to see
  whether they overlap
- If they don't overlap, parallelize by replacing the pattern with a single
  `Aggregate SinglePartitioned`
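The steps above could look roughly like the following Rust sketch. It uses a toy plan enum instead of DataFusion's actual `ExecutionPlan` and `AggregateExec` types, and `Plan`, `ColumnRange`, `ranges_disjoint`, `try_single_partitioned`, and `rewrite` are all hypothetical names. Where the per-partition min/max values would come from (for example file-level statistics) is assumed rather than shown, and ranges are compared as strings for simplicity.

```rust
/// Hypothetical min/max statistics for one GROUP BY column in one partition.
#[derive(Clone, Debug, PartialEq)]
struct ColumnRange {
    min: String,
    max: String,
}

impl ColumnRange {
    fn new(min: &str, max: &str) -> Self {
        ColumnRange { min: min.to_string(), max: max.to_string() }
    }
}

/// Toy physical plan standing in for DataFusion's ExecutionPlan tree.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    /// `stats[p][i]` is the range of the i-th GROUP BY column in partition `p`.
    Scan { stats: Vec<Vec<ColumnRange>> },
    AggregatePartial { group_by: Vec<&'static str>, input: Box<Plan> },
    CoalescePartitions { input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
    AggregateFinal { group_by: Vec<&'static str>, input: Box<Plan> },
    AggregateSinglePartitioned { group_by: Vec<&'static str>, input: Box<Plan> },
}

/// True if no two ranges overlap. Touching ranges (max == next min) count as
/// overlapping, because that boundary value could occur in both partitions.
fn ranges_disjoint(ranges: &[ColumnRange]) -> bool {
    let mut sorted: Vec<&ColumnRange> = ranges.iter().collect();
    sorted.sort_by(|a, b| a.min.cmp(&b.min));
    sorted.windows(2).all(|w| w[0].max < w[1].min)
}

/// Matches Final -> (CoalescePartitions | SortPreservingMerge) -> Partial -> Scan
/// and returns a SinglePartitioned replacement if some GROUP BY column is
/// disjoint across the input partitions.
fn try_single_partitioned(plan: &Plan) -> Option<Plan> {
    let Plan::AggregateFinal { input, .. } = plan else { return None };
    let (Plan::CoalescePartitions { input: merged } | Plan::SortPreservingMerge { input: merged }) =
        input.as_ref()
    else {
        return None;
    };
    let Plan::AggregatePartial { group_by, input: scan } = merged.as_ref() else { return None };
    let Plan::Scan { stats } = scan.as_ref() else { return None };

    // SinglePartitioned needs more than one input partition.
    if stats.len() < 2 {
        return None;
    }
    // One disjoint GROUP BY column suffices: no composite key containing it
    // can then appear in two partitions.
    let any_disjoint = (0..group_by.len()).any(|i| {
        let ranges: Option<Vec<ColumnRange>> = stats.iter().map(|p| p.get(i).cloned()).collect();
        ranges.map_or(false, |r| ranges_disjoint(&r))
    });
    any_disjoint.then(|| Plan::AggregateSinglePartitioned {
        group_by: group_by.clone(),
        input: scan.clone(),
    })
}

/// Applies the rule at the root only; a real optimizer rule would walk the tree.
fn rewrite(plan: Plan) -> Plan {
    try_single_partitioned(&plan).unwrap_or(plan)
}

fn main() {
    // Per partition: [range of f_dkey, range of time_bin]. Only f_dkey is disjoint.
    let stats = vec![
        vec![ColumnRange::new("A", "A"), ColumnRange::new("00:00:00", "00:00:30")],
        vec![ColumnRange::new("B", "B"), ColumnRange::new("00:00:00", "00:00:30")],
        vec![ColumnRange::new("C", "C"), ColumnRange::new("00:00:00", "00:00:30")],
    ];
    let group_by = vec!["f_dkey", "time_bin"];
    let plan = Plan::AggregateFinal {
        group_by: group_by.clone(),
        input: Box::new(Plan::CoalescePartitions {
            input: Box::new(Plan::AggregatePartial {
                group_by,
                input: Box::new(Plan::Scan { stats }),
            }),
        }),
    };
    println!("{:#?}", rewrite(plan));
}
```

Two details matter for correctness. It is enough for one `GROUP BY` column to be disjoint across partitions: in the reproducer, `f_dkey` is disjoint even though every file covers the same `time_bin` values. And ranges that merely touch must be treated as overlapping, since the boundary value could occur in both partitions.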
### Describe alternatives you've considered
1. User annotation (explicit): allow users to mark tables as partitioned.
This is simple, but it depends on user input, is error-prone if the
assumption is wrong, and might be underutilized.
2. New partitioning type: create a new `NonOverlapping` partitioning variant
and propagate it through the plan. This would require a large refactor for
something that can be done with existing infrastructure.
### Additional context
The `SinglePartitioned` mode already exists and is designed for exactly this
use case, but it isn't applied when it could be. Its definition can be found
[here](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/mod.rs#L133).
Its documentation describes the use case:
```text
/// *Single* layer of Aggregation, input is *Partitioned*
///
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation
/// using two operators.
///
/// This mode requires that the input has more than one partition, and is
/// partitioned by group key (like FinalPartitioned).
```