This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a789c9941e8 [fix](job-manager) cancelTaskById should not be blocked by 
unrelated streaming jobs (#62940)
a789c9941e8 is described below

commit a789c9941e847e7945e9849930b3dd4cf58badc6
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:11:27 2026 +0800

    [fix](job-manager) cancelTaskById should not be blocked by unrelated 
streaming jobs (#62940)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    `JobManager.cancelTaskById` can reject a `CANCEL TASK` call for any job
    when a streaming job exists in `jobMap`. The streaming-type check is
    placed before the `jobName` match, so it runs for every job the loop
    visits, not only for the job being cancelled:
    
    ```java
    for (T job : jobMap.values()) {
        if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
            throw new JobException("streaming job not support cancel task by id");
        }
        if (job.getJobName().equals(jobName)) { ... }
    }
    ```
    
    If a streaming job in `jobMap` is visited before the target job,
    `CANCEL TASK FOR <any-non-streaming-job>` throws "streaming job not
    support cancel task by id" without ever reaching the actual target.
    
    ### Fix
    
    Move the streaming-type check inside the `jobName` match, so it only
    fires when the matched job is actually a streaming job.
    
    ### Release note
    
    Fix `CANCEL TASK` on non-streaming jobs being incorrectly rejected when
    an unrelated streaming job exists in the job map.
---
 .../org/apache/doris/job/manager/JobManager.java   |  6 ++--
 .../apache/doris/job/manager/JobManagerTest.java   | 39 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 9cc2857a754..23f51890da5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -424,10 +424,10 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
      */
     public void cancelTaskById(String jobName, Long taskId) throws 
JobException {
         for (T job : jobMap.values()) {
-            if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
-                throw new JobException("streaming job not support cancel task 
by id");
-            }
             if (job.getJobName().equals(jobName)) {
+                if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+                    throw new JobException("streaming job not support cancel 
task by id");
+                }
                 job.cancelTaskById(taskId);
                 job.logUpdateOperation();
                 return;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
index 8a0f8f20305..0e2c07fd072 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
@@ -19,6 +19,11 @@ package org.apache.doris.job.manager;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.exception.JobException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -30,6 +35,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
 
 public class JobManagerTest {
     @Test
@@ -60,4 +66,37 @@ public class JobManagerTest {
             }
         }
     }
+
+    private static AbstractJob mockJob(long id, String name, JobExecuteType 
type) {
+        AbstractJob job = Mockito.mock(AbstractJob.class);
+        Mockito.when(job.getJobId()).thenReturn(id);
+        Mockito.when(job.getJobName()).thenReturn(name);
+        JobExecutionConfiguration cfg = new JobExecutionConfiguration();
+        cfg.setExecuteType(type);
+        Mockito.when(job.getJobConfig()).thenReturn(cfg);
+        return job;
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testCancelTaskByIdNotBlockedByOtherStreamingJob() throws 
JobException {
+        JobManager manager = new JobManager();
+        AbstractJob streamingJob = mockJob(1L, "streaming_job", 
JobExecuteType.STREAMING);
+        AbstractJob batchJob = mockJob(2L, "batch_job", 
JobExecuteType.RECURRING);
+        Map<Long, AbstractJob> jobMap = (Map<Long, AbstractJob>) 
Deencapsulation.getField(manager, "jobMap");
+        jobMap.put(1L, streamingJob);
+        jobMap.put(2L, batchJob);
+
+        // Cancelling the batch job must not be blocked by the unrelated 
streaming job in jobMap.
+        manager.cancelTaskById("batch_job", 100L);
+        Mockito.verify(batchJob).cancelTaskById(100L);
+
+        // Cancelling the streaming job itself still rejected.
+        try {
+            manager.cancelTaskById("streaming_job", 100L);
+            Assert.fail("expected JobException for streaming job");
+        } catch (JobException e) {
+            Assert.assertTrue(e.getMessage().contains("streaming job not 
support"));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to