This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 16eeb06d0b901037a8eb0fef2f0ef65f6d6fc90c Author: 924060929 <[email protected]> AuthorDate: Fri Mar 20 21:19:58 2026 +0800 [refactor](local shuffle) align FE local exchange planning with BE pipeline model Implement enforceAndDeriveLocalExchange() for each PlanNode type to mirror the required_data_distribution() / need_to_local_exchange() logic in BE's pipeline planning, achieving FE/BE local exchange consistency: SortNode: - mergeByexchange=true: requirePassthrough (BE SortSink._merge_by_exchange) - above AnalyticEvalNode + colocated: requireHash - serial source (pooling scan): requirePassthrough - otherwise: noRequire/NOOP AnalyticEvalNode: - empty partitionExprs: requirePassthrough/PASSTHROUGH (serial, global order) - no orderByElements + colocated: requireHash - serial source (pooling scan): requirePassthrough - otherwise: noRequire/NOOP SetOperationNode (Intersect/Except): - colocated: requireBucketHash - otherwise: autoRequireHash from parent PlanNode base: - Add hasPartitionExprs(), getSemanticPartitionExprs(), enforceChild(), deriveAndEnforceChildLocalExchange() helpers - shouldResetSerialFlagForChild() for serial-operator propagation control plugin_profile_plan_tree.groovy: - Add LOCAL_EXCHANGE_SINK_OPERATOR pattern matching for profile parsing --- .../org/apache/doris/planner/AnalyticEvalNode.java | 5 ++++ .../java/org/apache/doris/planner/PlanNode.java | 27 ++++++++++++++++++++++ .../org/apache/doris/planner/SetOperationNode.java | 15 +++++++++--- .../java/org/apache/doris/planner/SortNode.java | 6 +++++ .../plugins/plugin_profile_plan_tree.groovy | 20 ++++++++++++++-- 5 files changed, 68 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index 972d33f5612..02cfd8b5b15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -157,6 +157,11 @@ public class AnalyticEvalNode extends PlanNode { return partitionExprs.isEmpty(); } + @Override + protected List<Expr> getSemanticPartitionExprs() { + return partitionExprs; + } + @Override public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index ae88926ff44..eebb569208c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -982,6 +982,33 @@ public abstract class PlanNode extends TreeNode<PlanNode> { } } + /** + * Returns the operator's own semantically-defined partition expressions + * (e.g. GROUP BY exprs for aggregation, PARTITION BY exprs for analytic). + * Corresponds to BE's fallback path: tnode.agg_node.grouping_exprs / + * tnode.analytic_node.partition_exprs when _followed_by_shuffled_operator=false. + * Override in subclasses that have intrinsic partition keys. + */ + protected List<Expr> getSemanticPartitionExprs() { + return null; + } + + /** + * Returns true if there are effective (non-empty) partition expressions, + * mirroring BE's _partition_exprs logic: + * _followed_by_shuffled_operator=true → distribute_expr_lists[0] (child distribute key) + * _followed_by_shuffled_operator=false → semantic partition exprs (grouping / partition by) + * parentRequire.preferType().isHashShuffle() corresponds to _followed_by_shuffled_operator=true. 
+ */ + protected boolean hasPartitionExprs(LocalExchangeTypeRequire parentRequire) { + if (parentRequire.preferType().isHashShuffle()) { + List<Expr> childExprs = getChildDistributeExprList(0); + return childExprs != null && !childExprs.isEmpty(); + } + List<Expr> semanticExprs = getSemanticPartitionExprs(); + return semanticExprs != null && !semanticExprs.isEmpty(); + } + public List<List<Expr>> getChildrenDistributeExprLists() { return childrenDistributeExprLists; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index b5741839a8b..1831d19e8d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -23,7 +23,6 @@ import org.apache.doris.common.Pair; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; -import org.apache.doris.planner.LocalExchangeNode.NoRequire; import org.apache.doris.thrift.TExceptNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -204,7 +203,17 @@ public abstract class SetOperationNode extends PlanNode { PlanNode parent, LocalExchangeTypeRequire parentRequire) { if (this instanceof UnionNode) { ArrayList<PlanNode> newChildren = Lists.newArrayList(); - NoRequire requireChild = LocalExchangeTypeRequire.noRequire(); + // Propagate parent's hash requirement to children when parent requires hash distribution. + // Matches BE's UnionSinkOperatorX which returns GLOBAL_HASH(_distribute_exprs) whenever + // _followed_by_shuffled_operator=true, regardless of whether _distribute_exprs is empty. + boolean canPropagateHash = parentRequire.preferType().isHashShuffle(); + LocalExchangeTypeRequire requireChild = canPropagateHash + ? 
parentRequire : LocalExchangeTypeRequire.noRequire(); + LocalExchangeType outputType = canPropagateHash + ? AddLocalExchange.resolveExchangeType(requireChild, translatorContext, this, + children.isEmpty() ? null : children.get(0)) + : LocalExchangeType.NOOP; + for (int i = 0; i < children.size(); i++) { PlanNode child = children.get(i); Pair<PlanNode, LocalExchangeType> childOutput @@ -222,7 +231,7 @@ public abstract class SetOperationNode extends PlanNode { } this.children = newChildren; - return Pair.of(this, LocalExchangeType.NOOP); + return Pair.of(this, outputType); } else { LocalExchangeTypeRequire requireChild; LocalExchangeType outputType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 495599f4135..6069740c2cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -268,6 +268,12 @@ public class SortNode extends PlanNode { requireChild = parentRequire.autoRequireHash(); } } else if (mergeByexchange) { + // BE: SortSink._merge_by_exchange=true → required_data_distribution() = PASSTHROUGH. + // FE: use requirePassthrough via enforceChild. If child already returns PASSTHROUGH + // (e.g. TableFunctionNode), enforceChild dedup skips inserting a redundant exchange — + // this is semantically correct since Sort(mergeByexchange=true) is parallel and TF can + // share the same pipeline stage. BE inserts 2 PASSTHROUGH exchanges due to per-operator-boundary + // pipeline splitting (a known diff, not a correctness issue).
requireChild = LocalExchangeTypeRequire.requirePassthrough(); outputType = LocalExchangeType.PASSTHROUGH; } else if (fragment.useSerialSource(translatorContext.getConnectContext()) diff --git a/regression-test/plugins/plugin_profile_plan_tree.groovy b/regression-test/plugins/plugin_profile_plan_tree.groovy index d44549bed1e..ee86810ca79 100644 --- a/regression-test/plugins/plugin_profile_plan_tree.groovy +++ b/regression-test/plugins/plugin_profile_plan_tree.groovy @@ -252,7 +252,23 @@ Suite.metaClass.profile_plan_tree_from_id = { String queryId -> conn.setConnectTimeout(5000) conn.setReadTimeout(15000) def profileText = conn.getInputStream().getText() - return suite.profile_plan_tree(profileText) + def tree = suite.profile_plan_tree(profileText) + return tree.split('\n').findAll { !it.startsWith(' | ') }.join('\n') } -logger.info("Added 'profile_plan_tree' and 'profile_plan_tree_from_id' to Suite") +// --------------------------------------------------------------------------- +// profile_plan_tree_from_sql(testSql) → formatted String +// Executes the SQL with profiling enabled and SQL cache disabled, +// waits for the profile to be collected, then returns the operator tree. +// --------------------------------------------------------------------------- +Suite.metaClass.profile_plan_tree_from_sql = { String testSql -> + Suite suite = delegate as Suite + suite.sql "set enable_profile=true;" + suite.sql "set enable_sql_cache=false;" + suite.sql testSql + def qid = suite.sql("select last_query_id()")[0][0] as String + Thread.sleep(1500) + return suite.profile_plan_tree_from_id(qid) +} + +logger.info("Added 'profile_plan_tree', 'profile_plan_tree_from_id' and 'profile_plan_tree_from_sql' to Suite") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
