This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4760f093948 branch-4.0: [fix](streaming-job) Fix NPE in StreamingInsertJob.replayOnCommitted during EditLog replay #62416 (#62629)
4760f093948 is described below
commit 4760f093948da930bb03fdec9058ff1dc054f082
Author: wudi <[email protected]>
AuthorDate: Mon May 11 15:31:29 2026 +0800
branch-4.0: [fix](streaming-job) Fix NPE in StreamingInsertJob.replayOnCommitted during EditLog replay #62416 (#62629)
Cherry-picked from #62416
---
.../job/extensions/insert/streaming/StreamingInsertJob.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 2a8c9310e49..91f193d098d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -992,13 +992,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
- boolean shouldReleaseLock = false;
writeLock();
+ boolean passCheck = false;
try {
if (runningStreamTask.getIsCanceled().get()) {
- log.info("streaming insert job {} task {} is canceled, skip beforeCommitted",
- getJobId(), runningStreamTask.getTaskId());
- return;
+ throw new TransactionException("streaming insert job " + getJobId()
+ + " task " + runningStreamTask.getTaskId()
+ + " is canceled, txn " + txnState.getTransactionId() + " could not be committed");
}
ArrayList<Long> taskIds = new ArrayList<>();
@@ -1019,8 +1019,9 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
runningStreamTask.getRunningOffset().toSerializedJson()));
+ passCheck = true;
} finally {
- if (shouldReleaseLock) {
+ if (!passCheck) {
writeUnlock();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]