This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 22b78ddeed1764d8362303bf8102ba526b4db3e9
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 18:54:54 2026 +0800

    [refactor](local shuffle) simplify FE local exchange enforcement helpers
    
    - PlanNode: extract enforceChildExchange() helper for join/set-operation 
nodes
      to enforce child exchange without serial-ancestor check or heavy-ops 
avoidance;
      add javadoc to shouldResetSerialFlagForChild()
    - HashJoinNode/NestedLoopJoinNode: use enforceChildExchange() instead of
      inline deriveAndEnforceChildLocalExchange() + manual exchange insertion
    - SetOperationNode: merge duplicated Union/Intersect+Except 
child-enforcement
      loops into a single shared loop using enforceChildExchange();
      also fixes set_intersect/set_except FE/BE consistency mismatches
    - AddLocalExchange: remove dead shouldUseLocalExecutionHash() parameters
      (always returns true, no context needed)
---
 .../org/apache/doris/planner/AddLocalExchange.java | 16 +-----
 .../org/apache/doris/planner/HashJoinNode.java     | 37 +++----------
 .../apache/doris/planner/NestedLoopJoinNode.java   | 32 ++----------
 .../java/org/apache/doris/planner/PlanNode.java    | 32 ++++++++++++
 .../org/apache/doris/planner/SetOperationNode.java | 61 +++++-----------------
 5 files changed, 57 insertions(+), 121 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index 4b9ebcb53f6..d982ab80385 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -103,21 +103,7 @@ public class AddLocalExchange {
 
     private static boolean shouldUseLocalExecutionHash(
             PlanTranslatorContext translatorContext, PlanNode parent, PlanNode 
child) {
-        PlanFragment fragment = null;
-        if (parent != null) {
-            fragment = parent.getFragment();
-        }
-        if (fragment == null && child != null) {
-            fragment = child.getFragment();
-        }
-
-        if (fragment != null && 
fragment.useSerialSource(translatorContext.getConnectContext())) {
-            return true;
-        }
-        if (child instanceof ScanNode) {
-            return true;
-        }
-        // For FE-planned intra-fragment hash exchanges, always prefer 
LOCAL_EXECUTION_HASH_SHUFFLE.
+        // Always prefer LOCAL_EXECUTION_HASH_SHUFFLE for FE-planned 
intra-fragment hash exchanges.
         // GLOBAL_EXECUTION_HASH_SHUFFLE requires shuffle_idx_to_instance_idx 
which may be empty
         // for fragments with non-hash sinks (UNPARTITIONED/MERGE). LOCAL_HASH 
is always safe
         // since it partitions by local instance count without needing 
external shuffle maps.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 11eab9f9e22..1b90770f539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -331,38 +331,13 @@ public class HashJoinNode extends JoinNodeBase {
             outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
         }
 
-        PlanNode probeSide = children.get(0);
-        Pair<PlanNode, LocalExchangeType> probeSideOutput = 
deriveAndEnforceChildLocalExchange(
-                translatorContext, probeSide, probeSideRequire, 0);
-        if (!probeSideRequire.satisfy(probeSideOutput.second)) {
-            LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                    probeSideRequire, translatorContext, this, 
probeSideOutput.first);
-            probeSide = new LocalExchangeNode(
-                    translatorContext.nextPlanNodeId(), probeSideOutput.first, 
preferType,
-                    getChildDistributeExprList(0)
-            );
-        } else {
-            probeSide = probeSideOutput.first;
-        }
-
-        PlanNode buildSide = children.get(1);
-        Pair<PlanNode, LocalExchangeType> buildSideOutput = 
deriveAndEnforceChildLocalExchange(
-                translatorContext, buildSide, buildSideRequire, 1);
-        if (!buildSideRequire.satisfy(buildSideOutput.second)) {
-            LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                    buildSideRequire, translatorContext, this, 
buildSideOutput.first);
-            buildSide = new LocalExchangeNode(
-                    translatorContext.nextPlanNodeId(), buildSideOutput.first, 
preferType,
-                    getChildDistributeExprList(1)
-            );
-        } else {
-            buildSide = buildSideOutput.first;
-        }
-
-        this.children = Lists.newArrayList(probeSide, buildSide);
-
+        Pair<PlanNode, LocalExchangeType> probeResult = enforceChildExchange(
+                translatorContext, probeSideRequire, children.get(0), 0);
+        Pair<PlanNode, LocalExchangeType> buildResult = enforceChildExchange(
+                translatorContext, buildSideRequire, children.get(1), 1);
+        this.children = Lists.newArrayList(probeResult.first, 
buildResult.first);
         if (outputType == null) {
-            outputType = probeSideOutput.second;
+            outputType = probeResult.second;
         }
         return Pair.of(this, outputType);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index fd9ac16ccf5..a82e78ee093 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -207,34 +207,10 @@ public class NestedLoopJoinNode extends JoinNodeBase {
             outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH;
         }
 
