This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d084005a7f1ed3168c1b264d4a69dfbf6b83d040
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 20 21:13:45 2026 +0800

    [refactor](local shuffle) align AggregationNode with BE 
three-operator-class model
    
    Reorganize AggregationNode.enforceAndDeriveLocalExchange() to mirror BE's
    three agg operator classes, making the FE logic easier to reason about and
    maintain alongside BE changes.
    
    ## FE change (AggregationNode.java)
    
    Previously the method had an ad-hoc branching order that mixed concerns from
    different BE operator classes. The new structure maps 1-to-1 to BE:
    
    1. DistinctStreamingAggOperatorX  — canUseDistinctStreamingAgg() branch
    2. StreamingAggOperatorX          — useStreamingPreagg branch
       - Child is HashJoinNode + enableStreamingAggHashJoinForcePassthrough → 
PASSTHROUGH
       - fragment.useSerialSource() (pooling/serial scan) → PASSTHROUGH (avoids 
deadlock)
       - Otherwise → NOOP
    3. AggSinkOperatorX               — else branch
       - No group key, finalize  → NOOP
       - No group key, serialize + serial source → PASSTHROUGH 
(deadlock-prevention fix)
       - Colocate → requireHash (BUCKET_HASH_SHUFFLE)
       - Non-colocate, has group key → shuffledAggNodeIds / parentRequire / 
noRequire
    
    The useStreamingPreagg branch is now a top-level sibling of 
canUseDistinctStreamingAgg,
    matching BE's operator dispatch order, instead of being buried inside the 
else branch.
    
    ## BE change (pipeline_fragment_context.cpp)
    
    In _create_deferred_local_exchangers(), ensure the upstream pipeline's task 
count
    is raised to _num_instances before the sender_count snapshot is taken. For 
pooling
    scan (parallel_instances=1), the upstream pipe was initialized with 1 task; 
the
    deferred exchanger must run with the full fragment parallelism so that all
    downstream pipelines receive data from every instance.
    
    ## Test coverage
    
    Extend test_local_shuffle_fe_be_consistency.groovy with 6 new cases:
    - agg_1phase_bucket_key_serial_source: colocate agg under serial source 
(svSerialSource)
    - agg_finalize_serial_pooling_bucket: pooling scan + bucket-key agg 
(knownDiff, pipeline granularity diff)
    - agg_finalize_serial_pooling_non_bucket: pooling scan + non-bucket agg 
(MATCH after deadlock fix)
    - agg_finalize_force_local_shuffle_{non_bucket,bucket_key}: 
force_to_local_shuffle scenarios
    - agg_after_nlj_{non_bucket,bucket_key}: NLJ probe → agg; bucket-key case 
generates
      LOCAL_HASH_SHUFFLE (PASSTHROUGH source after ADAPTIVE_PASSTHROUGH split → 
HASH required)
    
All 40 cases pass with 0 real mismatches (2 expected knownDiffs: 
table_function, agg_finalize_serial_pooling_bucket).
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  8 ++
 .../org/apache/doris/planner/AggregationNode.java  | 63 ++++++++-------
 .../test_local_shuffle_fe_be_consistency.groovy    | 91 ++++++++++++++++++++++
 3 files changed, 134 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 20ebe242e5d..d6f1a6f11d2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -666,6 +666,14 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
