This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 25f5d5fb1e9 [fix](nereids) fix colocate agg + join compute wrong
result (#48934) (#51313)
25f5d5fb1e9 is described below
commit 25f5d5fb1e9f571b05aca4ff130932dd911119cb
Author: 924060929 <[email protected]>
AuthorDate: Wed May 28 15:30:58 2025 +0800
[fix](nereids) fix colocate agg + join compute wrong result (#48934)
(#51313)
cherry pick from #48934
---
.../glue/translator/PhysicalPlanTranslator.java | 15 ++++
.../distribute/colocate_agg_join.groovy | 84 ++++++++++++++++++++++
2 files changed, 99 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a95ca993888..239344dc856 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1066,6 +1066,21 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
// Set colocate info in agg node. This is a hint for local
shuffling to decide which type of
// local exchanger will be used.
aggregationNode.setColocate(true);
+
+ Plan child = aggregate.child();
+ // we should set colocate = true when the same LogicalAggregate
generates two PhysicalHashAggregates
+ // in one fragment:
+ //
+ // agg(merge finalize) <- current, set colocate = true
+ // |
+ // agg(update serialize) <- child, also set colocate = true
+ if (aggregate.getAggregateParam().aggMode.consumeAggregateBuffer
+ && child instanceof PhysicalHashAggregate
+ && !((PhysicalHashAggregate<Plan>)
child).getAggregateParam().aggMode.consumeAggregateBuffer
+ && inputPlanFragment.getPlanRoot() instanceof
AggregationNode) {
+ AggregationNode childAgg = (AggregationNode)
inputPlanFragment.getPlanRoot();
+ childAgg.setColocate(true);
+ }
}
setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {
diff --git
a/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy
b/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy
new file mode 100644
index 00000000000..d7d88b89cd8
--- /dev/null
+++
b/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("colocate_agg_join") {
+ multi_sql """
+ drop table if exists colocate_agg_join1;
+ drop table if exists colocate_agg_join2;
+
+ create table colocate_agg_join1 (
+ col_int_undef_signed_not_null int not null ,
+ col_date_undef_signed_not_null date not null ,
+ pk int,
+ col_int_undef_signed int null ,
+ col_date_undef_signed date null ,
+ col_varchar_10__undef_signed varchar(10) null ,
+ col_varchar_10__undef_signed_not_null varchar(10) not null ,
+ col_varchar_1024__undef_signed varchar(1024) null ,
+ col_varchar_1024__undef_signed_not_null varchar(1024) not null
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed_not_null,
col_date_undef_signed_not_null, pk)
+ PARTITION BY RANGE(col_int_undef_signed_not_null,
col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'),
('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6',
'2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000',
'2023-12-21')))
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ insert into
colocate_agg_join1(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null)
values
(0,null,6,'2023-12-18','2023-12-20','b','g','were','as'),(1,4,0,'2023-12-16','2023-12-16','have','l','be','do'),(2,null,3,'2023-12-14','2023-12-14','there','why','were','s'),(3,null,2,'2023-12-20','20
[...]
+
+ create table colocate_agg_join2 (
+ pk int,
+ col_int_undef_signed int null ,
+ col_int_undef_signed_not_null int not null ,
+ col_date_undef_signed date null ,
+ col_date_undef_signed_not_null date not null ,
+ col_varchar_10__undef_signed varchar(10) null ,
+ col_varchar_10__undef_signed_not_null varchar(10) not null ,
+ col_varchar_1024__undef_signed varchar(1024) null ,
+ col_varchar_1024__undef_signed_not_null varchar(1024) not null
+ ) engine=olap
+ DUPLICATE KEY(pk)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ insert into
colocate_agg_join2(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null)
values
(0,4,0,'2023-12-18','2023-12-11','to','see','he''s','r'),(1,4,5,'2023-12-10','2023-12-09','him','s','didn''t','k');
+
+ set disable_join_reorder=true;
+ set enable_local_shuffle=true;
+ set runtime_filter_mode=off;
+
+ set
disable_nereids_rules='ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT,TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT,TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE';
+ """
+
+ for (def i in 1..10) {
+ sql "set parallel_pipeline_task_num=${i}"
+ test {
+ sql """
+ select avg_pk
+ from (
+ select t1.*
+ from colocate_agg_join1 AS alias1
+ right anti join (
+ select pk, avg(distinct pk) avg_pk
+ from colocate_agg_join2
+ group by pk
+ ) t1
+ ON t1.`pk` = alias1.`pk`
+ )a
+ order by pk
+ limit 1000
+ """
+ rowNum 0
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]