-        PlanNode probeSide = children.get(0);
-        Pair<PlanNode, LocalExchangeType> probeSideOutput = 
deriveAndEnforceChildLocalExchange(
-                translatorContext, probeSide, probeSideRequire, 0);
-        if (!probeSideRequire.satisfy(probeSideOutput.second)) {
-            LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                    probeSideRequire, translatorContext, this, 
probeSideOutput.first);
-            probeSide = new LocalExchangeNode(
-                    translatorContext.nextPlanNodeId(), probeSideOutput.first, 
preferType,
-                    getChildDistributeExprList(0)
-            );
-        } else {
-            probeSide = probeSideOutput.first;
-        }
-
-        PlanNode buildSide = children.get(1);
-        Pair<PlanNode, LocalExchangeType> buildSideOutput = 
deriveAndEnforceChildLocalExchange(
-                translatorContext, buildSide, buildSideRequire, 1);
-        if (!buildSideRequire.satisfy(buildSideOutput.second)) {
-            LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                    buildSideRequire, translatorContext, this, 
buildSideOutput.first);
-            buildSide = new LocalExchangeNode(
-                    translatorContext.nextPlanNodeId(), buildSideOutput.first, 
preferType,
-                    getChildDistributeExprList(1)
-            );
-        } else {
-            buildSide = buildSideOutput.first;
-        }
-
+        PlanNode probeSide = enforceChildExchange(
+                translatorContext, probeSideRequire, children.get(0), 0).first;
+        PlanNode buildSide = enforceChildExchange(
+                translatorContext, buildSideRequire, children.get(1), 1).first;
         this.children = Lists.newArrayList(probeSide, buildSide);
         return Pair.of(this, outputType);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 2c3ab311e7d..a6059aef26b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1017,6 +1017,38 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> {
         return child.enforceAndDeriveLocalExchange(translatorContext, this, 
requireChild);
     }
 
