This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 25f5d5fb1e9 [fix](nereids) fix colocate agg + join compute wrong result (#48934) (#51313)
25f5d5fb1e9 is described below

commit 25f5d5fb1e9f571b05aca4ff130932dd911119cb
Author: 924060929 <[email protected]>
AuthorDate: Wed May 28 15:30:58 2025 +0800

    [fix](nereids) fix colocate agg + join compute wrong result (#48934) (#51313)
    
    cherry pick from #48934
---
 .../glue/translator/PhysicalPlanTranslator.java    | 15 ++++
 .../distribute/colocate_agg_join.groovy            | 84 ++++++++++++++++++++++
 2 files changed, 99 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a95ca993888..239344dc856 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1066,6 +1066,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             // Set colocate info in agg node. This is a hint for local 
shuffling to decide which type of
             // local exchanger will be used.
             aggregationNode.setColocate(true);
+
+            Plan child = aggregate.child();
+            // we should set colocate = true, when the same LogicalAggregate 
generate two PhysicalHashAggregates
+            // in one fragment:
+            //
+            // agg(merge finalize)   <- current, set colocate = true
+            //          |
+            // agg(update serialize) <- child, also set colocate = true
+            if (aggregate.getAggregateParam().aggMode.consumeAggregateBuffer
+                    && child instanceof PhysicalHashAggregate
+                    && !((PhysicalHashAggregate<Plan>) 
child).getAggregateParam().aggMode.consumeAggregateBuffer
+                    && inputPlanFragment.getPlanRoot() instanceof 
AggregationNode) {
+                AggregationNode childAgg = (AggregationNode) 
inputPlanFragment.getPlanRoot();
+                childAgg.setColocate(true);
+            }
         }
         setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
         if (aggregate.getStats() != null) {
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy b/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy
new file mode 100644
index 00000000000..d7d88b89cd8
--- /dev/null
+++ b/regression-test/suites/nereids_syntax_p0/distribute/colocate_agg_join.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("colocate_agg_join") {
+    multi_sql """
+        drop table if exists colocate_agg_join1;
+        drop table if exists colocate_agg_join2;
+
+        create table colocate_agg_join1 (
+          col_int_undef_signed_not_null int  not null ,
+          col_date_undef_signed_not_null date  not null ,
+          pk int,
+          col_int_undef_signed int  null ,
+          col_date_undef_signed date  null ,
+          col_varchar_10__undef_signed varchar(10)  null ,
+          col_varchar_10__undef_signed_not_null varchar(10)  not null ,
+          col_varchar_1024__undef_signed varchar(1024)  null ,
+          col_varchar_1024__undef_signed_not_null varchar(1024)  not null
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed_not_null, 
col_date_undef_signed_not_null, pk)
+        PARTITION BY RANGE(col_int_undef_signed_not_null, 
col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), 
('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', 
'2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', 
'2023-12-21')))
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+        insert into 
colocate_agg_join1(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null)
 values 
(0,null,6,'2023-12-18','2023-12-20','b','g','were','as'),(1,4,0,'2023-12-16','2023-12-16','have','l','be','do'),(2,null,3,'2023-12-14','2023-12-14','there','why','were','s'),(3,null,2,'2023-12-20','20
 [...]
+
+        create table colocate_agg_join2 (
+          pk int,
+          col_int_undef_signed int  null ,
+          col_int_undef_signed_not_null int  not null ,
+          col_date_undef_signed date  null ,
+          col_date_undef_signed_not_null date  not null ,
+          col_varchar_10__undef_signed varchar(10)  null ,
+          col_varchar_10__undef_signed_not_null varchar(10)  not null ,
+          col_varchar_1024__undef_signed varchar(1024)  null ,
+          col_varchar_1024__undef_signed_not_null varchar(1024)  not null
+        ) engine=olap
+        DUPLICATE KEY(pk)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+        insert into 
colocate_agg_join2(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null)
 values 
(0,4,0,'2023-12-18','2023-12-11','to','see','he''s','r'),(1,4,5,'2023-12-10','2023-12-09','him','s','didn''t','k');
+
+        set disable_join_reorder=true;
+        set enable_local_shuffle=true;
+        set runtime_filter_mode=off;
+    
+        set 
disable_nereids_rules='ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT,TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT,TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE';
+    """
+
+    for (def i in 1..10) {
+        sql "set parallel_pipeline_task_num=${i}"
+        test {
+            sql """
+                select avg_pk
+                from (
+                   select t1.*
+                   from colocate_agg_join1 AS alias1
+                   right anti join (
+                     select pk, avg(distinct pk) avg_pk
+                     from colocate_agg_join2
+                     group by pk
+                   ) t1
+                   ON t1.`pk` = alias1.`pk`
+                )a
+                order by pk
+                limit 1000
+                """
+            rowNum 0
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to