This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 953250c4669 branch-4.1: [improve](streaming-job) support specifying 
compute_group for StreamingJob #62747 (#62990)
953250c4669 is described below

commit 953250c46697b1134ee91a51f2c2c06aa11ed0a6
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 6 10:43:58 2026 +0800

    branch-4.1: [improve](streaming-job) support specifying compute_group for 
StreamingJob #62747 (#62990)
    
    Cherry-picked from #62747
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java       |  90 +++++++++++-
 .../insert/streaming/StreamingInsertTask.java      |  11 +-
 .../insert/streaming/StreamingJobProperties.java   |   7 +-
 .../insert/streaming/StreamingMultiTblTask.java    |   7 +-
 .../doris/job/offset/SourceOffsetProvider.java     |   6 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  12 +-
 .../apache/doris/job/util/StreamingJobUtils.java   |  28 +++-
 .../test_streaming_mysql_job_compute_group.groovy  | 127 ++++++++++++++++
 .../test_streaming_insert_job_compute_group.groovy | 158 ++++++++++++++++++++
 ...treaming_insert_job_compute_group_docker.groovy | 163 +++++++++++++++++++++
 10 files changed, 584 insertions(+), 25 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 168bb137259..4583271a64b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -20,10 +20,13 @@ package org.apache.doris.job.extensions.insert.streaming;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.InternalErrorCode;
@@ -167,6 +170,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     // Converted form of sourceProperties; must be refreshed whenever 
sourceProperties changes.
     private transient Map<String, String> convertedSourceProperties;
 
+    @Getter
+    @SerializedName("ccn")
+    private volatile String cloudCluster;
+
     // The sampling window starts at the beginning of the sampling window.
     // If the error rate exceeds `max_filter_ratio` within the window, the 
sampling fails.
     @Setter
@@ -238,8 +245,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             }
             StreamingJobUtils.resolveAndValidateSource(
                     dataSourceType, sourceProperties, 
String.valueOf(getJobId()), createTbls);
-            this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType,
-                    getConvertedSourceProperties());
+            this.offsetProvider = 
createOffsetProvider(getConvertedSourceProperties());
             JdbcSourceOffsetProvider rdsOffsetProvider = 