+    /**
+     * Enforces a local exchange requirement on a single child without the 
serial-ancestor
+     * check or heavy-ops bottleneck avoidance that {@link #enforceChild} 
applies.
+     * Use for nodes whose children's distribution requirements must be 
satisfied regardless
+     * of serial ancestors in the same pipeline (joins, set operations, etc.).
+     *
+     * @return (resultNode, childOutputType) — resultNode may be a new 
LocalExchangeNode wrapper
+     *         if an exchange was inserted; childOutputType is the child's 
reported output
+     *         distribution before any inserted exchange (useful for deriving 
the parent's output).
+     */
+    protected Pair<PlanNode, LocalExchangeType> enforceChildExchange(
+            PlanTranslatorContext translatorContext, LocalExchangeTypeRequire 
require,
+            PlanNode child, int childIndex) {
+        Pair<PlanNode, LocalExchangeType> childOutput = 
deriveAndEnforceChildLocalExchange(
+                translatorContext, child, require, childIndex);
+        if (!require.satisfy(childOutput.second)) {
+            LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
+                    require, translatorContext, this, childOutput.first);
+            return Pair.of(
+                    new LocalExchangeNode(translatorContext.nextPlanNodeId(), 
childOutput.first,
+                            preferType, 
getChildDistributeExprList(childIndex)),
+                    childOutput.second);
+        }
+        return childOutput;
+    }
+
+    /**
+     * Whether the child at {@code childIndex} starts a new pipeline context, 
causing
+     * its serial-ancestor flag to be reset to {@code false} rather than 
inherited from this node.
+     * Override to return {@code true} for pipeline-splitting nodes 
(LocalExchangeNode) and nodes
+     * whose children run in an independent pipeline segment (SortNode above 
analytic, etc.).
+     */
     protected boolean shouldResetSerialFlagForChild(int childIndex) {
         return false;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index 7343a74fd3f..bf1c2421e8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -206,69 +206,36 @@ public abstract class SetOperationNode extends PlanNode {
     @Override
     public Pair<PlanNode, LocalExchangeType> 
enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
             PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeTypeRequire requireChild;
+        LocalExchangeType outputType;
+        PlanNode firstChild = children.isEmpty() ? null : children.get(0);
         if (this instanceof UnionNode) {
-            ArrayList<PlanNode> newChildren = Lists.newArrayList();
             // Propagate parent's hash requirement to children when parent 
requires hash distribution.
             // Matches BE's UnionSinkOperatorX which returns 
GLOBAL_HASH(_distribute_exprs) whenever
             // _followed_by_shuffled_operator=true, regardless of whether 
_distribute_exprs is empty.
             boolean canPropagateHash = 
parentRequire.preferType().isHashShuffle();
-            LocalExchangeTypeRequire requireChild = canPropagateHash
-                    ? parentRequire : LocalExchangeTypeRequire.noRequire();
-            LocalExchangeType outputType = canPropagateHash
-                    ? AddLocalExchange.resolveExchangeType(requireChild, 
translatorContext, this,
-                            children.isEmpty() ? null : children.get(0))
+            requireChild = canPropagateHash ? parentRequire : 
LocalExchangeTypeRequire.noRequire();
+            outputType = canPropagateHash
+                    ? AddLocalExchange.resolveExchangeType(requireChild, 
translatorContext, this, firstChild)
                     : LocalExchangeType.NOOP;
-
-            for (int i = 0; i < children.size(); i++) {
-                PlanNode child = children.get(i);
-                Pair<PlanNode, LocalExchangeType> childOutput
-                        = 
deriveAndEnforceChildLocalExchange(translatorContext, child, requireChild, i);
-                if (!requireChild.satisfy(childOutput.second)) {
-                    LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                            requireChild, translatorContext, this, 
childOutput.first);
-                    LocalExchangeNode localExchangeNode
-                            = new 
LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first,
-                                    preferType, getChildDistributeExprList(i));
-                    newChildren.add(localExchangeNode);
-                } else {
-                    newChildren.add(childOutput.first);
-                }
-            }
-
-            this.children = newChildren;
-            return Pair.of(this, outputType);
         } else {
-            LocalExchangeTypeRequire requireChild;
-            LocalExchangeType outputType;
+            // Intersect / Except
             if (AddLocalExchange.isColocated(this)) {
                 requireChild = LocalExchangeTypeRequire.requireBucketHash();
                 outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE;
             } else {
                 requireChild = parentRequire.autoRequireHash();
                 outputType = AddLocalExchange.resolveExchangeType(
-                        requireChild, translatorContext, this, 
children.isEmpty() ? null : children.get(0));
-            }
-
-            ArrayList<PlanNode> newChildren = Lists.newArrayList();
-            for (int i = 0; i < children.size(); i++) {
-                PlanNode child = children.get(i);
-                Pair<PlanNode, LocalExchangeType> childOutput
-                        = 
deriveAndEnforceChildLocalExchange(translatorContext, child, requireChild, i);
-                if (!requireChild.satisfy(childOutput.second)) {
-                    LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
-                            requireChild, translatorContext, this, 
childOutput.first);
-                    LocalExchangeNode localExchangeNode
-                            = new 
LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first,
-                                    preferType, getChildDistributeExprList(i));
-                    newChildren.add(localExchangeNode);
-                } else {
-                    newChildren.add(childOutput.first);
-                }
+                        requireChild, translatorContext, this, firstChild);
             }
+        }
 
-            this.children = newChildren;
-            return Pair.of(this, outputType);
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            newChildren.add(enforceChildExchange(translatorContext, 
requireChild, children.get(i), i).first);
         }
+        this.children = newChildren;
+        return Pair.of(this, outputType);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to