This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f57da83aac Add reproducer for consecutive RepartitionExec (#18343)
f57da83aac is described below

commit f57da83aac35f0ea4506ccb6a4ddbd26a503c1c1
Author: Nga Tran <[email protected]>
AuthorDate: Thu Oct 30 15:01:39 2025 -0400

    Add reproducer for consecutive RepartitionExec (#18343)
    
    Reproducer for https://github.com/apache/datafusion/issues/18341
---
 .../test_files/aggregate_repartition.slt           | 136 +++++++++++++++++++++
 1 file changed, 136 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/aggregate_repartition.slt 
b/datafusion/sqllogictest/test_files/aggregate_repartition.slt
new file mode 100644
index 0000000000..27602b61e4
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/aggregate_repartition.slt
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Reproducer for https://github.com/apache/datafusion/issues/18341
+# Tests for aggregate repartition behavior
+# Comparing CSV vs Parquet execution plans for GROUP BY queries
+
+# Create CSV version of the dimension data
+query I
+COPY (
+  SELECT * FROM (VALUES 
+    ('prod', 100, 'A'),
+    ('dev', 200, 'B'),
+    ('test', 150, 'A'),
+    ('prod', 300, 'C'),
+    ('dev', 250, 'B')
+  ) AS t(env, value, category)
+)
+TO 'test_files/scratch/aggregate_repartition/dim.csv'
+STORED AS CSV
+OPTIONS ('format.has_header' 'true');
+----
+5
+
+# Create Parquet version of the dimension data
+query I
+COPY (
+  SELECT * FROM (VALUES 
+    ('prod', 100, 'A'),
+    ('dev', 200, 'B'),
+    ('test', 150, 'A'),
+    ('prod', 300, 'C'),
+    ('dev', 250, 'B')
+  ) AS t(env, value, category)
+)
+TO 'test_files/scratch/aggregate_repartition/dim.parquet'
+STORED AS PARQUET;
+----
+5
+
+# Create external table for CSV
+statement ok
+CREATE EXTERNAL TABLE dim_csv
+STORED AS CSV 
+LOCATION 'test_files/scratch/aggregate_repartition/dim.csv'
+OPTIONS ('format.has_header' 'true');
+
+# Create external table for Parquet
+statement ok
+CREATE EXTERNAL TABLE dim_parquet
+STORED AS PARQUET 
+LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet';
+
+# Test 1: EXPLAIN query for CSV table with GROUP BY
+# This plan looks reasonable
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
+----
+logical_plan
+01)Projection: dim_csv.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_csv.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_csv projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], 
aggr=[count(Int64(1))]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[env@0 as env], 
aggr=[count(Int64(1))]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+07)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]},
 projection=[env], file_type=csv, has_header=true
+
+# Test 2: EXPLAIN query for Parquet table with GROUP BY
+# This plan differs from the one above: it contains two consecutive repartitions (a round-robin followed by a hash),
+# which seems unnecessary. We may want to align it with the previous plan (push the round robin below the partial aggregate, or remove it), or, if the input file is small,
+# avoid repartitioning altogether. A single partition should suffice, as in the single-step aggregate plan of Test 3 below.
+
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+logical_plan
+01)Projection: dim_parquet.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_parquet projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], 
aggr=[count(Int64(1))]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[env@0 as env], 
aggr=[count(Int64(1))]
+07)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]},
 projection=[env], file_type=parquet
+
+# Verify the queries actually work and return the same results
+query TI rowsort
+SELECT env, count(*) FROM dim_csv GROUP BY env;
+----
+dev 2
+prod 2
+test 1
+
+query TI rowsort
+SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+dev 2
+prod 2
+test 1
+
+# Test 3: Set target partitions to 1 to get a single-aggregate plan
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+logical_plan
+01)Projection: dim_parquet.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_parquet projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]},
 projection=[env], file_type=parquet


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to