(JdbcSourceOffsetProvider) this.offsetProvider;
             rdsOffsetProvider.splitChunks(createTbls);
         } catch (Exception ex) {
@@ -292,6 +298,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             this.jobProperties = new StreamingJobProperties(properties);
             jobProperties.validate();
             this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 
1000;
+            resolveCloudCluster();
             // build time definition
             JobExecutionConfiguration execConfig = getJobConfig();
             TimerDefinition timerDefinition = new TimerDefinition();
@@ -305,13 +312,75 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    private void resolveCloudCluster() throws AnalysisException {
+        String requested = validateComputeGroupProperty(properties);
+        if (requested != null) {
+            this.cloudCluster = requested;
+            return;
+        }
+        if (!Config.isCloudMode()) {
+            return;
+        }
+        if (ConnectContext.get() == null) {
+            throw new AnalysisException("compute_group must be specified when 
no active session is available");
+        }
+        String sessionCluster;
+        try {
+            sessionCluster = ConnectContext.get().getCloudCluster();
+        } catch (ComputeGroupException e) {
+            throw new AnalysisException("failed to resolve compute_group: " + 
e.getMessage());
+        }
+        if (StringUtils.isBlank(sessionCluster)) {
+            throw new AnalysisException("compute_group is required in cloud 
mode; "
+                    + "specify compute_group explicitly or bind a default 
cluster with USAGE");
+        }
+        try {
+            ((CloudEnv) 
Env.getCurrentEnv()).checkCloudClusterPriv(sessionCluster);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        this.cloudCluster = sessionCluster;
+    }
+
+    // Returns the validated compute_group value, or null when the property is 
absent.
+    // Throws if the key is present but blank, if not in cloud mode, or if the 
user lacks USAGE on the cluster.
+    private String validateComputeGroupProperty(Map<String, String> props) 
throws AnalysisException {
+        if (props == null || 
!props.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            return null;
+        }
+        String value = 
props.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+        if (StringUtils.isBlank(value)) {
+            throw new AnalysisException("compute_group cannot be empty");
+        }
+        if (!Config.isCloudMode()) {
+            throw new AnalysisException("compute_group is only supported in 
cloud mode");
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(value);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        return value;
+    }
+
+    private SourceOffsetProvider createOffsetProvider(Map<String, String> 
jdbcSourceProps) {
+        SourceOffsetProvider provider;
+        if (tvfType != null) {
+            provider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+        } else {
+            provider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType, jdbcSourceProps);
+        }
+        provider.setCloudCluster(this.cloudCluster);
+        return provider;
+    }
+
     private void initInsertJob() {
         try {
             init();
             UnboundTVFRelation currentTvf = getCurrentTvf();
             this.tvfType = currentTvf.getFunctionName();
             this.originTvfProps = currentTvf.getProperties().getMap();
-            this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+            this.offsetProvider = createOffsetProvider(sourceProperties);
             this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
             // Validate source-side resources (e.g. PG slot/publication 
ownership) once at job
             // creation so conflicts fail fast. No-op for standalone 
cdc_stream TVF (no job).
@@ -411,6 +480,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             logParts.add("sql: " + encryptedSql);
         }
 
+        validateComputeGroupProperty(alterJobCommand.getProperties());
+
         // update properties
         if (!alterJobCommand.getProperties().isEmpty()) {
             modifyPropertiesInternal(alterJobCommand.getProperties());
@@ -547,7 +618,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     private AbstractStreamingTask createStreamingMultiTblTask() throws 
JobException {
         return new StreamingMultiTblTask(getJobId(), 
Env.getCurrentEnv().getNextId(), dataSourceType,
                 offsetProvider, getConvertedSourceProperties(), targetDb, 
targetProperties, jobProperties,
-                getCreateUser());
+                getCreateUser(), cloudCluster);
     }
 
     private Map<String, String> getConvertedSourceProperties() throws 
JobException {
@@ -571,7 +642,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     protected AbstractStreamingTask createStreamingInsertTask() {
         return new StreamingInsertTask(getJobId(), 
Env.getCurrentEnv().getNextId(),
                 getExecuteSql(),
-                offsetProvider, getCurrentDbName(), jobProperties, 
getOriginTvfProps(), getCreateUser());
+                offsetProvider, getCurrentDbName(), jobProperties, 
getOriginTvfProps(),
+                getCreateUser(), cloudCluster);
     }
 
     public void recordTasks(AbstractStreamingTask task) {
@@ -856,6 +928,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 resetCloudProgress(offset);
             }
         }
+        if 
(inputProperties.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            this.cloudCluster = 
inputProperties.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+            offsetProvider.setCloudCluster(this.cloudCluster);
+        }
         this.properties.putAll(inputProperties);
         this.jobProperties = new StreamingJobProperties(this.properties);
     }
@@ -1227,11 +1303,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void gsonPostProcess() throws IOException {
         if (offsetProvider == null) {
+            offsetProvider = createOffsetProvider(sourceProperties);
             if (tvfType != null) {
-                offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
                 offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
-            } else {
-                offsetProvider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType, sourceProperties);
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 54832eb6fb1..df23f107240 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.Util;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStatusCode;
 
 import lombok.Getter;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,6 +60,7 @@ public class StreamingInsertTask extends 
AbstractStreamingTask {
     private ConnectContext ctx;
     private StreamingJobProperties jobProperties;
     private Map<String, String> originTvfProps;
+    private String cloudCluster;
     SourceOffsetProvider offsetProvider;
 
     public StreamingInsertTask(long jobId,
@@ -67,13 +70,15 @@ public class StreamingInsertTask extends 
AbstractStreamingTask {
                                String currentDb,
                                StreamingJobProperties jobProperties,
                                Map<String, String> originTvfProps,
-                               UserIdentity userIdentity) {
+                               UserIdentity userIdentity,
+                               String cloudCluster) {
         super(jobId, taskId, userIdentity);
         this.sql = sql;
         this.currentDb = currentDb;
         this.offsetProvider = offsetProvider;
         this.jobProperties = jobProperties;
         this.originTvfProps = originTvfProps;
+        this.cloudCluster = cloudCluster;
     }
 
     @Override
@@ -86,6 +91,10 @@ public class StreamingInsertTask extends 
AbstractStreamingTask {
         this.startTimeMs = System.currentTimeMillis();
         ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
         
ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
+        // apply after session merge so compute_group wins over 
session.cloud_cluster
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            ctx.setCloudCluster(cloudCluster);
+        }
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index c7a814c4a5b..3db3aecaf8b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -44,8 +44,9 @@ public class StreamingJobProperties implements JobProperties {
     public static final String SESSION_VAR_PREFIX = "session.";
     public static final String INTERNAL_KEY_PREFIX = "__";
     public static final String OFFSET_PROPERTY = "offset";
+    public static final String COMPUTE_GROUP_PROPERTY = "compute_group";
     public static final List<String> SUPPORT_STREAM_JOB_PROPS = 
Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY,
-            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, 
OFFSET_PROPERTY);
+            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, 
OFFSET_PROPERTY, COMPUTE_GROUP_PROPERTY);
 
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
@@ -195,4 +196,8 @@ public class StreamingJobProperties implements 
JobProperties {
     public String getOffsetProperty() {
         return properties.get(OFFSET_PROPERTY);
     }
+
+    public String getComputeGroup() {
+        return properties.get(COMPUTE_GROUP_PROPERTY);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index b1d28b2dcad..ac5efe51542 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -75,6 +75,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private Map<String, String> targetProperties;
     private String targetDb;
     private StreamingJobProperties jobProperties;
+    private String cloudCluster;
     private long scannedRows = 0L;
     private long loadBytes = 0L;
     private long filteredRows = 0L;
@@ -90,7 +91,8 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
             String targetDb,
             Map<String, String> targetProperties,
             StreamingJobProperties jobProperties,
-            UserIdentity userIdentity) {
+            UserIdentity userIdentity,
+            String cloudCluster) {
         super(jobId, taskId, userIdentity);
         this.dataSourceType = dataSourceType;
         this.offsetProvider = offsetProvider;
@@ -98,6 +100,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         this.targetProperties = targetProperties;
         this.jobProperties = jobProperties;
         this.targetDb = targetDb;
+        this.cloudCluster = cloudCluster;
         this.timeoutMs = Config.streaming_task_timeout_multiplier * 
jobProperties.getMaxIntervalSecond() * 1000L;
     }
 
@@ -123,7 +126,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     }
 
     private void sendWriteRequest() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         log.info("start to run streaming multi task {} in backend {}/{}, 
offset is {}",
                 taskId, backend.getId(), backend.getHost(), 
runningOffset.toString());
         this.runningBackendId = backend.getId();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49c718c5950..87eca253c46 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -88,6 +88,12 @@ public interface SourceOffsetProvider {
      */
     void updateOffset(Offset offset);
 
+    /**
+     * Bind the compute group whose backends should receive FE-initiated RPCs.
+     * Default: no-op for providers that do not make BE RPCs.
+     */
+    default void setCloudCluster(String cloudCluster) {}
+
     /**
      * Fetch remote meta information, such as listing files in S3 or getting 
latest offsets in Kafka.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index cfa2fe3273b..3209b190538 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -94,6 +94,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     volatile boolean hasMoreData = true;
 
+    transient volatile String cloudCluster;
+
     /**
      * No-arg constructor for subclass use.
      */
@@ -220,7 +222,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams =
                 new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -306,7 +308,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     private boolean compareOffset(Map<String, String> offsetFirst, Map<String, 
String> offsetSecond)
             throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         CompareOffsetRequest requestParams =
                 new CompareOffsetRequest(getJobId(), sourceType.name(), 
sourceProperties,
                         getFrontendAddress(), offsetFirst, offsetSecond);
@@ -549,7 +551,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     private List<SnapshotSplit> requestTableSplits(String table) throws 
JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         FetchTableSplitsRequest requestParams =
                 new FetchTableSplitsRequest(getJobId(), sourceType.name(),
                         sourceProperties, getFrontendAddress(), table);
@@ -664,7 +666,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
      * otherwise, conflicts will occur in multi-backends scenarios.
      */
     private void initSourceReader() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams =
                 new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -712,7 +714,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams =
                 new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 438fb294179..56c95596991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -25,6 +25,8 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.SmallFileMgr;
@@ -227,19 +229,29 @@ public class StreamingJobUtils {
         return JdbcClient.createJdbcClient(config);
     }
 
-    public static Backend selectBackend() throws JobException {
-        Backend backend = null;
-        BeSelectionPolicy policy = null;
+    public static Backend selectBackend(String cloudCluster) throws 
JobException {
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            List<Backend> bes = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                    .getBackendsByClusterName(cloudCluster)
+                    .stream()
+                    .filter(Backend::isLoadAvailable)
+                    .collect(Collectors.toList());
+            if (bes.isEmpty()) {
+                throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+                        + ", compute_group: " + cloudCluster);
+            }
+            int idx = getLastSelectedBackendIndexAndUpdate();
+            return bes.get(Math.floorMod(idx, bes.size()));
+        }
 
-        policy = new 
BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
+                .setEnableRoundRobin(true).needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
-
-        List<Long> backendIds;
-        backendIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        List<Long> backendIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
         if (backendIds.isEmpty()) {
             throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
-        backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
         if (backend == null) {
             throw new 
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
new file mode 100644
index 00000000000..7fee4e14f17
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Coverage for the non-TVF (source-to-target) path: verifies compute_group is
+// rejected in non-cloud mode, persisted on CREATE JOB ... FROM MYSQL in cloud
+// mode, and that the bound job runs end-to-end, exercising 
JdbcSourceOffsetProvider
+// RPCs and StreamingMultiTblTask.sendWriteRequest. Lifecycle checks (empty /
+// invalid / ALTER / PAUSE) are covered by 
test_streaming_insert_job_compute_group.
+suite("test_streaming_mysql_job_compute_group",
+        "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") 
{
+    def jobName = "test_streaming_mysql_cg_job"
+    def currentDb = (sql "select database()")[0][0]
+    def tableName = "mysql_cg_normal1"
+    def mysqlDb = "test_cdc_cg_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableName} force"""
+
+    // Non-cloud mode: compute_group must be rejected regardless of MySQL 
availability
+    if (!isCloudMode()) {
+        test {
+            sql """CREATE JOB ${jobName}
+                    PROPERTIES ("compute_group" = "any_group")
+                    ON STREAMING
+                    FROM MYSQL (
+                        "jdbc_url" = "jdbc:mysql://127.0.0.1:3316",
+                        "driver_url" = "nop",
+                        "driver_class" = "com.mysql.cj.jdbc.Driver",
+                        "user" = "root",
+                        "password" = "nop",
+                        "database" = "${mysqlDb}",
+                        "include_tables" = "${tableName}"
+                    )
+                    TO DATABASE ${currentDb} (
+                      "table.create.properties.replication_num" = "1"
+                    )
+                """
+            exception "only supported in cloud mode"
+        }
+        return
+    }
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+    def clusterRows = sql "show clusters"
+    assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+    def cg = clusterRows.get(0).get(0)
+
+    connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+        sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+        sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableName}"""
+        sql """CREATE TABLE ${mysqlDb}.${tableName} (
+              `name` varchar(200) NOT NULL,
+              `age` int DEFAULT NULL,
+              PRIMARY KEY (`name`)
+            ) ENGINE=InnoDB"""
+        sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('A1', 
1)"""
+        sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('B1', 
2)"""
+    }
+
+    try {
+        sql """CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "${cg}")
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableName}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        def props = sql """select properties from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && 
Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+        } catch (Exception ex) {
+            log.info("job: " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("task: " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        def rows = (sql """SELECT count(*) FROM 
${currentDb}.${tableName}""").get(0).get(0) as long
+        assertTrue(rows >= 2, "expected snapshot rows in target table")
+    } finally {
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${tableName} force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy
new file mode 100644
index 00000000000..66168a66f7a
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_compute_group") { // Validates the "compute_group" job property on CREATE JOB / ALTER JOB for a streaming INSERT job.
+    def tableName = "test_streaming_insert_job_cg_tbl"
+    def jobName = "test_streaming_insert_job_cg_job"
+
+    sql """drop table if exists `${tableName}` force""" // drop any same-named table and job before starting
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    // S3 table-function query reused as the INSERT source by every CREATE JOB statement below.
+    def s3Source = """
+        SELECT * FROM S3(
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        )
+    """
+
+    // ---------- Non-cloud mode: compute_group must be rejected ----------
+    if (!isCloudMode()) {
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "any_group")
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "only supported in cloud mode"
+        }
+        return // the remaining checks only run in cloud mode
+    }
+
+    // ---------- Cloud mode ----------
+    def clusterRows = sql "show clusters"
+    assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+    def cg = clusterRows.get(0).get(0) // first column of the first "show clusters" row, used below as the valid compute_group
+
+    try {
+        // 0) Empty compute_group -> CREATE rejected
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "")
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "compute_group cannot be empty"
+        }
+
+        // 1) Invalid compute_group -> CREATE should fail
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES (
+                    "s3.max_batch_files" = "1",
+                    "compute_group" = "__not_exist_cg__"
+                )
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "not exist"
+        }
+
+        // 2) Valid compute_group -> CREATE succeeds; properties reflect it
+        sql """
+            CREATE JOB ${jobName}
+            PROPERTIES (
+                "s3.max_batch_files" = "1",
+                "compute_group" = "${cg}"
+            )
+            ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+        """
+
+        def props = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+        log.info("job properties: " + props)
+        assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        // Wait for at least one successful task so the cluster binding is exercised end-to-end
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+        } catch (Exception ex) { // log the job and task rows for diagnosis, then rethrow
+            log.info("job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // 3) ALTER without PAUSE -> rejected by upstream guard (Only PAUSED job can be altered)
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+            exception "Only PAUSED job can be altered"
+        }
+
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ // block until the job row reports PAUSED
+            def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+            s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+        })
+
+        // 4) ALTER to non-existent cluster -> rejected; state + bound cg unchanged
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "__not_exist_cg__")"""
+            exception "not exist"
+        }
+        def afterBadAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterBadAlter.get(0).get(0) == "PAUSED"
+        assert afterBadAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+
+        // 5) ALTER with empty compute_group -> rejected; bound cg unchanged
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "")"""
+            exception "compute_group cannot be empty"
+        }
+        def afterEmptyAlter = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterEmptyAlter.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        // 6) ALTER to the same valid cluster -> succeeds, properties updated
+        sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+        def afterAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterAlter.get(0).get(0) == "PAUSED"
+        assert afterAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+    } finally { // always drop the job and the table, even when a check above fails
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
new file mode 100644
index 00000000000..70d99c4483f
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Docker-mode coverage for compute_group routing on StreamingInsertJob:
+// spins up a cloud cluster with two compute groups and asserts that the bound
+// compute_group actually steers BE traffic. Complements the non-docker suite
+// test_streaming_insert_job_compute_group, which covers property lifecycle only.
+suite("test_streaming_insert_job_compute_group_docker", "docker") { // Uses per-BE load_rows metrics to check which compute group a streaming INSERT job loads through.
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1) // a second BE is added inside docker() below
+
+    def tableName = "test_sij_cg_docker_tbl"
+    def jobName = "test_sij_cg_docker_job"
+    def cgA = "compute_cluster"
+    def cgB = "sij_cg_b_docker"
+
+    docker(options) {
+        // default BE (index 0) lives in ${cgA}; the BE added below joins ${cgB}.
+        cluster.addBackend(1, cgB)
+        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ // wait until both compute groups are listed
+            def cgs = sql """SHOW COMPUTE GROUPS"""
+            cgs.size() == 2
+        })
+        log.info("compute groups: ${sql """SHOW COMPUTE GROUPS"""}")
+
+        def backends = cluster.getAllBackends().sort { it.backendId } // NOTE(review): assumes the lower backendId is the original BE in cgA — confirm
+        assertEquals(2, backends.size())
+        def beA = backends.get(0)
+        def beB = backends.get(1)
+        log.info("beA=${beA.host}:${beA.httpPort} (${cgA}) beB=${beB.host}:${beB.httpPort} (${cgB})")
+
+        sql """drop table if exists `${tableName}` force""" // drop any same-named table and job before starting
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3;
+        """
+
+        def s3Source = """
+            SELECT * FROM S3(
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            )
+        """
+
+        try {
+            def aBefore = get_be_metric(beA.host, beA.httpPort, "load_rows") as long // baseline counters taken before the job exists
+            def bBefore = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase0 a=${aBefore} b=${bBefore}")
+
+            // Phase 1: bind to cgA, verify traffic stays on cgA
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES (
+                    "s3.max_batch_files" = "1",
+                    "compute_group" = "${cgA}"
+                )
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 2 // the awaited threshold rises by 2 per phase: 2, 4, 6
+            })
+
+            def aAfter1 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter1 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase1 a=${aAfter1} b=${bAfter1}")
+            assertTrue(aAfter1 > aBefore, "phase1 expects cgA load_rows to increase")
+            assertTrue(bAfter1 == bBefore, "phase1 expects cgB untouched")
+            def rows1 = (sql """SELECT count(*) FROM ${tableName}""").get(0).get(0) as long
+            assertTrue(rows1 > 0, "phase1 expects target table to receive rows")
+
+            // Phase 2: ALTER compute_group to cgB with reset offset, verify traffic switches
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ // block until the job row reports PAUSED
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+            sql """
+                ALTER JOB ${jobName} PROPERTIES (
+                    "compute_group" = "${cgB}",
+                    "offset" = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+                )
+            """
+            sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 4
+            })
+
+            def aAfter2 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter2 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase2 a=${aAfter2} b=${bAfter2}")
+            assertTrue(aAfter2 == aAfter1, "phase2 expects cgA unchanged")
+            assertTrue(bAfter2 > bAfter1, "phase2 expects cgB load_rows to increase")
+
+            // Phase 3: compute_group=cgA plus session.cloud_cluster=cgB; compute_group must win
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ // block until the job row reports PAUSED
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+            sql """
+                ALTER JOB ${jobName} PROPERTIES (
+                    "compute_group" = "${cgA}",
+                    "session.cloud_cluster" = "${cgB}",
+                    "offset" = '{"fileName":"regression/load/data/anoexist56789.csv"}'
+                )
+            """
+            sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 6
+            })
+
+            def aAfter3 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter3 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase3 a=${aAfter3} b=${bAfter3}")
+            assertTrue(aAfter3 > aAfter2, "phase3 expects cgA to increase (compute_group wins)")
+            assertTrue(bAfter3 == bAfter2, "phase3 expects cgB untouched")
+        } finally { // always drop the job and the table, even when a check above fails
+            sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+            sql """drop table if exists `${tableName}` force"""
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to