+        // Mirror _inherit_pipeline_properties() from the native 
_add_local_exchange_impl path:
+        // the upstream (SINK) pipeline should run with _num_instances tasks, 
not the
+        // serial-scan parallelism (_parallel_instances=1 for pooling scan).  
At this point
+        // all operators have been added, so this is the final task-count 
assignment and
+        // cannot be overridden by a later add_operator call.
+        if (info.upstream_pipe->num_tasks() < _num_instances) {
+            info.upstream_pipe->set_num_tasks(_num_instances);
+        }
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 5d25cbef9f7..1f59897f8c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -276,9 +276,8 @@ public class AggregationNode extends PlanNode {
         SessionVariable sessionVariable = connectContext.getSessionVariable();
 
         LocalExchangeTypeRequire requireChild;
-        if (needsFinalize && aggInfo.getGroupingExprs().isEmpty()) {
-            requireChild = LocalExchangeTypeRequire.noRequire();
-        } else if (canUseDistinctStreamingAgg(sessionVariable)) {
+        if (canUseDistinctStreamingAgg(sessionVariable)) {
+            // DistinctStreamingAggOperatorX in BE
             if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 && 
!useStreamingPreagg)) {
                 if (AddLocalExchange.isColocated(this)) {
                     requireChild = LocalExchangeTypeRequire.requireHash();
@@ -286,42 +285,50 @@ public class AggregationNode extends PlanNode {
                     requireChild = parentRequire.autoRequireHash();
                 }
             } else if 
(sessionVariable.enableDistinctStreamingAggForcePassthrough) {
-                // BE's DistinctStreamingAggOperatorX always returns 
PASSTHROUGH for phase1
-                // (non-finalize, streaming preagg) when 
enable_distinct_streaming_agg_force_passthrough=true (default).
-                // The childUsePoolingScan case above is a subset of this.
+                requireChild = LocalExchangeTypeRequire.requirePassthrough();
+            } else {
+                requireChild = LocalExchangeTypeRequire.noRequire();
+            }
+        } else if (useStreamingPreagg) {
+            // StreamingAggOperatorX in BE: base class 
required_data_distribution():
+            //   _child->is_serial_operator() ? PASSTHROUGH : NOOP
+            // Special: directly above HashJoin probe with force-passthrough → 
PASSTHROUGH.
+            if (children.get(0) instanceof HashJoinNode
+                    && 
sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
+                requireChild = LocalExchangeTypeRequire.requirePassthrough();
+            } else if (fragment != null && 
fragment.useSerialSource(connectContext)) {
                 requireChild = LocalExchangeTypeRequire.requirePassthrough();
             } else {
                 requireChild = LocalExchangeTypeRequire.noRequire();
             }
         } else {
+            // AggSinkOperatorX in BE: required_data_distribution() override
             if (aggInfo.getGroupingExprs().isEmpty()) {
-                // Streaming pre-agg with no group key: each task aggregates 
its own partition
-                // independently; the finalize stage (serial) will collect the 
partial results.
-                // No redistribution needed — matches BE's AggSinkOperatorX 
which returns NOOP
-                // for !_needs_finalize && _partition_exprs.empty().
-                requireChild = LocalExchangeTypeRequire.noRequire();
-            } else if (!needsFinalize) {
-                // First-phase (serialize) agg: matches BE's 
StreamingAggregationOperatorX.
-                // Case 1: streaming pre-agg directly above a HashJoin probe 
and
-                // enable_streaming_agg_hash_join_force_passthrough=true 
(default) → BE returns
-                // PASSTHROUGH to split the pipeline at this boundary. 
Replicate this in FE.
-                // Case 2: shuffled_agg_node_ids includes this node → BE 
returns
-                // GLOBAL_EXECUTION_HASH_SHUFFLE(grouping_exprs). Replicate in 
FE.
-                // Default: NOOP — each task pre-aggregates its own partition 
independently.
-                if (useStreamingPreagg && children.get(0) instanceof 
HashJoinNode
-                        && 
sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
-                    requireChild = 
LocalExchangeTypeRequire.requirePassthrough();
-                } else if 
(sessionVariable.getShuffledAggNodeIds().contains(this.getId().asInt())) {
-                    requireChild = LocalExchangeTypeRequire.requireHash();
-                } else {
+                if (needsFinalize) {
+                    // Finalize agg, no group key: NOOP (single serial 
instance collects all)
                     requireChild = LocalExchangeTypeRequire.noRequire();
+                } else {
+                    // Serialize agg, no group key: base class → child serial 
→ PASSTHROUGH, else NOOP
+                    if (fragment != null && 
fragment.useSerialSource(connectContext)) {
+                        requireChild = 
LocalExchangeTypeRequire.requirePassthrough();
+                    } else {
+                        requireChild = LocalExchangeTypeRequire.noRequire();
+                    }
                 }
             } else if (AddLocalExchange.isColocated(this)) {
+                // Colocate: BUCKET_HASH_SHUFFLE
                 requireChild = LocalExchangeTypeRequire.requireHash();
-            } else if (hasPartitionExprs(parentRequire)) {
-                requireChild = parentRequire.autoRequireHash();
             } else {
-                requireChild = LocalExchangeTypeRequire.noRequire();
+                // Non-colocate, has group key: BE → GLOBAL_HASH
+                // FE uses parentRequire as proxy (no full 
need_to_local_exchange equivalent)
+                if (!needsFinalize
+                        && 
sessionVariable.getShuffledAggNodeIds().contains(this.getId().asInt())) {
+                    requireChild = LocalExchangeTypeRequire.requireHash();
+                } else if (hasPartitionExprs(parentRequire)) {
+                    requireChild = parentRequire.autoRequireHash();
+                } else {
+                    requireChild = LocalExchangeTypeRequire.noRequire();
+                }
             }
         }
 
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 605a861213c..25b19cc487f 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -184,10 +184,12 @@ suite("test_local_shuffle_fe_be_consistency", 
"nereids_p0") {
     //  ls_t1: HASH(k1) 8 buckets
     //  ls_t2: HASH(k1) 8 buckets  (same distribution → colocate-eligible)
     //  ls_t3: HASH(k4) 5 buckets  (different distribution)
+    //  ls_serial: HASH(k1) 2 buckets (for serial-scan tests: 2 < 
parallel_pipeline_task_num=4)
     // ============================================================
     sql "DROP TABLE IF EXISTS ls_t1"
     sql "DROP TABLE IF EXISTS ls_t2"
     sql "DROP TABLE IF EXISTS ls_t3"
