This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 5a83226b105 branch-4.1: [fix](fe) Send recursive CTE blocks to every scan instance #64964 (#65008)
5a83226b105 is described below

commit 5a83226b1050705532a0bbde0c05bd0e02b5885b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 1 17:24:20 2026 +0800

    branch-4.1: [fix](fe) Send recursive CTE blocks to every scan instance #64964 (#65008)
    
    Cherry-picked from #64964
    
    ---------
    
    Co-authored-by: Pxl <[email protected]>
---
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 58 ++++++++++----
 .../doris/qe/runtime/ThriftPlansBuilderTest.java   | 91 ++++++++++++++++++++++
 .../recursive_cte/rec_cte_parallel_targets.groovy  | 78 +++++++++++++++++++
 3 files changed, 213 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 2ef71ab66dc..eba3343bd03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -70,7 +70,6 @@ import org.apache.doris.thrift.TRuntimeFilterInfo;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TTopnFilterDesc;
-import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ArrayListMultimap;
@@ -698,15 +697,12 @@ public class ThriftPlansBuilder {
             // collect all assigned instance network addresses for this 
fragment
             List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs();
             Set<TNetworkAddress> networkAddresses = new TreeSet<>();
-            Map<TNetworkAddress, TUniqueId> addressTUniqueIdMap = new 
TreeMap<>();
             for (AssignedJob assignedJob : fragmentAssignedJobs) {
                 DistributedPlanWorker distributedPlanWorker = 
assignedJob.getAssignedWorker();
                 // use brpc port + host as the address used by BE for 
control/reset
                 TNetworkAddress networkAddress = new 
TNetworkAddress(distributedPlanWorker.host(),
                         distributedPlanWorker.brpcPort());
-                if (networkAddresses.add(networkAddress)) {
-                    addressTUniqueIdMap.put(networkAddress, 
assignedJob.instanceId());
-                }
+                networkAddresses.add(networkAddress);
             }
             PlanFragment planFragment = plan.getFragmentJob().getFragment();
             // remember addresses for later when building reset infos
@@ -727,15 +723,8 @@ public class ThriftPlansBuilder {
                     throw new IllegalStateException(
                             "fragmentAssignedJobs is empty for recursive cte 
scan node");
                 }
-                // Build a TRecCTETargets
-                List<TRecCTETarget> recCTETargets = new 
ArrayList<>(addressTUniqueIdMap.size());
-                for (Entry<TNetworkAddress, TUniqueId> entry : 
addressTUniqueIdMap.entrySet()) {
-                    TRecCTETarget tRecCTETarget = new TRecCTETarget();
-                    tRecCTETarget.setAddr(entry.getKey());
-                    tRecCTETarget.setFragmentInstanceId(entry.getValue());
-                    
tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
-                    recCTETargets.add(tRecCTETarget);
-                }
+                List<TRecCTETarget> recCTETargets = buildRecCTETargets(
+                        fragmentAssignedJobs, recursiveCteScanNodes.get(0));
                 // store the target for producers to reference later
                 fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(), 
recCTETargets);
             }
@@ -819,6 +808,47 @@ public class ThriftPlansBuilder {
         return fragmentToNotifyClose;
     }
 
