This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba6a4439eb5 [improvement](fe) Balance runtime filter coordinator selection (#64130)
ba6a4439eb5 is described below

commit ba6a4439eb5b5bd7ec47dc75989f368c40dd832f
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 8 15:24:27 2026 +0800

    [improvement](fe) Balance runtime filter coordinator selection (#64130)
    
    In FE, Nereids selected the runtime filter merge coordinator by always
    taking the first instance of the top-most distributed plan. The merge
    coordinator should stay on a top-most fragment instance so that its
    lifetime covers runtime filter merging, but because plan and worker
    ordering is stable, repeated queries can keep choosing the same backend.
    This change randomly selects the merge worker from the distinct BEs
    assigned to the top-most distributed plan, so coordinator work is no
    longer pinned to the first top-most instance, while the requirement that
    the coordinator live on the top-most fragment is preserved. Legacy
    Coordinator behavior is unchanged.
---
 .../qe/runtime/RuntimeFiltersThriftBuilder.java    |  29 ++++--
 .../runtime/RuntimeFiltersThriftBuilderTest.java   | 114 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index 705445f80db..cd7b0ad7bff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -29,6 +29,7 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 import java.util.ArrayList;
@@ -36,6 +37,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 /** RuntimeFiltersThriftBuilder */
@@ -43,19 +45,17 @@ public class RuntimeFiltersThriftBuilder {
     public final TNetworkAddress mergeAddress;
 
     private final List<RuntimeFilter> runtimeFilters;
-    private final AssignedJob mergeInstance;
     private final Set<Integer> broadcastRuntimeFilterIds;
     private final Map<RuntimeFilterId, List<RuntimeFilterTarget>> ridToTargets;
     private final Map<RuntimeFilterId, Integer> ridToBuilderNum;
 
     private RuntimeFiltersThriftBuilder(
             TNetworkAddress mergeAddress, List<RuntimeFilter> runtimeFilters,
-            AssignedJob mergeInstance, Set<Integer> broadcastRuntimeFilterIds,
+            Set<Integer> broadcastRuntimeFilterIds,
             Map<RuntimeFilterId, List<RuntimeFilterTarget>> ridToTargets,
             Map<RuntimeFilterId, Integer> ridToBuilderNum) {
         this.mergeAddress = mergeAddress;
         this.runtimeFilters = runtimeFilters;
-        this.mergeInstance = mergeInstance;
         this.broadcastRuntimeFilterIds = broadcastRuntimeFilterIds;
         this.ridToTargets = ridToTargets;
         this.ridToBuilderNum = ridToBuilderNum;
@@ -100,9 +100,7 @@ public class RuntimeFiltersThriftBuilder {
 
     public static RuntimeFiltersThriftBuilder compute(
             List<RuntimeFilter> runtimeFilters, List<PipelineDistributedPlan> 
distributedPlans) {
-        PipelineDistributedPlan topMostPlan = 
distributedPlans.get(distributedPlans.size() - 1);
-        AssignedJob mergeInstance = topMostPlan.getInstanceJobs().get(0);
-        BackendWorker worker = (BackendWorker) 
mergeInstance.getAssignedWorker();
+        BackendWorker worker = selectMergeWorker(distributedPlans);
         TNetworkAddress mergeAddress = new TNetworkAddress(worker.host(), 
worker.brpcPort());
 
         Set<Integer> broadcastRuntimeFilterIds = runtimeFilters
@@ -139,8 +137,23 @@ public class RuntimeFiltersThriftBuilder {
             }
         }
         return new RuntimeFiltersThriftBuilder(
-                mergeAddress, runtimeFilters, mergeInstance,
-                broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum);
+                mergeAddress, runtimeFilters, broadcastRuntimeFilterIds, 
ridToTargetParam, ridToBuilderNum);
+    }
+
+    static BackendWorker selectMergeWorker(List<PipelineDistributedPlan> 
distributedPlans) {
+        List<BackendWorker> workers = 
collectMergeWorkerCandidates(distributedPlans);
+        return 
workers.get(ThreadLocalRandom.current().nextInt(workers.size()));
+    }
+
+    static List<BackendWorker> 
collectMergeWorkerCandidates(List<PipelineDistributedPlan> distributedPlans) {
+        PipelineDistributedPlan topMostPlan = 
distributedPlans.get(distributedPlans.size() - 1);
+        Map<Long, BackendWorker> candidateWorkers = Maps.newLinkedHashMap();
+        for (AssignedJob instanceJob : topMostPlan.getInstanceJobs()) {
+            BackendWorker worker = (BackendWorker) 
instanceJob.getAssignedWorker();
+            candidateWorkers.putIfAbsent(worker.id(), worker);
+        }
+        Preconditions.checkState(!candidateWorkers.isEmpty(), "runtime filter 
merge worker is empty");
+        return new ArrayList<>(candidateWorkers.values());
     }
 
     public static class RuntimeFilterTarget {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilderTest.java
new file mode 100644
index 00000000000..b3d4b4f80ce
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilderTest.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.AbstractTreeNode;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.ListMultimap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class RuntimeFiltersThriftBuilderTest {
+    @Test
+    public void testSelectRuntimeFilterMergeWorkerFromTopMostWorkers() {
+        BackendWorker worker1 = newBackendWorker(1);
+        BackendWorker worker2 = newBackendWorker(2);
+        BackendWorker worker3 = newBackendWorker(3);
+        PipelineDistributedPlan scanPlan = newDistributedPlan(worker1, 
worker2);
+        PipelineDistributedPlan joinPlan = newDistributedPlan(worker1, 
worker3);
+        PipelineDistributedPlan topPlan = newDistributedPlan(worker1, worker3);
+
+        List<PipelineDistributedPlan> distributedPlans = 
Arrays.asList(scanPlan, joinPlan, topPlan);
+        List<BackendWorker> candidates = 
RuntimeFiltersThriftBuilder.collectMergeWorkerCandidates(distributedPlans);
+
+        Assertions.assertEquals(Arrays.asList(worker1, worker3), candidates);
+        
Assertions.assertTrue(candidates.contains(RuntimeFiltersThriftBuilder.selectMergeWorker(distributedPlans)));
+    }
+
+    private BackendWorker newBackendWorker(long id) {
+        Backend backend = new Backend(id, "host" + id, (int) (9000 + id));
+        backend.setBePort((int) (8000 + id));
+        backend.setBrpcPort((int) (7000 + id));
+        return new BackendWorker(0, backend);
+    }
+
+    private PipelineDistributedPlan newDistributedPlan(BackendWorker... 
workers) {
+        TestUnassignedJob unassignedJob = new TestUnassignedJob();
+        List<AssignedJob> assignedJobs = Arrays.stream(workers)
+                .map(worker -> unassignedJob.assignWorkerAndDataSources(
+                        0, new TUniqueId(), worker, DefaultScanSource.empty()))
+                .collect(Collectors.toList());
+        return new PipelineDistributedPlan(unassignedJob, assignedJobs, 
ImmutableSetMultimap.of());
+    }
+
+    private static final class TestUnassignedJob extends 
AbstractTreeNode<UnassignedJob> implements UnassignedJob {
+        private TestUnassignedJob() {
+            super(Collections.emptyList());
+        }
+
+        @Override
+        public StatementContext getStatementContext() {
+            return null;
+        }
+
+        @Override
+        public PlanFragment getFragment() {
+            return null;
+        }
+
+        @Override
+        public List<ScanNode> getScanNodes() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public ListMultimap<ExchangeNode, UnassignedJob> 
getExchangeToChildJob() {
+            return ArrayListMultimap.create();
+        }
+
+        @Override
+        public List<AssignedJob> computeAssignedJobs(
+                DistributeContext distributeContext, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public UnassignedJob withChildren(List<UnassignedJob> children) {
+            return this;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to