Mohsen Rezaei created FLINK-30596:
-------------------------------------

             Summary: Multiple POST /jars/:jarid/run requests with the same 
jobId run duplicate jobs
                 Key: FLINK-30596
                 URL: https://issues.apache.org/jira/browse/FLINK-30596
             Project: Flink
          Issue Type: Bug
          Components: Runtime / REST
    Affects Versions: 1.15.3, 1.16.0, 1.17.0
            Reporter: Mohsen Rezaei
             Fix For: 1.17.0, 1.16.1, 1.15.4


Analysis from [~trohrmann]:

{quote}
The problem is the following: when a job is submitted, the {{Dispatcher}} 
waits for the termination of any previous {{JobMaster}} with the same 
{{jobId}}, so that the job's resources can be cleaned up properly. On an 
initial submission there is no previous {{JobMaster}} with that {{jobId}}. 
The problem is that Flink nevertheless schedules the 
[{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571]
 action, which runs the newly submitted job, as [an asynchronous 
task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318].
 This ensures that the action runs on the {{Dispatcher}}'s main thread, since 
the termination future can be completed on a different thread. As a result, 
other tasks already enqueued in the {{Dispatcher}}'s work queue can be 
executed before it. Such a task could be another job submission, which would 
not see that a job with the same {{jobId}} has already been submitted, since 
[we only do this in 
{{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602]
 which is called by {{persistAndRunJob}}. This is the reason why you don't see 
a duplicate job submission exception for the second job submission. Even worse, 
this will eventually [lead to an invalid 
state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615]
 and fail the whole cluster entrypoint.
{quote}
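
To make the interleaving described above concrete, here is a minimal, self-contained sketch. It is not Flink code: {{ManualExecutor}}, {{runningJobs}} and {{submitTwice}} are hypothetical stand-ins for the {{Dispatcher}}'s main-thread work queue, its job bookkeeping and two back-to-back submissions. It assumes the duplicate check happens at submission time while the job only becomes visible once the queued action runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class DispatcherRaceSketch {

    /** Hypothetical stand-in for the main-thread work queue: tasks run only when drained. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            queue.add(task);
        }

        void drain() {
            Runnable task;
            while ((task = queue.poll()) != null) {
                task.run();
            }
        }
    }

    /** Submits the same job id twice and returns what happened, in order. */
    static List<String> submitTwice() {
        ManualExecutor mainThread = new ManualExecutor();
        Set<String> runningJobs = new HashSet<>();
        List<String> log = new ArrayList<>();
        String jobId = "job-1";

        for (int i = 0; i < 2; i++) {
            // Duplicate check at submission time: nothing is registered yet.
            if (runningJobs.contains(jobId)) {
                log.add("rejected");
                continue;
            }
            // No previous job with this id, so the termination future is already done.
            CompletableFuture<Void> previousTerminated = CompletableFuture.completedFuture(null);
            // The "run" action is still enqueued rather than executed immediately.
            previousTerminated.thenRunAsync(
                    () -> log.add(runningJobs.add(jobId) ? "started" : "invalid state"),
                    mainThread);
        }

        // Both queued actions run only now, after both checks have passed.
        mainThread.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(submitTwice()); // prints [started, invalid state]
    }
}
```

Neither submission is rejected in this sketch: the second one passes the duplicate check because the first has not registered itself yet, and the conflict only surfaces when the second queued action runs.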

The following change to the {{Dispatcher}} seems to fix the issue, but before 
submitting a PR I wanted to post it here for possible follow-up discussion:

{code:language=java}
private CompletableFuture<Void> waitForTerminatingJob(
        JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
    ...
    // If the termination future is already done, run the action right away
    // instead of enqueueing it, so that no other submission can run in between.
    return FutureUtils.thenAcceptAsyncIfNotDone(
            jobManagerTerminationFuture,
            getMainThreadExecutor(),
            FunctionUtils.uncheckedConsumer(
                    (ignored) -> {
                        jobManagerRunnerTerminationFutures.remove(jobId);
                        action.accept(jobGraph);
                    }));
}
{code}
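
For discussion, here is a small sketch of the behaviour this change relies on. {{acceptInlineIfDone}} is a hypothetical stand-in written for this example, assuming that {{FutureUtils.thenAcceptAsyncIfNotDone}} runs the consumer in the calling thread when the future is already complete and hands it to the executor otherwise. The queue-backed executor is likewise only an illustration of the main-thread work queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class ThenAcceptIfNotDoneSketch {

    /** Hypothetical helper: inline if the future is done, otherwise async on the executor. */
    static <T> CompletableFuture<Void> acceptInlineIfDone(
            CompletableFuture<T> future, Executor executor, Consumer<? super T> consumer) {
        return future.isDone()
                ? future.thenAccept(consumer)
                : future.thenAcceptAsync(consumer, executor);
    }

    /** Records the order of events for an already completed and a pending future. */
    static List<String> demo() {
        Queue<Runnable> queue = new ArrayDeque<>();
        Executor mainThread = queue::add; // tasks run only when the queue is drained
        List<String> events = new ArrayList<>();

        // Already completed future: the consumer runs before the call returns.
        acceptInlineIfDone(
                CompletableFuture.completedFuture("done"), mainThread, v -> events.add("inline"));
        events.add("after-first-call");

        // Pending future: the consumer is handed to the executor on completion.
        CompletableFuture<String> pending = new CompletableFuture<>();
        acceptInlineIfDone(pending, mainThread, v -> events.add("queued"));
        pending.complete("done");
        events.add("after-complete");

        // Drain the queue, as the main thread would do later.
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [inline, after-first-call, after-complete, queued]
    }
}
```

In the initial-submission case the termination future is already complete, so under this assumption the action runs before any other queued submission gets a chance to pass the duplicate check.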