+    sql "DROP TABLE IF EXISTS ls_serial"
 
     sql """
         CREATE TABLE ls_t1 (
@@ -241,6 +243,21 @@ suite("test_local_shuffle_fe_be_consistency", 
"nereids_p0") {
             (3, 1003, 8), (4, 1004, 9), (5, 1005, 10)
     """
 
+    sql """
+        CREATE TABLE ls_serial (
+            k1 INT NOT NULL,
+            k2 INT,
+            v1 INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(k1, k2)
+        DISTRIBUTED BY HASH(k1) BUCKETS 2
+        PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+        INSERT INTO ls_serial VALUES
+            (1, 10, 2), (2, 20, 4), (3, 30, 5), (4, 40, 6)
+    """
+
     // SET_VAR prefix used in most test SQLs (disables plan reorder/colocate 
for deterministic plans)
     def sv = 
"/*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,ignore_storage_data_distribution=false,parallel_pipeline_task_num=4,auto_broadcast_join_threshold=-1,broadcast_row_count_limit=0)*/"
     // Same as sv but forces serial source path (default in many environments)
@@ -270,6 +287,20 @@ suite("test_local_shuffle_fe_be_consistency", 
"nereids_p0") {
     checkConsistencyWithSql("agg_1phase_bucket_key_serial_source",
         "SELECT ${svSerialSource} k1, count(*) AS cnt FROM ls_t1 GROUP BY k1 
ORDER BY k1")
 
+    // 1-2c: Finalize agg, serial/pooling scan, bucket key (k1), ls_serial (2 
buckets).
+    //       Known diff: For pooling scan + bucket-key colocate agg, BE 
inserts LOCAL_HASH_SHUFFLE
+    //       running as 4 pipeline tasks + 2 PASSTHROUGH exchanges (one per 
pipeline boundary).
+    //       FE inserts LOCAL_HASH_SHUFFLE as a single tree node (1 task) + 1 
PASSTHROUGH.
+    //       BE's pipeline-level task granularity produces more profile 
entries than FE's tree model.
+    //       Results are correct (verified by check_sql_equal).
+    checkConsistencyWithSql("agg_finalize_serial_pooling_bucket",
+        "SELECT ${svSerialSource} k1, count(*) AS cnt FROM ls_serial GROUP BY 
k1 ORDER BY k1",
+        true /* knownDiff */)
+
+    // 1-2d: Agg, serial/pooling scan, non-bucket key (k2), ls_serial.
+    checkConsistencyWithSql("agg_finalize_serial_pooling_non_bucket",
+        "SELECT ${svSerialSource} k2, count(*) AS cnt FROM ls_serial GROUP BY 
k2 ORDER BY k2")
+
     // 1-3: AggSink 1-phase, non-bucket key (k2)
     //      BE: GLOBAL_EXECUTION_HASH_SHUFFLE vs BUCKET_HASH_SHUFFLE from scan
     //      → inserts GLOBAL_HASH_SHUFFLE (or LOCAL_HASH_SHUFFLE for local 
execution)
@@ -637,6 +668,66 @@ suite("test_local_shuffle_fe_be_consistency", 
"nereids_p0") {
            FROM ls_t1 a JOIN [shuffle] ls_t2 b ON a.k1 = b.k1
            ORDER BY a.k1""")
 
+    // ================================================================
+    // Section 11: AggSink LOCAL_HASH_SHUFFLE scenarios
+    // Scenarios where BE's need_to_local_exchange() inserts GLOBAL/BUCKET
+    // hash exchange because the source distribution is not hash-compatible.
+    //
+    // Key rule in pipeline.cpp need_to_local_exchange():
+    //   If source is BUCKET_HASH and sink requires GLOBAL_HASH → both are hash
+    //   → need_to_local_exchange returns false → NO local exchange.
+    //   But if source is PASSTHROUGH/NOOP → not both hash → insert 
GLOBAL_HASH.
+    // ================================================================
+
+    // 11-1: force_to_local_shuffle=true + non-bucket finalize agg
+    //       With force_to_local_shuffle, OlapScanNode.isSerialOperator()=true 
even with 8 tablets.
+    //       Optimizer puts agg in a separate finalize fragment receiving 
hash-partitioned data,
+    //       so no LOCAL_HASH_SHUFFLE is generated — only PASSTHROUGH for the 
NLJ/scan boundary.
+    checkConsistencyWithSql("agg_finalize_force_local_shuffle_non_bucket",
+        """SELECT 
/*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,
+                             
ignore_storage_data_distribution=false,parallel_pipeline_task_num=4,
+                             
force_to_local_shuffle=true,enable_local_shuffle=true)*/ k2, count(*) AS cnt
+           FROM ls_t1 GROUP BY k2 ORDER BY k2""")
+
+    // 11-2: force_to_local_shuffle=true + bucket-key finalize agg
+    //       GROUP BY k1 (bucket key of ls_t1): colocate agg stays in same 
fragment as scan.
+    //       FE: AggNode is colocate → requireHash. BE: AggSink returns 
BUCKET_HASH.
+    //       Result MATCH: [PASSTHROUGH:9], consistent with other bucket-key 
colocate cases.
+    checkConsistencyWithSql("agg_finalize_force_local_shuffle_bucket_key",
+        """SELECT 
/*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,
+                             
ignore_storage_data_distribution=false,parallel_pipeline_task_num=4,
+                             
force_to_local_shuffle=true,enable_local_shuffle=true)*/ k1, count(*) AS cnt
+           FROM ls_t1 GROUP BY k1 ORDER BY k1""")
+
+    // 11-3: NLJ (theta join) → finalize agg on non-bucket key
+    //       GROUP BY k2 (non-bucket): optimizer puts agg in a separate 
finalize fragment
+    //       receiving data via a hash-partitioned inter-fragment exchange on 
k2.
+    //       Within each fragment, the distributions are compatible → no 
LOCAL_HASH_SHUFFLE.
+    //       Result MATCH: [ADAPTIVE_PASSTHROUGH:5, PASSTHROUGH:5]
+    checkConsistencyWithSql("agg_after_nlj_non_bucket",
+        """SELECT 
/*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,
+                             
ignore_storage_data_distribution=false,parallel_pipeline_task_num=4,
+                             
auto_broadcast_join_threshold=-1,broadcast_row_count_limit=0)*/ a.k2, count(*) 
AS cnt
+           FROM ls_t1 a, ls_t2 b WHERE a.k1 > b.k1
+           GROUP BY a.k2 ORDER BY a.k2""")
+
+    // 11-4: NLJ (theta join) → finalize agg on bucket key (LOCAL_HASH_SHUFFLE 
test)
+    //       GROUP BY k1 (bucket key): colocate agg stays in same pipeline as 
NLJ probe.
+    //       The NLJ probe requires ADAPTIVE_PASSTHROUGH → local exchange 
inserted.
+    //       After that exchange, the next pipeline has LocalExchangeSource 
(PASSTHROUGH distribution)
+    //       feeding into AggSink (BUCKET_HASH_SHUFFLE for colocate k1 agg).
+    //       PASSTHROUGH source ≠ BUCKET_HASH target, not both-hash → 
need_to_local_exchange=true
+    //       → BE inserts LOCAL_HASH_SHUFFLE (BUCKET type).
+    //       FE: AggNode isColocated=true → requireHash → inserts 
LocalExchangeNode.
+    //       Result MATCH: [ADAPTIVE_PASSTHROUGH:9, LOCAL_HASH_SHUFFLE:9, 
PASSTHROUGH:9]
+    //       This is a primary test for regular AggSink generating 
LOCAL_HASH_SHUFFLE.
+    checkConsistencyWithSql("agg_after_nlj_bucket_key",
+        """SELECT 
/*+SET_VAR(disable_join_reorder=true,disable_colocate_plan=true,
+                             
ignore_storage_data_distribution=false,parallel_pipeline_task_num=4,
+                             
auto_broadcast_join_threshold=-1,broadcast_row_count_limit=0)*/ a.k1, count(*) 
AS cnt
+           FROM ls_t1 a, ls_t2 b WHERE a.k1 > b.k1
+           GROUP BY a.k1 ORDER BY a.k1""")
+
     // ================================================================
     //  Summary
     // ================================================================


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to