This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e706d1a076 [fix](job) fix StreamingInsertJob incorrectly counting canceled tasks (#61894)
8e706d1a076 is described below
commit 8e706d1a076084c027cf8470b93dd0afd8b378ce
Author: wudi <[email protected]>
AuthorDate: Thu Apr 2 19:54:42 2026 +0800
[fix](job) fix StreamingInsertJob incorrectly counting canceled tasks (#61894)
## What problem does this PR solve?
Problem Summary:
- `StreamingInsertJob` does not support manual task cancellation
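- `cancelAllTasks()` is only triggered internally when the job is STOPPED or
PAUSED, meaning the task was interrupted passively
- Change `canceledTaskCount.incrementAndGet()` in `cancelAllTasks()` so that
`canceledTaskCount` is incremented only when the task was still active
(RUNNING or PENDING) before the cancel; a task already in a terminal state
(e.g. FAILED) was counted by `onStreamTaskFail()` and is skipped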
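
The counting rule above can be illustrated with a minimal, self-contained sketch. It is not the Doris implementation: `SketchStatus`, `SketchTask`, `SketchJob`, and their methods are hypothetical stand-ins, the write lock is omitted, and the assumption that canceling a terminal task leaves its status unchanged is made only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the task status enum; not the Doris TaskStatus.
enum SketchStatus { PENDING, RUNNING, FAILED, CANCELED }

// Hypothetical stand-in for a streaming task.
class SketchTask {
    private SketchStatus status;

    SketchTask(SketchStatus status) {
        this.status = status;
    }

    SketchStatus getStatus() {
        return status;
    }

    // Assumption for this sketch: only active tasks move to CANCELED.
    void cancel() {
        if (status == SketchStatus.PENDING || status == SketchStatus.RUNNING) {
            status = SketchStatus.CANCELED;
        }
    }
}

// Hypothetical stand-in for the job; locking is left out for brevity.
class SketchJob {
    private final AtomicLong canceledTaskCount = new AtomicLong();
    private final AtomicLong failedTaskCount = new AtomicLong();
    private SketchTask runningTask;

    void setRunningTask(SketchTask task) {
        this.runningTask = task;
    }

    // Stands in for the failure callback that counts a failed task.
    void onTaskFail() {
        failedTaskCount.incrementAndGet();
    }

    void cancelAllTasks() {
        if (runningTask == null) {
            return;
        }
        // Read the status BEFORE cancel, so the decision reflects the
        // state the task was in when the cancel was requested.
        SketchStatus before = runningTask.getStatus();
        boolean wasActive = before == SketchStatus.RUNNING
                || before == SketchStatus.PENDING;
        runningTask.cancel();
        if (wasActive) {
            canceledTaskCount.incrementAndGet();
        }
    }

    long canceledCount() {
        return canceledTaskCount.get();
    }

    long failedCount() {
        return failedTaskCount.get();
    }
}

class CancelCountSketch {
    public static void main(String[] args) {
        SketchJob job = new SketchJob();

        // An active task that gets canceled is counted once.
        job.setRunningTask(new SketchTask(SketchStatus.RUNNING));
        job.cancelAllTasks();

        // A task that already failed was counted as failed; canceling it
        // afterwards must not add to the canceled count.
        job.setRunningTask(new SketchTask(SketchStatus.FAILED));
        job.onTaskFail();
        job.cancelAllTasks();

        System.out.println("canceled=" + job.canceledCount()
                + " failed=" + job.failedCount()); // prints "canceled=1 failed=1"
    }
}
```

Capturing the status before calling `cancel()` matters because the cancel itself may change the status, after which an active task and an already-terminal one could no longer be told apart.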
---
.../job/extensions/insert/streaming/StreamingInsertJob.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 29f9a17e2a9..d7a325e22f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -439,8 +439,15 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
if (runningStreamTask == null) {
return;
}
+ // Check status before cancel: if the task was still active (RUNNING or PENDING),
+ // count it as canceled. If already in a terminal state (e.g. FAILED), it was
+ // already counted by onStreamTaskFail(), so skip to avoid double-counting.
+ boolean wasActive = TaskStatus.RUNNING.equals(runningStreamTask.getStatus())
+ || TaskStatus.PENDING.equals(runningStreamTask.getStatus());
runningStreamTask.cancel(needWaitCancelComplete);
- canceledTaskCount.incrementAndGet();
+ if (wasActive) {
+ canceledTaskCount.incrementAndGet();
+ }
} finally {
lock.writeLock().unlock();
}