github-actions[bot] commented on code in PR #64731:
URL: https://github.com/apache/doris/pull/64731#discussion_r3459218954
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java:
##########
@@ -265,7 +265,13 @@ private void handleSubmitTaskFailure(RoutineLoadTaskInfo routineLoadTaskInfo, St
     }
     // for other errors (network issues, BE restart, etc.), reschedule immediately
-    RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
+    RoutineLoadTaskInfo newTask;
+    routineLoadJob.writeLock();
+    try {
+        newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
Review Comment:
Holding the write lock during `unprotectRenewTask` avoids the unsynchronized
list mutation, but it still does not make the renew conditional on the failed
task still belonging to a running job. There is a race where
`updateState(PAUSED, ...)` can acquire the same job write lock between
`setOtherMsg()` and the lock acquisition added here, and `executePause()` then clears
`routineLoadTaskInfoList`. After that, this code acquires the lock and calls
`unprotectRenewTask` anyway; both the Kafka and Kinesis implementations ignore the
result of `routineLoadTaskInfoList.remove(oldTask)` and unconditionally add the
replacement task. Since `scheduleOneTask()` only checks `isFinal()`, and PAUSED
is not a final state, the replacement can be queued and scheduled for a paused job. Please
re-check the task membership/state under this write lock before renewing, and
skip the renew if the task was already removed by a state transition.
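A minimal, self-contained sketch of the suggested guard follows, assuming a simplified model of the job rather than Doris's actual classes. `Job`, `Task`, `JobState`, `unprotectRenewTaskIfOwned`, and `handleSubmitFailure` are all hypothetical stand-ins, and the real Kafka/Kinesis renew methods have different signatures. The idea it illustrates: under the same write lock that `pause()` uses, re-check the job state and the result of `remove(oldTask)`, and only add the replacement when both checks pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedRenewSketch {

    // Hypothetical, simplified job states (not Doris's full state machine).
    enum JobState { RUNNING, PAUSED, STOPPED, CANCELLED }

    // Hypothetical task; identity equality is enough for list membership here.
    static final class Task {
        final String id;
        Task(String id) { this.id = id; }
        @Override public String toString() { return "Task(" + id + ")"; }
    }

    // Hypothetical stand-in for a routine load job.
    static final class Job {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
        private final List<Task> taskList = new ArrayList<>();
        private JobState state = JobState.RUNNING;
        private int nextId = 0;

        void writeLock() { lock.writeLock().lock(); }
        void writeUnlock() { lock.writeLock().unlock(); }

        Task addTask() {
            writeLock();
            try {
                Task t = new Task("t" + nextId++);
                taskList.add(t);
                return t;
            } finally {
                writeUnlock();
            }
        }

        // Models the pause path from the review: change state, then clear the task list.
        void pause() {
            writeLock();
            try {
                state = JobState.PAUSED;
                taskList.clear();
            } finally {
                writeUnlock();
            }
        }

        // Caller must already hold the write lock. Unlike the behavior described
        // in the review, the result of remove() is checked before adding a replacement.
        Task unprotectRenewTaskIfOwned(Task oldTask) {
            if (state != JobState.RUNNING) {
                return null; // job left RUNNING while we waited for the lock
            }
            if (!taskList.remove(oldTask)) {
                return null; // task was already dropped by a state transition
            }
            Task replacement = new Task("t" + nextId++);
            taskList.add(replacement);
            return replacement;
        }

        int taskCount() {
            lock.readLock().lock();
            try {
                return taskList.size();
            } finally {
                lock.readLock().unlock();
            }
        }
    }

    // Failure handler: take the write lock, re-check under it, then renew or skip.
    static Task handleSubmitFailure(Job job, Task failed) {
        job.writeLock();
        try {
            return job.unprotectRenewTaskIfOwned(failed);
        } finally {
            job.writeUnlock();
        }
    }

    public static void main(String[] args) {
        Job running = new Job();
        Task t0 = running.addTask();
        Task renewed = handleSubmitFailure(running, t0);
        System.out.println("running job renewed: " + (renewed != null) + ", tasks=" + running.taskCount());

        Job paused = new Job();
        Task t1 = paused.addTask();
        paused.pause(); // the pause wins the race before the failure handler locks
        Task skipped = handleSubmitFailure(paused, t1);
        System.out.println("paused job renewed: " + (skipped != null) + ", tasks=" + paused.taskCount());
    }
}
```

Running `main` prints `running job renewed: true, tasks=1` followed by `paused job renewed: false, tasks=0`. Because the pause and the renew contend for the same write lock, whichever runs second sees the other's effect, so checking membership under that lock is enough to avoid queuing a task for a paused job.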
--
This is an automated message from the Apache Git Service.