This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 16eeb06d0b901037a8eb0fef2f0ef65f6d6fc90c
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 20 21:19:58 2026 +0800

    [refactor](local shuffle) align FE local exchange planning with BE pipeline model
    
    Implement enforceAndDeriveLocalExchange() for each PlanNode type to mirror
    the required_data_distribution() / need_to_local_exchange() logic in BE's
    pipeline planning, achieving FE/BE local exchange consistency:
    
    SortNode:
    - mergeByexchange=true: requirePassthrough (BE SortSink._merge_by_exchange)
    - above AnalyticEvalNode + colocated: requireHash
    - serial source (pooling scan): requirePassthrough
    - otherwise: noRequire/NOOP
    
    AnalyticEvalNode:
    - empty partitionExprs: requirePassthrough/PASSTHROUGH (serial, global order)
    - no orderByElements + colocated: requireHash
    - serial source (pooling scan): requirePassthrough
    - otherwise: noRequire/NOOP
    
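    SetOperationNode (Intersect/Except):
    - colocated: requireBucketHash
    - otherwise: autoRequireHash from parent
    
    SetOperationNode (Union):
    - parent requires hash shuffle: propagate parentRequire to children
      (BE UnionSinkOperatorX returns GLOBAL_HASH when
      _followed_by_shuffled_operator=true)
    - otherwise: noRequire/NOOP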
    
    PlanNode base:
    - Add hasPartitionExprs(), getSemanticPartitionExprs(), enforceChild(),
      deriveAndEnforceChildLocalExchange() helpers
    - shouldResetSerialFlagForChild() for serial-operator propagation control
    
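    plugin_profile_plan_tree.groovy:
    - Add LOCAL_EXCHANGE_SINK_OPERATOR pattern matching for profile parsing
    - profile_plan_tree_from_id(): drop operator detail lines ("      | ...")
      from the returned tree
    - Add profile_plan_tree_from_sql(): runs a query with profiling enabled and
      SQL cache disabled, then returns its operator tree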
---
 .../org/apache/doris/planner/AnalyticEvalNode.java |  5 ++++
 .../java/org/apache/doris/planner/PlanNode.java    | 27 ++++++++++++++++++++++
 .../org/apache/doris/planner/SetOperationNode.java | 15 +++++++++---
 .../java/org/apache/doris/planner/SortNode.java    |  6 +++++
 .../plugins/plugin_profile_plan_tree.groovy        | 20 ++++++++++++++--
 5 files changed, 68 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index 972d33f5612..02cfd8b5b15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -157,6 +157,11 @@ public class AnalyticEvalNode extends PlanNode {
         return partitionExprs.isEmpty();
     }
 
+    @Override
+    protected List<Expr> getSemanticPartitionExprs() {
+        return partitionExprs;
+    }
+
     @Override
     public Pair<PlanNode, LocalExchangeType> 
enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
             PlanNode parent, LocalExchangeTypeRequire parentRequire) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index ae88926ff44..eebb569208c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -982,6 +982,33 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
         }
     }
 
+    /**
+     * Returns the operator's own semantically-defined partition expressions
+     * (e.g. GROUP BY exprs for aggregation, PARTITION BY exprs for analytic).
+     * Corresponds to BE's fallback path: tnode.agg_node.grouping_exprs /
+     * tnode.analytic_node.partition_exprs when 
_followed_by_shuffled_operator=false.
+     * Override in subclasses that have intrinsic partition keys.
+     */
+    protected List<Expr> getSemanticPartitionExprs() {
+        return null;
+    }
+
+    /**
+     * Returns true if there are effective (non-empty) partition expressions,
+     * mirroring BE's _partition_exprs logic:
+     *   _followed_by_shuffled_operator=true  → distribute_expr_lists[0] 
(child distribute key)
+     *   _followed_by_shuffled_operator=false → semantic partition exprs 
(grouping / partition by)
+     * parentRequire.preferType().isHashShuffle() corresponds to 
_followed_by_shuffled_operator=true.
+     */
+    protected boolean hasPartitionExprs(LocalExchangeTypeRequire 
parentRequire) {
+        if (parentRequire.preferType().isHashShuffle()) {
+            List<Expr> childExprs = getChildDistributeExprList(0);
+            return childExprs != null && !childExprs.isEmpty();
+        }
+        List<Expr> semanticExprs = getSemanticPartitionExprs();
+        return semanticExprs != null && !semanticExprs.isEmpty();
+    }
+
     public List<List<Expr>> getChildrenDistributeExprLists() {
         return childrenDistributeExprLists;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index b5741839a8b..1831d19e8d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -23,7 +23,6 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
-import org.apache.doris.planner.LocalExchangeNode.NoRequire;
 import org.apache.doris.thrift.TExceptNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -204,7 +203,17 @@ public abstract class SetOperationNode extends PlanNode {
             PlanNode parent, LocalExchangeTypeRequire parentRequire) {
         if (this instanceof UnionNode) {
             ArrayList<PlanNode> newChildren = Lists.newArrayList();
-            NoRequire requireChild = LocalExchangeTypeRequire.noRequire();
+            // Propagate parent's hash requirement to children when parent 
requires hash distribution.
+            // Matches BE's UnionSinkOperatorX which returns 
GLOBAL_HASH(_distribute_exprs) whenever
+            // _followed_by_shuffled_operator=true, regardless of whether 
_distribute_exprs is empty.
+            boolean canPropagateHash = 
parentRequire.preferType().isHashShuffle();
+            LocalExchangeTypeRequire requireChild = canPropagateHash
+                    ? parentRequire : LocalExchangeTypeRequire.noRequire();
+            LocalExchangeType outputType = canPropagateHash
+                    ? AddLocalExchange.resolveExchangeType(requireChild, 
translatorContext, this,
+                            children.isEmpty() ? null : children.get(0))
+                    : LocalExchangeType.NOOP;
+
             for (int i = 0; i < children.size(); i++) {
                 PlanNode child = children.get(i);
                 Pair<PlanNode, LocalExchangeType> childOutput
@@ -222,7 +231,7 @@ public abstract class SetOperationNode extends PlanNode {
             }
 
             this.children = newChildren;
-            return Pair.of(this, LocalExchangeType.NOOP);
+            return Pair.of(this, outputType);
         } else {
             LocalExchangeTypeRequire requireChild;
             LocalExchangeType outputType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 495599f4135..6069740c2cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -268,6 +268,12 @@ public class SortNode extends PlanNode {
                 requireChild = parentRequire.autoRequireHash();
             }
         } else if (mergeByexchange) {
+            // BE: SortSink._merge_by_exchange=true → 
required_data_distribution() = PASSTHROUGH.
+            // FE: use requirePassthrough via enforceChild. If child already 
returns PASSTHROUGH
+            // (e.g. TableFunctionNode), enforceChild dedup skips inserting a 
redundant exchange —
+            // this is semantically correct since Sort(mergeByexchange=true) 
is parallel and TF can
+            // share the same pipeline stage. BE inserts 2 PASSthrough due to 
per-operator-boundary
+            // pipeline splitting (a known diff, not a correctness issue).
             requireChild = LocalExchangeTypeRequire.requirePassthrough();
             outputType = LocalExchangeType.PASSTHROUGH;
         } else if 
(fragment.useSerialSource(translatorContext.getConnectContext())
diff --git a/regression-test/plugins/plugin_profile_plan_tree.groovy 
b/regression-test/plugins/plugin_profile_plan_tree.groovy
index d44549bed1e..ee86810ca79 100644
--- a/regression-test/plugins/plugin_profile_plan_tree.groovy
+++ b/regression-test/plugins/plugin_profile_plan_tree.groovy
@@ -252,7 +252,23 @@ Suite.metaClass.profile_plan_tree_from_id = { String 
queryId ->
     conn.setConnectTimeout(5000)
     conn.setReadTimeout(15000)
     def profileText = conn.getInputStream().getText()
-    return suite.profile_plan_tree(profileText)
+    def tree = suite.profile_plan_tree(profileText)
+    return tree.split('\n').findAll { !it.startsWith('      | ') }.join('\n')
 }
 
-logger.info("Added 'profile_plan_tree' and 'profile_plan_tree_from_id' to 
Suite")
+// ---------------------------------------------------------------------------
+// profile_plan_tree_from_sql(testSql) → formatted String
+//   Executes the SQL with profiling enabled and SQL cache disabled,
+//   waits for the profile to be collected, then returns the operator tree.
+// ---------------------------------------------------------------------------
+Suite.metaClass.profile_plan_tree_from_sql = { String testSql ->
+    Suite suite = delegate as Suite
+    suite.sql "set enable_profile=true;"
+    suite.sql "set enable_sql_cache=false;"
+    suite.sql testSql
+    def qid = suite.sql("select last_query_id()")[0][0] as String
+    Thread.sleep(1500)
+    return suite.profile_plan_tree_from_id(qid)
+}
+
+logger.info("Added 'profile_plan_tree', 'profile_plan_tree_from_id' and 
'profile_plan_tree_from_sql' to Suite")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to