Dandandan commented on code in PR #18343: URL: https://github.com/apache/datafusion/pull/18343#discussion_r2473471407
########## datafusion/sqllogictest/test_files/aggregate_repartition.slt: ########## @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Reproducer for https://github.com/apache/datafusion/issues/18341 +# Tests for aggregate repartition behavior +# Comparing CSV vs Parquet execution plans for GROUP BY queries + +# Create CSV version of the dimension data +query I +COPY ( + SELECT * FROM (VALUES + ('prod', 100, 'A'), + ('dev', 200, 'B'), + ('test', 150, 'A'), + ('prod', 300, 'C'), + ('dev', 250, 'B') + ) AS t(env, value, category) +) +TO 'test_files/scratch/aggregate_repartition/dim.csv' +STORED AS CSV +OPTIONS ('format.has_header' 'true'); +---- +5 + +# Create Parquet version of the dimension data +query I +COPY ( + SELECT * FROM (VALUES + ('prod', 100, 'A'), + ('dev', 200, 'B'), + ('test', 150, 'A'), + ('prod', 300, 'C'), + ('dev', 250, 'B') + ) AS t(env, value, category) +) +TO 'test_files/scratch/aggregate_repartition/dim.parquet' +STORED AS PARQUET; +---- +5 + +# Create external table for CSV +statement ok +CREATE EXTERNAL TABLE dim_csv +STORED AS CSV +LOCATION 'test_files/scratch/aggregate_repartition/dim.csv' +OPTIONS ('format.has_header' 'true'); + +# Create external table for Parquet +statement ok +CREATE 
EXTERNAL TABLE dim_parquet +STORED AS PARQUET +LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet'; + +# Test 1: EXPLAIN query for CSV table with GROUP BY +# This plan looks reasonable +query TT +EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env; +---- +logical_plan +01)Projection: dim_csv.env, count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[dim_csv.env]], aggr=[[count(Int64(1))]] +03)----TableScan: dim_csv projection=[env] +physical_plan +01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)] +02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4 +05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true + +# Test 2: EXPLAIN query for Parquet table with GROUP BY +# This plan differs from the one above: it includes two consecutive repartitions, one round-robin and one hash, +# which seems unnecessary. We may want to align it with the previous plan (push the round robin down, or remove it), or, if the input file is small, +# avoid repartitioning altogether. A single partition, with a single-step aggregate, should suffice for the plan below.
+ +query TT +EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env; +---- +logical_plan +01)Projection: dim_parquet.env, count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]] +03)----TableScan: dim_parquet projection=[env] +physical_plan +01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)] +02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4 +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 Review Comment: ``` 04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ``` This does not seem unreasonable in general? Having only `RepartitionExec: partitioning=Hash([env@0], 4)` would mean the hash repartition itself is not running in parallel (since it would have only a single input stream). Repartitioning first helps increase the parallelism of the `RepartitionExec` itself (for larger inputs). I agree we could either improve how this is displayed or improve the implementation (e.g. move the parallelism into `RepartitionExec: partitioning=Hash([env@0], 4)`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
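The parallelism argument above can be illustrated with a small, self-contained sketch. This is plain Python, not DataFusion code; the rows, the partition count `N`, and the stage names in the comments are stand-ins mirroring the physical plan quoted above:

```python
from collections import Counter, defaultdict

# Hypothetical stand-in for the `env` column of the dim table.
rows = ["prod", "dev", "test", "prod", "dev"] * 4
N = 4  # stand-in for target_partitions=4

# RepartitionExec RoundRobinBatch(4): split the single input stream into
# N streams, so the per-partition work below could run in parallel.
round_robin = [rows[i::N] for i in range(N)]

# AggregateExec mode=Partial: one partial count per partition.
partials = [Counter(stream) for stream in round_robin]

# RepartitionExec Hash([env], 4): route each group key to the partition
# that owns hash(key) % N. With only one input stream, this hashing step
# would itself run serially -- which is the point of the round robin above.
hashed = [defaultdict(int) for _ in range(N)]
for partial in partials:
    for key, count in partial.items():
        hashed[hash(key) % N][key] += count

# AggregateExec mode=FinalPartitioned: merge within each partition; the
# union over partitions is the GROUP BY result.
final = {key: count for part in hashed for key, count in part.items()}
assert final == dict(Counter(rows))
```

Dropping the round-robin step would produce the same totals, but the hashing loop would then consume one stream serially, which is the parallelism concern raised in the comment.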
