[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722820#comment-17722820
 ] 

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 3:24 PM:
----------------------------------------------------------------

Thanks for sharing your train of thought here, [~izeren]. Don't be overwhelmed 
by the following lines. I'm writing it down for documentation purposes. It 
would be good if you could verify my findings.
{quote}My line of thinking was that the job result future is returned based on 
the registry and the job status based on the result store, and it is not in 
sync the same way as it used to be.
{quote}
The {{JobManagerRunnerRegistry}} holds the {{JobManagerRunner}} instances. The 
cleanup of the job is triggered (in 
[Dispatcher:681|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L681])
 as soon as the result of the {{JobManagerRunner}} is completed (which happens 
in 
[JobMasterServiceLeadershipRunner#onJobCompletion:384|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L384]).
 At that moment, the {{JobManagerRunnerRegistry}} is the source of truth when 
it comes to the {{{}JobStatus{}}}. The next step is writing the 
{{ExecutionGraphInfo}} of the terminated job into the 
{{ExecutionGraphInfoStore}} in 
[Dispatcher#jobReachedTerminalState:1334|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1334].
 After that, we create a dirty entry in the {{JobResultStore}} in 
[Dispatcher#jobReachedTerminalState:1347|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1347]
 which runs the operation asynchronously in the {{ioExecutor}} (see 
[Dispatcher#registerGloballyTerminatedJobInJobResultStore:1365|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1365]).
 The future is forwarded to 
[Dispatcher#runJob:678|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L678]
 where it will trigger the {{Dispatcher#globalResourceCleaner}} through 
[Dispatcher#removeJob:1245|https://github.com/apache/flink/blob/ee912b55f9405a41c391f942636b83c6f2c968f3/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1245].
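
The ordering above can be sketched with plain {{CompletableFuture}} chaining. 
This is a simplified, hypothetical stand-in for illustration only: the event 
names mirror the Flink methods linked above, but the types and signatures are 
not the actual Flink API.
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the Dispatcher's terminal-state handling.
public class TerminalStateFlow {
    static final List<String> events = new CopyOnWriteArrayList<>();

    static CompletableFuture<Void> jobReachedTerminalState(ExecutorService ioExecutor) {
        // 1) store the ExecutionGraphInfo synchronously
        events.add("executionGraphInfoStore.put");
        // 2) write the dirty JobResultStore entry asynchronously on the ioExecutor
        return CompletableFuture.runAsync(
                () -> events.add("jobResultStore.createDirtyResult"), ioExecutor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        // 3) only after the dirty entry is written does the global cleanup run,
        //    mirroring how the future is forwarded through runJob -> removeJob
        jobReachedTerminalState(ioExecutor)
                .thenRun(() -> events.add("globalResourceCleaner.cleanupAsync"))
                .get();
        ioExecutor.shutdown();
        // prints: executionGraphInfoStore.put -> jobResultStore.createDirtyResult
        //         -> globalResourceCleaner.cleanupAsync
        System.out.println(String.join(" -> ", events));
    }
}
{code}
The point of the sketch is the invariant the real code establishes: the dirty 
{{JobResultStore}} entry is written (asynchronously, on the {{ioExecutor}}) 
strictly before the global cleanup starts.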

We have the job's status persisted in the {{{}JobManagerRunnerRegistry{}}}, the 
{{{}ExecutionGraphInfoStore{}}}, and the {{JobResultStore}}, all based on the 
same {{{}ExecutionGraph{}}}. Any job status call should still rely on the 
{{{}JobManagerRunnerRegistry{}}}.

The {{Dispatcher#globalResourceCleaner}} is configured in 
[DispatcherResourceCleanerFactory#createGlobalResourceCleaner:106ff|https://github.com/apache/flink/blob/c5352fc55972420ed5bf1afdfd97834540b1407a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java#L106].
 It removes the job from the {{JobManagerRunnerRegistry}} before cleaning up 
all the other artifacts. This cleanup procedure is triggered after the 
{{JobResultStore}} has the job's dirty entry written to disk/memory (and 
consequently as well, after having the {{ExecutionGraphInfo}} being persisted 
in the {{{}ExecutionGraphInfoStore{}}}).

After the cleanup, the job's {{JobManagerRunner}} is no longer present in the 
{{{}JobManagerRunnerRegistry{}}}. From that moment on, the {{Dispatcher}} 
relies on the {{ExecutionGraphInfoStore}}, which holds the same {{JobStatus}} 
as the {{{}JobManagerRunnerRegistry{}}} did.
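
A minimal sketch of this lookup order, with plain maps standing in for the two 
components (all types and names here are illustrative placeholders, not 
Flink's actual interfaces):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified model: while the job is registered, the JobManagerRunnerRegistry
// answers status requests; after cleanup removed the runner, the Dispatcher
// falls back to the persisted ExecutionGraphInfoStore.
public class StatusLookup {
    enum JobStatus { RUNNING, FINISHED }

    static JobStatus requestJobStatus(
            Map<String, JobStatus> runnerRegistry,          // JobManagerRunnerRegistry stand-in
            Map<String, JobStatus> executionGraphInfoStore, // ExecutionGraphInfoStore stand-in
            String jobId) {
        // Source of truth while the runner is still registered.
        JobStatus fromRegistry = runnerRegistry.get(jobId);
        if (fromRegistry != null) {
            return fromRegistry;
        }
        // After cleanup: fall back to the persisted ExecutionGraphInfo.
        return executionGraphInfoStore.get(jobId);
    }

    public static void main(String[] args) {
        Map<String, JobStatus> registry = new HashMap<>();
        Map<String, JobStatus> store = new HashMap<>();

        registry.put("job-1", JobStatus.FINISHED); // terminal, not yet cleaned up
        store.put("job-1", JobStatus.FINISHED);    // persisted with the same status

        System.out.println(requestJobStatus(registry, store, "job-1")); // FINISHED

        registry.remove("job-1");                  // cleanup removed the runner
        System.out.println(requestJobStatus(registry, store, "job-1")); // still FINISHED
    }
}
{code}
As long as both sides hold the same terminal {{JobStatus}} by the time the 
runner is removed, the fallback is transparent to callers; the race the 
reporter describes can only surface if a status request observes a source that 
has not yet seen the terminal status.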



> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that fails because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished" but the job status is not consistently FINISHED)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
>     checkState(tableResult.getJobClient().isPresent());
>     try {
>       tableResult.await();
>       JobClient jobClient = tableResult.getJobClient().get();
>       if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>       } else {
>         throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>       }
>     } catch (InterruptedException e) {
>       throw new IOException("Flink job is interrupted", e);
>     } catch (ExecutionException e) {
>       throw new IOException("Flink job is failed", e);
>     } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, 
> and since 1.15 this registry is no longer updated with the FINISHED status 
> prior to the completion of the job result future; see this change: 
> {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce in a lightweight 
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink 
> cluster and triggering a SQL insert operation. If I find a way to reproduce 
> it on a small local cluster with a lightweight client, I will update this 
> ticket. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket 
> (https://issues.apache.org/jira/browse/ZEPPELIN-5909), because Flink 1.15 is 
> not going to be fixed; but this issue, if I understand it correctly, should 
> be common to all versions starting with 1.15, so it makes sense to address 
> it starting with 1.16.
>  
> [~mapohl], thank you for your assistance in Slack; I have created this 
> ticket to back our conversation. Could you please add your thoughts on this 
> failure mode?
>  
> One possible solution would be an additional check for the presence of a 
> JobResult in the JobResultStore before returning the job status (if there is 
> a result, the job shouldn't be reported as RUNNING, based on this 
> documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
