Mohsen Rezaei created FLINK-30596:
-------------------------------------

             Summary: Multiple POST /jars/:jarid/run requests with the same jobId, runs duplicate jobs
                 Key: FLINK-30596
                 URL: https://issues.apache.org/jira/browse/FLINK-30596
             Project: Flink
          Issue Type: Bug
          Components: Runtime / REST
    Affects Versions: 1.15.3, 1.16.0, 1.17.0
            Reporter: Mohsen Rezaei
             Fix For: 1.17.0, 1.16.1, 1.15.4

Analysis from [~trohrmann]:

{quote}
The problem is the following: When a job is submitted, the {{Dispatcher}} waits for the termination of a previous {{JobMaster}}. This is done to enable the proper cleanup of the job resources. In the initial submission case, there is no previous {{JobMaster}} with the same {{jobId}}.

The problem is now that Flink schedules the [{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571] action, which runs the newly submitted job, as [an asynchronous task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318]. This is done to ensure that the action is run on the {{Dispatcher}}'s main thread, since the termination future can be completed on a different thread.

Due to this behaviour, other tasks can already be enqueued in the {{Dispatcher}}'s work queue and be executed before the action. Such a task could be another job submission, which wouldn't see that there is already a job submitted with the same {{jobId}}, since [we only register the job in {{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602], which is called by {{persistAndRunJob}}. This is the reason why you don't see a duplicate job submission exception for the second job submission. Even worse, this will eventually [lead to an invalid state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615] and fail the whole cluster entrypoint.
{quote}

The following change to the {{Dispatcher}} seems to fix the issue, but before submitting a PR, I wanted to post it here for possible follow-up discussion:

{code:language=java}
private CompletableFuture<Void> waitForTerminatingJob(
        JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
    ...
    // Run the action directly if the termination future is already done,
    // and only go through the main thread executor otherwise.
    return FutureUtils.thenAcceptAsyncIfNotDone(
            jobManagerTerminationFuture,
            getMainThreadExecutor(),
            FunctionUtils.uncheckedConsumer(
                    (ignored) -> {
                        jobManagerRunnerTerminationFutures.remove(jobId);
                        action.accept(jobGraph);
                    }));
}
{code}
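
To make the ordering problem from the analysis easier to discuss, here is a minimal, deterministic sketch of it in plain Java. It is a toy model and not Flink code: {{ToyDispatcher}}, {{ManualExecutor}}, {{simulate}} and the log messages are all hypothetical names, the "main thread" is a FIFO queue drained by hand, and the {{runInlineIfDone}} flag only imitates the idea behind the proposed change (run the action directly when the termination future is already done). It assumes that the duplicate check happens at submission time while the job is only registered in {{runJob}}, as described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Toy model of the race; none of these names are Flink classes. */
final class DuplicateSubmissionSketch {

    /** Stand-in for the Dispatcher's main-thread work queue, drained by hand. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            queue.add(task);
        }

        /** Runs queued tasks in FIFO order, including tasks enqueued while draining. */
        void drain() {
            Runnable task;
            while ((task = queue.poll()) != null) {
                task.run();
            }
        }
    }

    static final class ToyDispatcher {
        final ManualExecutor mainThread = new ManualExecutor();
        final Set<String> runningJobs = new HashSet<>();
        final List<String> log = new ArrayList<>();
        final boolean runInlineIfDone;

        ToyDispatcher(boolean runInlineIfDone) {
            this.runInlineIfDone = runInlineIfDone;
        }

        /** Assumed to be called on the main thread, i.e. from a task run by drain(). */
        void submitJob(String jobId) {
            if (runningJobs.contains(jobId)) {
                log.add("rejected duplicate " + jobId);
                return;
            }
            // Initial submission: there is no previous job, so the future is already done.
            CompletableFuture<Void> previousTermination = CompletableFuture.completedFuture(null);
            Runnable action = () -> runJob(jobId);
            if (runInlineIfDone && previousTermination.isDone()) {
                action.run(); // registers the job before any other queued task runs
            } else {
                previousTermination.thenRunAsync(action, mainThread); // re-enqueued behind other tasks
            }
        }

        /** The job only becomes visible to the duplicate check here. */
        void runJob(String jobId) {
            if (!runningJobs.add(jobId)) {
                log.add("invalid state: " + jobId + " already running");
                return;
            }
            log.add("started " + jobId);
        }
    }

    /** Enqueues two submissions with the same id, then drains the queue. */
    static List<String> simulate(boolean runInlineIfDone) {
        ToyDispatcher dispatcher = new ToyDispatcher(runInlineIfDone);
        dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
        dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
        dispatcher.mainThread.drain();
        return dispatcher.log;
    }

    public static void main(String[] args) {
        System.out.println("always async:   " + simulate(false));
        System.out.println("inline if done: " + simulate(true));
    }
}
```

In this model, the always-async variant lets the second submission pass the duplicate check because the first {{runJob}} is still sitting in the queue, so the second {{runJob}} hits the invalid state. With the inline variant the first job is registered immediately and the second submission is rejected up front.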