This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
     new f9aa0f8f5ea [fix](local shuffle) fix num_senders for UNPARTITIONED sender fragments
f9aa0f8f5ea is described below

commit f9aa0f8f5ea40f51715894dae5e68bad14acc22e
Author: 924060929 <[email protected]>
AuthorDate: Mon Mar 30 12:33:50 2026 +0800

    [fix](local shuffle) fix num_senders for UNPARTITIONED sender fragments
    
    Fix deadlock when FE-planned local exchange inserts PASSTHROUGH after a
    serial Exchange in an UNPARTITIONED fragment.  The UNPARTITIONED fragment
    runs on only 1 worker, but ThriftPlansBuilder.senderFragmentOutputsSerially()
    did not recognize it as serial output (because useSerialSource() returned
    false for fragments without scan nodes).  This caused num_senders to be set
    to instanceJobs.size() (e.g., 3-4) instead of the actual number of distinct
    workers (1), making the receiver Exchange wait for senders that never exist.
    
    Fix: In senderFragmentOutputsSerially(), treat UNPARTITIONED fragments as
    always serial (they run on 1 worker and always output serially).
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../org/apache/doris/qe/runtime/ThriftPlansBuilder.java | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index ad1000c23da..aefaef2dc03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -297,10 +297,21 @@ public class ThriftPlansBuilder {
             PipelineDistributedPlan childPlan, ConnectContext connectContext) {
         PlanFragment fragment = childPlan.getFragmentJob().getFragment();
         PlanNode planRoot = fragment.getPlanRoot();
-        if (!fragment.useSerialSource(connectContext)) {
-            return false;
+        // A fragment outputs serially if its output pipeline has only 1 task.
+        // This happens when:
+        // 1. The fragment uses serial source (pooling scan) AND the plan root
+        //    is serial or has no serial children (no local exchange fan-out)
+        // 2. The fragment's data partition is UNPARTITIONED — it runs on only
+        //    1 worker and always outputs serially regardless of local exchange
+        if (fragment.getDataPartition().isPartitioned()) {
+            if (!fragment.useSerialSource(connectContext)) {
+                return false;
+            }
+            return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
+        } else {
+            // UNPARTITIONED fragment: only 1 worker, outputs serially
+            return true;
         }
-        return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
     }
 
     private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
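

Note on the change above: the following is a minimal, self-contained sketch of
the decision the commit message describes, not the actual Doris code. The class
NumSendersSketch, the FragmentInfo holder, and the numSenders() helper are
hypothetical stand-ins. The rule that a serial sender is counted once per
distinct worker while a non-serial sender is counted once per instance is
assumed from the commit message.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical, simplified model of the check; these are not Doris classes.
final class NumSendersSketch {
    /** Stand-in for the fragment properties the check reads. */
    static final class FragmentInfo {
        final boolean partitioned;            // false means UNPARTITIONED
        final boolean usesSerialSource;       // models useSerialSource(ctx)
        final boolean rootIsSerial;           // models planRoot.isSerialOperator()
        final boolean rootHasSerialChildren;  // models planRoot.hasSerialChildren()

        FragmentInfo(boolean partitioned, boolean usesSerialSource,
                     boolean rootIsSerial, boolean rootHasSerialChildren) {
            this.partitioned = partitioned;
            this.usesSerialSource = usesSerialSource;
            this.rootIsSerial = rootIsSerial;
            this.rootHasSerialChildren = rootHasSerialChildren;
        }
    }

    /** Logic before the fix: only the serial-source path counts as serial. */
    static boolean outputsSeriallyBeforeFix(FragmentInfo f) {
        if (!f.usesSerialSource) {
            return false;
        }
        return f.rootIsSerial || !f.rootHasSerialChildren;
    }

    /** Logic after the fix: UNPARTITIONED fragments are always serial. */
    static boolean outputsSerially(FragmentInfo f) {
        if (f.partitioned) {
            return outputsSeriallyBeforeFix(f);
        }
        return true;
    }

    /**
     * Assumed rule: a serial sender contributes one sender per distinct
     * worker, otherwise one sender per instance job.
     */
    static int numSenders(boolean serial, List<String> workerOfEachInstance) {
        if (serial) {
            return new HashSet<>(workerOfEachInstance).size();
        }
        return workerOfEachInstance.size();
    }
}
```

With an UNPARTITIONED fragment that has no scan node and three instance jobs on
one worker, the pre-fix logic in this sketch yields 3 senders and the post-fix
logic yields 1, which matches the mismatch the commit message attributes the
deadlock to.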

