gene-bordegaray opened a new issue, #18595:
URL: https://github.com/apache/datafusion/issues/18595
### Describe the bug
Small datasets undergo unnecessary repartitioning.
For example, a dataset of 5 rows will be split across multiple partitions in
certain queries, leading to overhead that exceeds the parallelism benefit.
### To Reproduce
An example can be found in the `aggregate_repartition.slt` file. This test
creates two tables of 5 rows each, one in Parquet and one in CSV format, and
runs an aggregation on one of their columns. Despite the small size, the query
plan repartitions the data:
**Set up the data:**
```
COPY (
SELECT * FROM (VALUES
('prod', 100, 'A'),
('dev', 200, 'B'),
('test', 150, 'A'),
('prod', 300, 'C'),
('dev', 250, 'B')
) AS t(env, value, category)
)
TO 'test_files/scratch/aggregate_repartition/dim.csv'
STORED AS CSV
OPTIONS ('format.has_header' 'true');
COPY (
SELECT * FROM (VALUES
('prod', 100, 'A'),
('dev', 200, 'B'),
('test', 150, 'A'),
('prod', 300, 'C'),
('dev', 250, 'B')
) AS t(env, value, category)
)
TO 'test_files/scratch/aggregate_repartition/dim.parquet'
STORED AS PARQUET;
CREATE EXTERNAL TABLE dim_csv
STORED AS CSV
LOCATION 'test_files/scratch/aggregate_repartition/dim.csv'
OPTIONS ('format.has_header' 'true');
CREATE EXTERNAL TABLE dim_parquet
STORED AS PARQUET
LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet';
```
**CSV Physical Plan:**
```
EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
```
**Parquet Physical Plan:**
```
EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
```
### Expected behavior
Datasets this small should not be repartitioned, so the queries should
yield the following plans:
**CSV Physical Plan:**
```
EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
```
**Parquet Physical Plan:**
```
EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
```
### Additional context
The problem stems from
`enforce_distribution.rs:get_repartition_requirement_status`.
Here round-robin repartitioning is added based on the file's statistics:
- We are optimistic with **CSV** files and return `true`, since CSV returns no
statistics. We could look at adding a better way to estimate whether a CSV file
needs repartitioning at the file level.
- We have exact statistics on **Parquet** files, so we do not round-robin
repartition at the file level, but we still hash repartition later on in the
query. We can look at how hash repartitioning is handled based on file
statistics and improve this.
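
To make the two cases above concrete, here is a rough, standalone sketch of the decision as I understand it. It is not the actual DataFusion code: the `Precision` enum is a local stand-in, both function names are hypothetical, and comparing the row count against the batch size is my assumption about how the statistics are used. The second function shows one possible direction for the CSV case, falling back to a file-size hint when no row count is available.

```rust
/// Local stand-in for a statistics value; not DataFusion's own type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Sketch of the behavior described above (hypothetical name).
/// Assumption: an exact row count is compared against the batch size,
/// and anything else is optimistically treated as worth repartitioning.
fn round_robin_beneficial_current(num_rows: Precision, batch_size: usize) -> bool {
    match num_rows {
        Precision::Exact(n) => n > batch_size,
        // CSV lands here: no exact statistics, so we return `true`.
        _ => true,
    }
}

/// Hypothetical alternative: when the row count is unknown, use the
/// file size (if known) as a cheap proxy instead of always saying `true`.
fn round_robin_beneficial_with_size_hint(
    num_rows: Precision,
    file_size_bytes: Option<u64>,
    batch_size: usize,
    min_file_size_bytes: u64,
) -> bool {
    match num_rows {
        // Treating an inexact count like an exact one is an assumption.
        Precision::Exact(n) | Precision::Inexact(n) => n > batch_size,
        Precision::Absent => match file_size_bytes {
            Some(size) => size >= min_file_size_bytes,
            // Nothing known at all: stay optimistic, as today.
            None => true,
        },
    }
}
```

With a batch size of 8192, the current-style check keeps a 5-row Parquet table unpartitioned but still repartitions the 5-row CSV table, while the size-hint variant would skip both as long as the CSV file is below the chosen minimum size. This only covers the round-robin decision; the hash repartition in the Parquet plan would need a separate change.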
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]