+    static List<TRecCTETarget> buildRecCTETargets(List<AssignedJob> 
fragmentAssignedJobs,
+            RecursiveCteScanNode recursiveCteScanNode) {
+        List<AssignedJob> targetJobs = 
selectRecCTETargetJobs(fragmentAssignedJobs);
+        List<TRecCTETarget> targets = new ArrayList<>(targetJobs.size());
+        for (AssignedJob assignedJob : targetJobs) {
+            DistributedPlanWorker worker = assignedJob.getAssignedWorker();
+            TRecCTETarget target = new TRecCTETarget();
+            target.setAddr(new TNetworkAddress(worker.host(), 
worker.brpcPort()));
+            target.setFragmentInstanceId(assignedJob.instanceId());
+            target.setNodeId(recursiveCteScanNode.getId().asInt());
+            targets.add(target);
+        }
+        return targets;
+    }
+
+    static List<AssignedJob> selectRecCTETargetJobs(List<AssignedJob> 
fragmentAssignedJobs) {
+        List<AssignedJob> targetJobs = new 
ArrayList<>(fragmentAssignedJobs.size());
+        Map<TNetworkAddress, Integer> localShuffleTargetIndexMap = 
Maps.newLinkedHashMap();
+        for (AssignedJob assignedJob : fragmentAssignedJobs) {
+            if (!(assignedJob instanceof LocalShuffleAssignedJob)) {
+                targetJobs.add(assignedJob);
+                continue;
+            }
+
+            DistributedPlanWorker worker = assignedJob.getAssignedWorker();
+            TNetworkAddress address = new TNetworkAddress(worker.host(), 
worker.brpcPort());
+            Integer targetIndex = localShuffleTargetIndexMap.get(address);
+            if (targetIndex == null) {
+                localShuffleTargetIndexMap.put(address, targetJobs.size());
+                targetJobs.add(assignedJob);
+                continue;
+            }
+
+            AssignedJob selectedJob = targetJobs.get(targetIndex);
+            if (assignedJob.indexInUnassignedJob() < 
selectedJob.indexInUnassignedJob()) {
+                targetJobs.set(targetIndex, assignedJob);
+            }
+        }
+        return targetJobs;
+    }
+
     private static void collectAllRecursiveCteNodesInRecursiveSide(PlanNode 
planNode, boolean needCollect,
             Set<RecursiveCteNode> recursiveCteNodes) {
         if (planNode instanceof RecursiveCteNode) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
index 88140c0cb2e..62a83172f94 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -17,13 +17,27 @@
 
 package org.apache.doris.qe.runtime;
 
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SortNode;
+import org.apache.doris.thrift.TRecCTETarget;
+import org.apache.doris.thrift.TUniqueId;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 public class ThriftPlansBuilderTest {
     @Test
@@ -36,4 +50,81 @@ public class ThriftPlansBuilderTest {
 
         Mockito.verify(sortNode).setHasRuntimePredicate();
     }
+
+    @Test
+    public void testBuildRecCTETargetsKeepsAllInstancesOnSameBackend() {
+        DistributedPlanWorker worker = 
Mockito.mock(DistributedPlanWorker.class);
+        Mockito.when(worker.host()).thenReturn("127.0.0.1");
+        Mockito.when(worker.brpcPort()).thenReturn(9060);
+
+        TUniqueId firstInstanceId = new TUniqueId(1, 2);
+        TUniqueId secondInstanceId = new TUniqueId(3, 4);
+        AssignedJob firstJob = newAssignedJob(worker, firstInstanceId);
+        AssignedJob secondJob = newAssignedJob(worker, secondInstanceId);
+        RecursiveCteScanNode scanNode = new RecursiveCteScanNode("r", new 
PlanNodeId(11),
+                new TupleDescriptor(new TupleId(12)));
+
+        List<TRecCTETarget> targets = ThriftPlansBuilder.buildRecCTETargets(
+                Arrays.asList(firstJob, secondJob), scanNode);
+
+        Assertions.assertEquals(2, targets.size());
+        Assertions.assertEquals(firstInstanceId, 
targets.get(0).getFragmentInstanceId());
+        Assertions.assertEquals(secondInstanceId, 
targets.get(1).getFragmentInstanceId());
+        Assertions.assertEquals("127.0.0.1", 
targets.get(0).getAddr().getHostname());
+        Assertions.assertEquals(9060, targets.get(0).getAddr().getPort());
+        Assertions.assertEquals(11, targets.get(0).getNodeId());
+        Assertions.assertEquals("127.0.0.1", 
targets.get(1).getAddr().getHostname());
+        Assertions.assertEquals(9060, targets.get(1).getAddr().getPort());
+        Assertions.assertEquals(11, targets.get(1).getNodeId());
+    }
+
+    @Test
+    public void testBuildRecCTETargetsKeepsOneLocalShuffleInstancePerBackend() 
{
+        DistributedPlanWorker firstWorker = newWorker("127.0.0.1", 9060);
+        DistributedPlanWorker secondWorker = newWorker("127.0.0.2", 9060);
+
+        TUniqueId scanInstanceId = new TUniqueId(1, 2);
+        TUniqueId localShuffleInstanceId = new TUniqueId(3, 4);
+        TUniqueId secondWorkerInstanceId = new TUniqueId(5, 6);
+        LocalShuffleAssignedJob scanJob = newLocalShuffleAssignedJob(0, 10, 
firstWorker, scanInstanceId);
+        LocalShuffleAssignedJob localShuffleJob = newLocalShuffleAssignedJob(
+                1, 10, firstWorker, localShuffleInstanceId);
+        LocalShuffleAssignedJob secondWorkerScanJob = 
newLocalShuffleAssignedJob(
+                2, 11, secondWorker, secondWorkerInstanceId);
+        RecursiveCteScanNode scanNode = new RecursiveCteScanNode("r", new 
PlanNodeId(11),
+                new TupleDescriptor(new TupleId(12)));
+
+        List<TRecCTETarget> targets = ThriftPlansBuilder.buildRecCTETargets(
+                Arrays.asList(scanJob, localShuffleJob, secondWorkerScanJob), 
scanNode);
+
+        Assertions.assertEquals(2, targets.size());
+        Assertions.assertEquals(scanInstanceId, 
targets.get(0).getFragmentInstanceId());
+        Assertions.assertEquals(secondWorkerInstanceId, 
targets.get(1).getFragmentInstanceId());
+        Assertions.assertEquals("127.0.0.1", 
targets.get(0).getAddr().getHostname());
+        Assertions.assertEquals(9060, targets.get(0).getAddr().getPort());
+        Assertions.assertEquals(11, targets.get(0).getNodeId());
+        Assertions.assertEquals("127.0.0.2", 
targets.get(1).getAddr().getHostname());
+        Assertions.assertEquals(9060, targets.get(1).getAddr().getPort());
+        Assertions.assertEquals(11, targets.get(1).getNodeId());
+    }
+
+    private AssignedJob newAssignedJob(DistributedPlanWorker worker, TUniqueId 
instanceId) {
+        AssignedJob assignedJob = Mockito.mock(AssignedJob.class);
+        Mockito.when(assignedJob.getAssignedWorker()).thenReturn(worker);
+        Mockito.when(assignedJob.instanceId()).thenReturn(instanceId);
+        return assignedJob;
+    }
+
+    private DistributedPlanWorker newWorker(String host, int brpcPort) {
+        DistributedPlanWorker worker = 
Mockito.mock(DistributedPlanWorker.class);
+        Mockito.when(worker.host()).thenReturn(host);
+        Mockito.when(worker.brpcPort()).thenReturn(brpcPort);
+        return worker;
+    }
+
+    private LocalShuffleAssignedJob newLocalShuffleAssignedJob(int 
indexInUnassignedJob, int shareScanId,
+            DistributedPlanWorker worker, TUniqueId instanceId) {
+        return new LocalShuffleAssignedJob(indexInUnassignedJob, shareScanId, 
instanceId,
+                Mockito.mock(UnassignedJob.class), worker, 
DefaultScanSource.empty());
+    }
 }
diff --git a/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy b/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy
new file mode 100644
index 00000000000..9ff744104ce
--- /dev/null
+++ b/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("rec_cte_parallel_targets", "rec_cte") {
+    sql """set enable_local_shuffle=true"""
+    sql """set ignore_storage_data_distribution=true"""
+    sql """set parallel_pipeline_task_num=4"""
+
+    sql """drop table if exists rec_cte_parallel_targets"""
+    sql """
+        create table rec_cte_parallel_targets (
+            id int,
+            parent_id int,
+            dept_name varchar(50),
+            budget decimal(18, 2)
+        )
+        duplicate key(id)
+        distributed by hash(id) buckets 4
+        properties ("replication_num" = "1")
+    """
+
+    sql """
+        insert into rec_cte_parallel_targets values
+        (1, null, 'headquarters', 10000.00),
+        (10, 1, 'r_and_d', 5000.00),
+        (11, 1, 'marketing', 4000.00),
+        (101, 10, 'backend', 2000.00),
+        (102, 10, 'frontend', 1500.00),
+        (111, 11, 'promotion', 2000.00)
+    """
+
+    def result = sql """
+        with recursive cte(curr_id, total_score, step_path) as (
+            select
+                id,
+                cast(budget as double),
+                cast(dept_name as char(200))
+            from rec_cte_parallel_targets
+            where parent_id is null
+
+            union all
+
+            select
+                cast(t.id as int),
+                cast(c.total_score + t.budget as double),
+                cast(concat(c.step_path, '->', t.dept_name) as char(200))
+            from rec_cte_parallel_targets t
+            inner join cte c on t.parent_id = c.curr_id
+            where c.total_score < 50000
+        )
+        select curr_id, cast(cast(total_score as decimal(18, 2)) as string), 
step_path
+        from cte
+        order by curr_id
+    """
+
+    assertEquals([
+        [1, "10000.00", "headquarters"],
+        [10, "15000.00", "headquarters->r_and_d"],
+        [11, "14000.00", "headquarters->marketing"],
+        [101, "17000.00", "headquarters->r_and_d->backend"],
+        [102, "16500.00", "headquarters->r_and_d->frontend"],
+        [111, "16000.00", "headquarters->marketing->promotion"]
+    ], result)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to