This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 4760f093948 branch-4.0: [fix](streaming-job) Fix NPE in 
StreamingInsertJob.replayOnCommitted during EditLog replay #62416 (#62629)
4760f093948 is described below

commit 4760f093948da930bb03fdec9058ff1dc054f082
Author: wudi <[email protected]>
AuthorDate: Mon May 11 15:31:29 2026 +0800

    branch-4.0: [fix](streaming-job) Fix NPE in 
StreamingInsertJob.replayOnCommitted during EditLog replay #62416 (#62629)
    
    Cherry-picked from #62416
---
 .../job/extensions/insert/streaming/StreamingInsertJob.java   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 2a8c9310e49..91f193d098d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -992,13 +992,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
-        boolean shouldReleaseLock = false;
         writeLock();
+        boolean passCheck = false;
         try {
             if (runningStreamTask.getIsCanceled().get()) {
-                log.info("streaming insert job {} task {} is canceled, skip 
beforeCommitted",
-                        getJobId(), runningStreamTask.getTaskId());
-                return;
+                throw new TransactionException("streaming insert job " + 
getJobId()
+                        + " task " + runningStreamTask.getTaskId()
+                        + " is canceled, txn " + txnState.getTransactionId() + 
" could not be committed");
             }
 
             ArrayList<Long> taskIds = new ArrayList<>();
@@ -1019,8 +1019,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getFileNumber(),
                         loadStatistic.getTotalFileSizeB(),
                         
runningStreamTask.getRunningOffset().toSerializedJson()));
+            passCheck = true;
         } finally {
-            if (shouldReleaseLock) {
+            if (!passCheck) {
                 writeUnlock();
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to