[ https://issues.apache.org/jira/browse/FLINK-30596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738183#comment-17738183 ]

Chesnay Schepler edited comment on FLINK-30596 at 6/28/23 3:46 PM:
-------------------------------------------------------------------

master: b528d9b81c03345c0415490fc41e27968313e5f0
1.17: TBD
1.16: TBD


was (Author: zentol):
master: b528d9b81c03345c0415490fc41e27968313e5f0

> Multiple POST /jars/:jarid/run requests with the same jobId, runs duplicate jobs
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-30596
>                 URL: https://issues.apache.org/jira/browse/FLINK-30596
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0
>            Reporter: Mohsen Rezaei
>            Assignee: Mohsen Rezaei
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Analysis from [~trohrmann]:
> {quote}
> The problem is the following: When submitting a job, then the {{Dispatcher}} 
> will wait for the termination of a previous {{JobMaster}}. This is done to 
> enable the proper cleanup of the job resources. In the initial submission 
> case, there is no previous {{JobMaster}} with the same {{jobId}}. The problem 
> is now that Flink schedules the 
> [{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571]
>  action, which runs the newly submitted job, as [an asynchronous 
> task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318].
>  This is done to ensure that the action is run on the {{Dispatcher}}'s main 
> thread since the termination future can be run on a different thread. Due to 
> this behaviour, there can be other tasks enqueued in the {{Dispatcher}}'s 
> work queue which are executed before it. Such a task could be another job 
> submission which wouldn't see that there is already a job submitted with the 
> same {{jobId}} since [we only do this in 
> {{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602]
>  which is called by {{persistAndRunJob}}. This is the reason why you don't 
> see a duplicate job submission exception for the second job submission. Even 
> worse, this will eventually [lead to an invalid 
> state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615]
>  and fail the whole cluster entrypoint.
> {quote}
> The following change to the {{Dispatcher}} seems to fix the issue, but before 
> submitting a PR, I wanted to post it here for possible follow-up discussion:
> {code:language=java}
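> private CompletableFuture<Void> waitForTerminatingJob(
>         JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
>     ...
>     // If the termination future is already complete (e.g. on an initial submission,
>     // where there is no previous JobMaster), run the action immediately on the calling
>     // thread instead of enqueuing it, so no other queued submission can run in between.
>     // Otherwise, schedule it on the main-thread executor once the JobMaster terminates.
>     return FutureUtils.thenAcceptAsyncIfNotDone(
>             jobManagerTerminationFuture,
>             getMainThreadExecutor(),
>             FunctionUtils.uncheckedConsumer(
>                     (ignored) -> {
>                         jobManagerRunnerTerminationFutures.remove(jobId);
>                         action.accept(jobGraph);
>                     }));
> }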
> {code}
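The interleaving described in the analysis, and why running the action inline closes it, can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not Flink code.

The assumptions are:
* The Dispatcher's main thread is modelled as a FIFO queue drained by a single loop.
* The duplicate check only sees jobs that have already reached `runJob`.
* `DispatcherRaceSketch`, `submitJob`, `runJob` and `thenRunAsyncIfNotDone` are hypothetical names.
* `thenRunAsyncIfNotDone` only mirrors the idea behind `FutureUtils.thenAcceptAsyncIfNotDone` as we understand it: run the callback immediately if the future is already complete, otherwise schedule it on the executor.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Hypothetical, heavily simplified model of the Dispatcher's single-threaded main loop.
 * Not Flink code: every class, field and method name here is illustrative.
 */
public class DispatcherRaceSketch {
    // Stand-in for the main-thread work queue: tasks run one at a time, in FIFO order.
    private final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
    private final Executor mainThreadExecutor = mainThreadQueue::add;
    // Stand-in for the registry populated by runJob; the duplicate check only consults this.
    private final Set<String> runningJobs = new HashSet<>();
    private final boolean fixed;

    public int runJobCalls = 0;
    public int rejectedSubmissions = 0;
    public boolean invalidStateDetected = false;

    DispatcherRaceSketch(boolean fixed) {
        this.fixed = fixed;
    }

    /** Hypothetical helper: run inline if the future is done, otherwise schedule on the executor. */
    static CompletableFuture<Void> thenRunAsyncIfNotDone(
            CompletableFuture<Void> future, Executor executor, Runnable action) {
        return future.isDone() ? future.thenRun(action) : future.thenRunAsync(action, executor);
    }

    /** A job submission request, processed as one task on the main thread. */
    void submitJob(String jobId) {
        mainThreadQueue.add(() -> {
            if (runningJobs.contains(jobId)) {
                rejectedSubmissions++; // duplicate detected
                return;
            }
            // Initial submission: no previous JobMaster, so the termination future is complete.
            CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);
            if (fixed) {
                thenRunAsyncIfNotDone(terminationFuture, mainThreadExecutor, () -> runJob(jobId));
            } else {
                // Unfixed behaviour: always re-enqueue, so other queued tasks can run in between.
                terminationFuture.thenRunAsync(() -> runJob(jobId), mainThreadExecutor);
            }
        });
    }

    private void runJob(String jobId) {
        runJobCalls++;
        if (!runningJobs.add(jobId)) {
            // A second runJob for the same jobId: the "invalid state" from the analysis.
            invalidStateDetected = true;
        }
    }

    void drainMainThread() {
        Runnable task;
        while ((task = mainThreadQueue.poll()) != null) {
            task.run();
        }
    }

    public static DispatcherRaceSketch simulate(boolean fixed) {
        DispatcherRaceSketch dispatcher = new DispatcherRaceSketch(fixed);
        dispatcher.submitJob("job-1");
        dispatcher.submitJob("job-1"); // same jobId, queued behind the first request
        dispatcher.drainMainThread();
        return dispatcher;
    }

    public static void main(String[] args) {
        for (boolean fixed : new boolean[] {false, true}) {
            DispatcherRaceSketch d = simulate(fixed);
            System.out.printf("fixed=%b runJobCalls=%d rejected=%d invalidState=%b%n",
                    fixed, d.runJobCalls, d.rejectedSubmissions, d.invalidStateDetected);
        }
    }
}
```

Running `main` should print `fixed=false runJobCalls=2 rejected=0 invalidState=true` followed by `fixed=true runJobCalls=1 rejected=1 invalidState=false`.

In the unfixed run, both requests pass the duplicate check because neither enqueued `runJob` task has executed yet. As a result, `runJob` is reached twice for the same jobId. With the inline variant, the first request registers the job within its own task, so the second request is rejected.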



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
