[ https://issues.apache.org/jira/browse/FLINK-30596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-30596:
-------------------------------
    Fix Version/s: 1.17.2
                       (was: 1.17.1)

> Multiple POST /jars/:jarid/run requests with the same jobId, runs duplicate jobs
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-30596
>                 URL: https://issues.apache.org/jira/browse/FLINK-30596
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0
>            Reporter: Mohsen Rezaei
>            Assignee: Mohsen Rezaei
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Analysis from [~trohrmann]:
> {quote}
> The problem is the following: When submitting a job, then the {{Dispatcher}}
> will wait for the termination of a previous {{JobMaster}}. This is done to
> enable the proper cleanup of the job resources. In the initial submission
> case, there is no previous {{JobMaster}} with the same {{jobId}}. The problem
> is now that Flink schedules the
> [{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571]
> action, which runs the newly submitted job, as
> [an asynchronous task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318].
> This is done to ensure that the action is run on the {{Dispatcher}}'s main
> thread since the termination future can be run on a different thread. Due to
> this behaviour, there can be other tasks enqueued in the {{Dispatcher}}'s
> work queue which are executed before.
> Such a task could be another job submission which wouldn't see that there is
> already a job submitted with the same {{jobId}} since
> [we only do this in {{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602]
> which is called by {{persistAndRunJob}}. This is the reason why you don't
> see a duplicate job submission exception for the second job submission. Even
> worse, this will eventually
> [lead to an invalid state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615]
> and fail the whole cluster entrypoint.
> {quote}
>
> The following change to the {{Dispatcher}} seems to fix the issue, but before
> submitting a PR, I wanted to post it here for possible follow-up discussion:
> {code:language=java}
>     private CompletableFuture<Void> waitForTerminatingJob(
>             JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
>         ...
>         // Run the action directly on the calling thread if the termination future
>         // is already complete (e.g. no previous JobMaster for this jobId);
>         // otherwise schedule it on the Dispatcher's main thread executor.
>         return FutureUtils.thenAcceptAsyncIfNotDone(
>                 jobManagerTerminationFuture,
>                 getMainThreadExecutor(),
>                 FunctionUtils.uncheckedConsumer(
>                         (ignored) -> {
>                             jobManagerRunnerTerminationFutures.remove(jobId);
>                             action.accept(jobGraph);
>                         }));
>     }
> {code}
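The race in the analysis can be reproduced with a deliberately tiny model. This is a minimal sketch, not Flink code: `MiniDispatcher`, its `runningJobs` set, and the bodies of `submitJob`/`runJob` are hypothetical simplifications, and the single-threaded executor stands in for the Dispatcher's main thread. The local helper `thenAcceptAsyncIfNotDone` is written to follow the contract its name suggests (run inline if the future is already done, otherwise schedule on the executor); it is assumed, not verified, to match Flink's `FutureUtils` helper exactly. Two submissions with the same job ID are queued back to back on the main thread, mirroring two concurrent `POST /jars/:jarid/run` requests.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class DuplicateSubmissionSketch {

    /** Hypothetical, heavily simplified Dispatcher; its state is only touched on mainThread. */
    static final class MiniDispatcher {
        final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        final Set<String> runningJobs = new HashSet<>();
        final boolean runInlineIfDone; // false: always async (reported bug); true: proposed fix
        int startedJobs = 0;
        int rejectedSubmissions = 0;

        MiniDispatcher(boolean runInlineIfDone) {
            this.runInlineIfDone = runInlineIfDone;
        }

        // Local stand-in: run the consumer directly if the future is already complete,
        // otherwise schedule it on the given executor.
        static <T> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
                CompletableFuture<T> future, Executor executor, Consumer<? super T> consumer) {
            return future.isDone()
                    ? future.thenAccept(consumer)
                    : future.thenAcceptAsync(consumer, executor);
        }

        // Runs on mainThread, like the submitJob RPC handler.
        void submitJob(String jobId) {
            if (runningJobs.contains(jobId)) {
                rejectedSubmissions++; // duplicate detected
                return;
            }
            // No previous JobMaster for this jobId, so the termination future is already done.
            CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);
            Consumer<Void> runAction = ignored -> runJob(jobId);
            if (runInlineIfDone) {
                thenAcceptAsyncIfNotDone(terminationFuture, mainThread, runAction);
            } else {
                // Always enqueued, so it runs behind any tasks already waiting on mainThread.
                terminationFuture.thenAcceptAsync(runAction, mainThread);
            }
        }

        // Only here does the job become visible to the duplicate check in submitJob.
        void runJob(String jobId) {
            runningJobs.add(jobId);
            startedJobs++;
        }
    }

    /** Queues two submissions of the same jobId back to back, then drains the main thread. */
    static MiniDispatcher simulate(boolean runInlineIfDone) {
        MiniDispatcher dispatcher = new MiniDispatcher(runInlineIfDone);
        try {
            // Both requests arrive while the main thread is busy with this task,
            // so they are queued one after the other before either of them runs.
            dispatcher.mainThread.submit(() -> {
                dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
                dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
            }).get();
            // Barrier 1 completes after both submissions; barrier 2 completes after
            // any runJob tasks those submissions enqueued.
            dispatcher.mainThread.submit(() -> { }).get();
            dispatcher.mainThread.submit(() -> { }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.mainThread.shutdown();
        }
        return dispatcher;
    }

    public static void main(String[] args) {
        MiniDispatcher asyncOnly = simulate(false);
        MiniDispatcher inlineIfDone = simulate(true);
        System.out.println("always async:   started=" + asyncOnly.startedJobs
                + " rejected=" + asyncOnly.rejectedSubmissions);
        System.out.println("inline if done: started=" + inlineIfDone.startedJobs
                + " rejected=" + inlineIfDone.rejectedSubmissions);
    }
}
```

In this model the always-async variant starts `job-1` twice and rejects nothing, because the second `submitJob` runs before the first `runJob` has registered the job. The inline variant starts it once and rejects the duplicate. Running inline is only safe because `submitJob` itself executes on the main thread; when the termination future is still pending (a previous JobMaster is shutting down), the action is still scheduled on the main thread executor.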