gene-bordegaray opened a new issue, #18595:
URL: https://github.com/apache/datafusion/issues/18595

   ### Describe the bug
   
   Small datasets undergo unnecessary repartitioning.
   
   For example, a dataset of 5 rows will be split across multiple partitions for 
   certain queries, adding repartitioning overhead that exceeds any benefit from 
   the extra parallelism.
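
   A workaround that exists today (not the proposed fix) is to lower the session's 
   target partition count. The sketch below is a minimal illustration, assuming a 
   recent DataFusion release where `SessionConfig::with_target_partitions` and 
   `SessionContext::new_with_config` are available; the table name and path simply 
   mirror the reproduction further down:

   ```rust
   // Workaround sketch only, not the proposed fix: force a single target partition
   // so the planner has no reason to insert RepartitionExec for this tiny input.
   use datafusion::error::Result;
   use datafusion::prelude::*;

   #[tokio::main]
   async fn main() -> Result<()> {
       let config = SessionConfig::new().with_target_partitions(1);
       let ctx = SessionContext::new_with_config(config);

       // Same 5-row CSV table as in the reproduction below (path is illustrative)
       ctx.register_csv(
           "dim_csv",
           "test_files/scratch/aggregate_repartition/dim.csv",
           CsvReadOptions::new().has_header(true),
       )
       .await?;

       // With target_partitions = 1 the plan should no longer contain RepartitionExec,
       // at the cost of giving up parallelism for queries that would benefit from it.
       ctx.sql("EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env")
           .await?
           .show()
           .await?;
       Ok(())
   }
   ```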
   
   ### To Reproduce
   
   An example can be found in the `aggregate_repartition.slt` file. The test 
   creates two tables of 5 rows each, one in CSV format and one in Parquet, and 
   runs an aggregation on one of their columns. Despite the small size, the query 
   plan repartitions the data:
   
   **Set up the data:**
   ```
   COPY (
     SELECT * FROM (VALUES 
       ('prod', 100, 'A'),
       ('dev', 200, 'B'),
       ('test', 150, 'A'),
       ('prod', 300, 'C'),
       ('dev', 250, 'B')
     ) AS t(env, value, category)
   )
   TO 'test_files/scratch/aggregate_repartition/dim.csv'
   STORED AS CSV
   OPTIONS ('format.has_header' 'true');
   
   COPY (
     SELECT * FROM (VALUES 
       ('prod', 100, 'A'),
       ('dev', 200, 'B'),
       ('test', 150, 'A'),
       ('prod', 300, 'C'),
       ('dev', 250, 'B')
     ) AS t(env, value, category)
   )
   TO 'test_files/scratch/aggregate_repartition/dim.parquet'
   STORED AS PARQUET;
   
   CREATE EXTERNAL TABLE dim_csv
   STORED AS CSV 
   LOCATION 'test_files/scratch/aggregate_repartition/dim.csv'
   OPTIONS ('format.has_header' 'true');
   
   CREATE EXTERNAL TABLE dim_parquet
   STORED AS PARQUET 
   LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet';
   ```
   
   **CSV Physical Plan:**
   ```
   EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
   
   physical_plan
   01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
   02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
   03)----CoalesceBatchesExec: target_batch_size=8192
   04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
   05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
   06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
   ```
   
   **Parquet Physical Plan:**
   ```
   EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
   
   physical_plan
   01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
   02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
   03)----CoalesceBatchesExec: target_batch_size=8192
   04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
   05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
   06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
   ```
   
   ### Expected behavior
   
   Datasets this small should not be repartitioned, so the queries should instead 
   yield the following plans:
   
   **CSV Physical Plan:**
   ```
   EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
   
   physical_plan
   01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
   02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
   03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
   ```
   
   **Parquet Physical Plan:**
   ```
   EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
   
   physical_plan
   01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
   02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
   03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
   ```
   
   ### Additional context
   
   The problem originates in 
   `enforce_distribution.rs:get_repartition_requirement_status`, where round-robin 
   repartitioning is added based on the file's statistics:
   - We are optimistic with **CSV** files and return `true`, since CSV files report 
   no statistics. We could look at adding a better way to estimate whether a CSV 
   file needs repartitioning at the file level.
   - We have exact statistics for **Parquet** files, so we do not round-robin 
   repartition at the file level, but we still hash repartition later in the 
   query. We can look at how hash repartitioning is handled based on file 
   statistics and improve this; a rough sketch of such a statistics-based check 
   is included after this list.
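
   For illustration, a minimal sketch of the kind of statistics-based check that 
   could gate repartitioning is shown below. The function name, the `row_threshold` 
   cutoff, and the decision logic are assumptions for discussion, not the current 
   `get_repartition_requirement_status` implementation; it relies only on the 
   `Statistics` and `Precision` types from `datafusion_common`:

   ```rust
   // Hypothetical sketch, not the current DataFusion implementation: decide whether
   // repartitioning a source is likely to pay off based on its row-count statistics.
   use datafusion_common::stats::{Precision, Statistics};

   /// Returns true when the input looks large enough for repartitioning to help.
   /// `row_threshold` is an illustrative cutoff (e.g. the configured batch size).
   fn repartition_is_beneficial(stats: &Statistics, row_threshold: usize) -> bool {
       match stats.num_rows {
           // Exact row counts (e.g. from Parquet metadata): skip tiny inputs
           Precision::Exact(n) => n > row_threshold,
           // Inexact estimates: treated the same way in this sketch
           Precision::Inexact(n) => n > row_threshold,
           // No statistics at all (e.g. CSV today): stay optimistic and repartition,
           // which is exactly the behavior this issue questions
           Precision::Absent => true,
       }
   }
   ```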

