This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e706d1a076 [fix](job) fix StreamingInsertJob incorrectly counting canceled tasks (#61894)
8e706d1a076 is described below

commit 8e706d1a076084c027cf8470b93dd0afd8b378ce
Author: wudi <[email protected]>
AuthorDate: Thu Apr 2 19:54:42 2026 +0800

    [fix](job) fix StreamingInsertJob incorrectly counting canceled tasks (#61894)
    
    ## What problem does this PR solve?
    Problem Summary:
    - `StreamingInsertJob` does not support manual task cancellation
    - `cancelAllTasks()` is only triggered internally when the job is STOPPED
    or PAUSED, meaning the task was interrupted passively
    - Change `cancelAllTasks()` so that `canceledTaskCount.incrementAndGet()`
    is called only when the task is still active (RUNNING or PENDING) at the
    time it is canceled
---
 .../job/extensions/insert/streaming/StreamingInsertJob.java      | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 29f9a17e2a9..d7a325e22f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -439,8 +439,15 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
             if (runningStreamTask == null) {
                 return;
             }
+            // Check status before cancel: if the task was still active (RUNNING or PENDING),
+            // count it as canceled. If already in a terminal state (e.g. FAILED), it was
+            // already counted by onStreamTaskFail(), so skip to avoid double-counting.
+            boolean wasActive = TaskStatus.RUNNING.equals(runningStreamTask.getStatus())
+                    || TaskStatus.PENDING.equals(runningStreamTask.getStatus());
             runningStreamTask.cancel(needWaitCancelComplete);
-            canceledTaskCount.incrementAndGet();
+            if (wasActive) {
+                canceledTaskCount.incrementAndGet();
+            }
         } finally {
             lock.writeLock().unlock();
         }
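
The counting rule in the hunk above can be tried in isolation with the minimal sketch below. It is not the Doris implementation: `CancelCountSketch`, `Status`, `Task`, `Job`, `failedTaskCount`, and `onTaskFail()` are hypothetical stand-ins, and the sketch assumes that `cancel()` may change the task's status, which is why the status is read before `cancel()` is called.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the pattern in the diff. None of these
// types are Doris classes; they only mirror the shape of the fix.
final class CancelCountSketch {

    // Stand-in for the task status enum used by the real job.
    enum Status { PENDING, RUNNING, FAILED, CANCELED }

    static final class Task {
        private volatile Status status;

        Task(Status initial) {
            this.status = initial;
        }

        Status getStatus() {
            return status;
        }

        void markFailed() {
            status = Status.FAILED;
        }

        // Assumption: only an active task moves to CANCELED; a task already
        // in a terminal state keeps that state.
        void cancel() {
            if (status == Status.PENDING || status == Status.RUNNING) {
                status = Status.CANCELED;
            }
        }
    }

    static final class Job {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final AtomicLong canceledTaskCount = new AtomicLong();
        private final AtomicLong failedTaskCount = new AtomicLong(); // hypothetical counter
        private Task runningTask;

        void setRunningTask(Task task) {
            lock.writeLock().lock();
            try {
                runningTask = task;
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Hypothetical failure callback: the failure is counted here, so a
        // later cancelAllTasks() must not count the same task again.
        void onTaskFail() {
            lock.writeLock().lock();
            try {
                if (runningTask != null) {
                    runningTask.markFailed();
                    failedTaskCount.incrementAndGet();
                }
            } finally {
                lock.writeLock().unlock();
            }
        }

        void cancelAllTasks() {
            lock.writeLock().lock();
            try {
                if (runningTask == null) {
                    return;
                }
                // Snapshot the status first: after cancel() it can no longer
                // tell us whether the task was active.
                Status before = runningTask.getStatus();
                boolean wasActive = before == Status.RUNNING || before == Status.PENDING;
                runningTask.cancel();
                if (wasActive) {
                    canceledTaskCount.incrementAndGet();
                }
            } finally {
                lock.writeLock().unlock();
            }
        }

        long canceledCount() {
            return canceledTaskCount.get();
        }

        long failedCount() {
            return failedTaskCount.get();
        }
    }
}
```

In this sketch, canceling a RUNNING or PENDING task raises the canceled count by one, while canceling a task that already failed leaves the canceled count unchanged, so each task contributes to exactly one counter.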


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
