This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 5a83226b105 branch-4.1: [fix](fe) Send recursive CTE blocks to every scan instance #64964 (#65008)
5a83226b105 is described below
commit 5a83226b1050705532a0bbde0c05bd0e02b5885b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 1 17:24:20 2026 +0800
branch-4.1: [fix](fe) Send recursive CTE blocks to every scan instance #64964 (#65008)
Cherry-picked from #64964
---------
Co-authored-by: Pxl <[email protected]>
---
.../doris/qe/runtime/ThriftPlansBuilder.java | 58 ++++++++++----
.../doris/qe/runtime/ThriftPlansBuilderTest.java | 91 ++++++++++++++++++++++
.../recursive_cte/rec_cte_parallel_targets.groovy | 78 +++++++++++++++++++
3 files changed, 213 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 2ef71ab66dc..eba3343bd03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -70,7 +70,6 @@ import org.apache.doris.thrift.TRuntimeFilterInfo;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TTopnFilterDesc;
-import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Suppliers;
import com.google.common.collect.ArrayListMultimap;
@@ -698,15 +697,12 @@ public class ThriftPlansBuilder {
// collect all assigned instance network addresses for this
fragment
List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs();
Set<TNetworkAddress> networkAddresses = new TreeSet<>();
- Map<TNetworkAddress, TUniqueId> addressTUniqueIdMap = new
TreeMap<>();
for (AssignedJob assignedJob : fragmentAssignedJobs) {
DistributedPlanWorker distributedPlanWorker =
assignedJob.getAssignedWorker();
// use brpc port + host as the address used by BE for
control/reset
TNetworkAddress networkAddress = new
TNetworkAddress(distributedPlanWorker.host(),
distributedPlanWorker.brpcPort());
- if (networkAddresses.add(networkAddress)) {
- addressTUniqueIdMap.put(networkAddress,
assignedJob.instanceId());
- }
+ networkAddresses.add(networkAddress);
}
PlanFragment planFragment = plan.getFragmentJob().getFragment();
// remember addresses for later when building reset infos
@@ -727,15 +723,8 @@ public class ThriftPlansBuilder {
throw new IllegalStateException(
"fragmentAssignedJobs is empty for recursive cte
scan node");
}
- // Build a TRecCTETargets
- List<TRecCTETarget> recCTETargets = new
ArrayList<>(addressTUniqueIdMap.size());
- for (Entry<TNetworkAddress, TUniqueId> entry :
addressTUniqueIdMap.entrySet()) {
- TRecCTETarget tRecCTETarget = new TRecCTETarget();
- tRecCTETarget.setAddr(entry.getKey());
- tRecCTETarget.setFragmentInstanceId(entry.getValue());
-
tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
- recCTETargets.add(tRecCTETarget);
- }
+ List<TRecCTETarget> recCTETargets = buildRecCTETargets(
+ fragmentAssignedJobs, recursiveCteScanNodes.get(0));
// store the target for producers to reference later
fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(),
recCTETargets);
}
@@ -819,6 +808,47 @@ public class ThriftPlansBuilder {
return fragmentToNotifyClose;
}
+ /**
+ * Builds the recursive CTE targets for a fragment that contains a recursive CTE scan node.
+ *
+ * <p>One {@link TRecCTETarget} is produced for each job returned by
+ * {@link #selectRecCTETargetJobs(List)}, in the same order. Each target's address is the
+ * assigned worker's host and brpc port, its fragment instance id is the job's
+ * {@code instanceId()}, and its node id is the id of {@code recursiveCteScanNode}.
+ * Every job that is not a {@code LocalShuffleAssignedJob} is kept, so several targets
+ * may share the same address when more than one instance runs on the same backend.
+ *
+ * <p>Package-private; exercised directly by {@code ThriftPlansBuilderTest}.
+ *
+ * @param fragmentAssignedJobs all instance jobs assigned for the fragment
+ * @param recursiveCteScanNode the scan node whose id is written into every target
+ * @return a new list with one target per selected job
+ */
+ static List<TRecCTETarget> buildRecCTETargets(List<AssignedJob>
fragmentAssignedJobs,
+ RecursiveCteScanNode recursiveCteScanNode) {
+ List<AssignedJob> targetJobs =
selectRecCTETargetJobs(fragmentAssignedJobs);
+ List<TRecCTETarget> targets = new ArrayList<>(targetJobs.size());
+ for (AssignedJob assignedJob : targetJobs) {
+ DistributedPlanWorker worker = assignedJob.getAssignedWorker();
+ TRecCTETarget target = new TRecCTETarget();
+ target.setAddr(new TNetworkAddress(worker.host(),
worker.brpcPort()));
+ target.setFragmentInstanceId(assignedJob.instanceId());
+ target.setNodeId(recursiveCteScanNode.getId().asInt());
+ targets.add(target);
+ }
+ return targets;
+ }
+
+ /**
+ * Chooses which of a fragment's assigned jobs should receive recursive CTE blocks.
+ *
+ * <ul>
+ * <li>Every job that is not a {@code LocalShuffleAssignedJob} is kept.</li>
+ * <li>{@code LocalShuffleAssignedJob}s are grouped by backend address (host + brpc port).
+ * Only the job with the smallest {@code indexInUnassignedJob()} in each group is kept,
+ * at the position where the first job of that group appeared in the input.</li>
+ * </ul>
+ * Apart from that slot replacement, the result follows input order. The input list is
+ * only read; a new list is returned.
+ *
+ * <p>NOTE(review): presumably local-shuffle instances on one backend share a single scan
+ * and receive the data through local shuffle, so one target per backend is enough;
+ * confirm against LocalShuffleAssignedJob and the BE-side rec CTE handling.
+ *
+ * @param fragmentAssignedJobs all instance jobs assigned for the fragment
+ * @return the jobs that should become recursive CTE targets
+ */
+ static List<AssignedJob> selectRecCTETargetJobs(List<AssignedJob>
fragmentAssignedJobs) {
+ List<AssignedJob> targetJobs = new
ArrayList<>(fragmentAssignedJobs.size());
+ // backend address -> slot in targetJobs holding that backend's local-shuffle representative
+ Map<TNetworkAddress, Integer> localShuffleTargetIndexMap =
Maps.newLinkedHashMap();
+ for (AssignedJob assignedJob : fragmentAssignedJobs) {
+ // Non-local-shuffle jobs are always kept: one target per instance.
+ if (!(assignedJob instanceof LocalShuffleAssignedJob)) {
+ targetJobs.add(assignedJob);
+ continue;
+ }
+
+ DistributedPlanWorker worker = assignedJob.getAssignedWorker();
+ TNetworkAddress address = new TNetworkAddress(worker.host(),
worker.brpcPort());
+ Integer targetIndex = localShuffleTargetIndexMap.get(address);
+ // First local-shuffle job seen on this backend: reserve a slot at the current end.
+ if (targetIndex == null) {
+ localShuffleTargetIndexMap.put(address, targetJobs.size());
+ targetJobs.add(assignedJob);
+ continue;
+ }
+
+ // Same backend already has a representative: keep whichever has the smaller
+ // indexInUnassignedJob, without changing the slot's position.
+ AssignedJob selectedJob = targetJobs.get(targetIndex);
+ if (assignedJob.indexInUnassignedJob() <
selectedJob.indexInUnassignedJob()) {
+ targetJobs.set(targetIndex, assignedJob);
+ }
+ }
+ return targetJobs;
+ }
+
private static void collectAllRecursiveCteNodesInRecursiveSide(PlanNode
planNode, boolean needCollect,
Set<RecursiveCteNode> recursiveCteNodes) {
if (planNode instanceof RecursiveCteNode) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
index 88140c0cb2e..62a83172f94 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -17,13 +17,27 @@
package org.apache.doris.qe.runtime;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.RecursiveCteScanNode;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SortNode;
+import org.apache.doris.thrift.TRecCTETarget;
+import org.apache.doris.thrift.TUniqueId;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
public class ThriftPlansBuilderTest {
@Test
@@ -36,4 +50,81 @@ public class ThriftPlansBuilderTest {
Mockito.verify(sortNode).setHasRuntimePredicate();
}
+
+ /**
+ * Two plain (non-local-shuffle) jobs assigned to the same backend must each become a
+ * separate target, in input order, carrying the worker's host/brpc port and the scan
+ * node id.
+ */
+ @Test
+ public void testBuildRecCTETargetsKeepsAllInstancesOnSameBackend() {
+ DistributedPlanWorker worker =
Mockito.mock(DistributedPlanWorker.class);
+ Mockito.when(worker.host()).thenReturn("127.0.0.1");
+ Mockito.when(worker.brpcPort()).thenReturn(9060);
+
+ TUniqueId firstInstanceId = new TUniqueId(1, 2);
+ TUniqueId secondInstanceId = new TUniqueId(3, 4);
+ // Both jobs are AssignedJob mocks (not LocalShuffleAssignedJob) on the same worker.
+ AssignedJob firstJob = newAssignedJob(worker, firstInstanceId);
+ AssignedJob secondJob = newAssignedJob(worker, secondInstanceId);
+ RecursiveCteScanNode scanNode = new RecursiveCteScanNode("r", new
PlanNodeId(11),
+ new TupleDescriptor(new TupleId(12)));
+
+ List<TRecCTETarget> targets = ThriftPlansBuilder.buildRecCTETargets(
+ Arrays.asList(firstJob, secondJob), scanNode);
+
+ Assertions.assertEquals(2, targets.size());
+ Assertions.assertEquals(firstInstanceId,
targets.get(0).getFragmentInstanceId());
+ Assertions.assertEquals(secondInstanceId,
targets.get(1).getFragmentInstanceId());
+ Assertions.assertEquals("127.0.0.1",
targets.get(0).getAddr().getHostname());
+ Assertions.assertEquals(9060, targets.get(0).getAddr().getPort());
+ Assertions.assertEquals(11, targets.get(0).getNodeId());
+ Assertions.assertEquals("127.0.0.1",
targets.get(1).getAddr().getHostname());
+ Assertions.assertEquals(9060, targets.get(1).getAddr().getPort());
+ Assertions.assertEquals(11, targets.get(1).getNodeId());
+ }
+
+ /**
+ * Local-shuffle jobs on the same backend collapse to a single target: the one with the
+ * smallest indexInUnassignedJob (here 0). A local-shuffle job on a different backend
+ * still gets its own target.
+ */
+ @Test
+ public void testBuildRecCTETargetsKeepsOneLocalShuffleInstancePerBackend()
{
+ DistributedPlanWorker firstWorker = newWorker("127.0.0.1", 9060);
+ DistributedPlanWorker secondWorker = newWorker("127.0.0.2", 9060);
+
+ TUniqueId scanInstanceId = new TUniqueId(1, 2);
+ TUniqueId localShuffleInstanceId = new TUniqueId(3, 4);
+ TUniqueId secondWorkerInstanceId = new TUniqueId(5, 6);
+ // scanJob (index 0) and localShuffleJob (index 1) share firstWorker.
+ LocalShuffleAssignedJob scanJob = newLocalShuffleAssignedJob(0, 10,
firstWorker, scanInstanceId);
+ LocalShuffleAssignedJob localShuffleJob = newLocalShuffleAssignedJob(
+ 1, 10, firstWorker, localShuffleInstanceId);
+ LocalShuffleAssignedJob secondWorkerScanJob =
newLocalShuffleAssignedJob(
+ 2, 11, secondWorker, secondWorkerInstanceId);
+ RecursiveCteScanNode scanNode = new RecursiveCteScanNode("r", new
PlanNodeId(11),
+ new TupleDescriptor(new TupleId(12)));
+
+ List<TRecCTETarget> targets = ThriftPlansBuilder.buildRecCTETargets(
+ Arrays.asList(scanJob, localShuffleJob, secondWorkerScanJob),
scanNode);
+
+ // localShuffleJob is dropped; one target remains per backend.
+ Assertions.assertEquals(2, targets.size());
+ Assertions.assertEquals(scanInstanceId,
targets.get(0).getFragmentInstanceId());
+ Assertions.assertEquals(secondWorkerInstanceId,
targets.get(1).getFragmentInstanceId());
+ Assertions.assertEquals("127.0.0.1",
targets.get(0).getAddr().getHostname());
+ Assertions.assertEquals(9060, targets.get(0).getAddr().getPort());
+ Assertions.assertEquals(11, targets.get(0).getNodeId());
+ Assertions.assertEquals("127.0.0.2",
targets.get(1).getAddr().getHostname());
+ Assertions.assertEquals(9060, targets.get(1).getAddr().getPort());
+ Assertions.assertEquals(11, targets.get(1).getNodeId());
+ }
+
+ // Creates a Mockito mock of AssignedJob (so not a LocalShuffleAssignedJob) that
+ // reports the given worker and instance id.
+ private AssignedJob newAssignedJob(DistributedPlanWorker worker, TUniqueId
instanceId) {
+ AssignedJob assignedJob = Mockito.mock(AssignedJob.class);
+ Mockito.when(assignedJob.getAssignedWorker()).thenReturn(worker);
+ Mockito.when(assignedJob.instanceId()).thenReturn(instanceId);
+ return assignedJob;
+ }
+
+ // Creates a mocked DistributedPlanWorker that reports the given host and brpc port.
+ private DistributedPlanWorker newWorker(String host, int brpcPort) {
+ DistributedPlanWorker worker =
Mockito.mock(DistributedPlanWorker.class);
+ Mockito.when(worker.host()).thenReturn(host);
+ Mockito.when(worker.brpcPort()).thenReturn(brpcPort);
+ return worker;
+ }
+
+ // Creates a real LocalShuffleAssignedJob with the given index, share-scan id, worker
+ // and instance id. The UnassignedJob is a Mockito mock and the scan source is
+ // DefaultScanSource.empty().
+ private LocalShuffleAssignedJob newLocalShuffleAssignedJob(int
indexInUnassignedJob, int shareScanId,
+ DistributedPlanWorker worker, TUniqueId instanceId) {
+ return new LocalShuffleAssignedJob(indexInUnassignedJob, shareScanId,
instanceId,
+ Mockito.mock(UnassignedJob.class), worker,
DefaultScanSource.empty());
+ }
}
diff --git a/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy b/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy
new file mode 100644
index 00000000000..9ff744104ce
--- /dev/null
+++ b/regression-test/suites/recursive_cte/rec_cte_parallel_targets.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test: a recursive CTE run with enable_local_shuffle=true and
+// parallel_pipeline_task_num=4 must return every row of the hierarchy.
+// NOTE(review): presumably these settings place several instances of the recursive
+// CTE scan fragment on one backend, the case the FE target-selection fix addresses;
+// confirm.
+suite("rec_cte_parallel_targets", "rec_cte") {
+ sql """set enable_local_shuffle=true"""
+ sql """set ignore_storage_data_distribution=true"""
+ sql """set parallel_pipeline_task_num=4"""
+
+ sql """drop table if exists rec_cte_parallel_targets"""
+ sql """
+ create table rec_cte_parallel_targets (
+ id int,
+ parent_id int,
+ dept_name varchar(50),
+ budget decimal(18, 2)
+ )
+ duplicate key(id)
+ distributed by hash(id) buckets 4
+ properties ("replication_num" = "1")
+ """
+
+ // Hierarchy: 1 is the root; 10 and 11 are its children; 101 and 102 are
+ // children of 10; 111 is a child of 11.
+ sql """
+ insert into rec_cte_parallel_targets values
+ (1, null, 'headquarters', 10000.00),
+ (10, 1, 'r_and_d', 5000.00),
+ (11, 1, 'marketing', 4000.00),
+ (101, 10, 'backend', 2000.00),
+ (102, 10, 'frontend', 1500.00),
+ (111, 11, 'promotion', 2000.00)
+ """
+
+ // Walk down from the root, accumulating budget along each path and recording the
+ // path of department names. With this data no running total reaches the 50000
+ // cut-off, so all six departments are expected.
+ def result = sql """
+ with recursive cte(curr_id, total_score, step_path) as (
+ select
+ id,
+ cast(budget as double),
+ cast(dept_name as char(200))
+ from rec_cte_parallel_targets
+ where parent_id is null
+
+ union all
+
+ select
+ cast(t.id as int),
+ cast(c.total_score + t.budget as double),
+ cast(concat(c.step_path, '->', t.dept_name) as char(200))
+ from rec_cte_parallel_targets t
+ inner join cte c on t.parent_id = c.curr_id
+ where c.total_score < 50000
+ )
+ select curr_id, cast(cast(total_score as decimal(18, 2)) as string),
step_path
+ from cte
+ order by curr_id
+ """
+
+ // Totals are running budget sums along each path, e.g. 101: 10000 + 5000 + 2000.
+ assertEquals([
+ [1, "10000.00", "headquarters"],
+ [10, "15000.00", "headquarters->r_and_d"],
+ [11, "14000.00", "headquarters->marketing"],
+ [101, "17000.00", "headquarters->r_and_d->backend"],
+ [102, "16500.00", "headquarters->r_and_d->frontend"],
+ [111, "16000.00", "headquarters->marketing->promotion"]
+ ], result)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]