[https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691]

Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:05 PM:
----------------------------------------------------------------

Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so 
far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the 
client's {{getJobStatus}} call (which ends up calling 
{{Dispatcher.requestJobStatus}}). [~izeren] is right in claiming 
that we slightly modified the code when removing the 
{{RunningJobsRegistry}} in 
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
 Marking the job as done used to happen before the completion of 
{{JobMasterServiceLeadershipRunner#resultFuture}}, through the 
{{{}RunningJobsRegistry{}}}. In the current code, we mark the job as completed 
after the completion of {{JobMasterServiceLeadershipRunner#resultFuture}}, 
through the {{{}JobResultStore{}}}.
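
To make that ordering difference concrete, here is a minimal, hypothetical Java sketch. The class and field names ({{CompletionOrderSketch}}, {{events}}, {{markJobDone}}) are illustrative stand-ins, not Flink's actual APIs; the point is only the relative order of the two steps, which is what lets a client that reacts to the completed result future still observe a non-terminal status:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch (not Flink code) of the ordering change described above:
// the old code marked the job as done BEFORE completing the result future,
// while the current code records the result AFTER completing the future.
class CompletionOrderSketch {
    final List<String> events = new ArrayList<>();
    final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

    // Pre-change order: registry entry first, then result future completion.
    void oldOrder() {
        events.add("markJobDone");        // RunningJobsRegistry marked the job DONE
        resultFuture.complete(null);
        events.add("resultFutureDone");
    }

    // Current order: a client waiting on the future can wake up here...
    void newOrder() {
        resultFuture.complete(null);
        events.add("resultFutureDone");
        events.add("markJobDone");        // ...before the JobResultStore has the result
    }
}
```

With {{newOrder()}}, any status lookup triggered by the completed future races against the final bookkeeping step.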

My issue, though, is that we don't rely on the {{RunningJobsRegistry}} in 
any way for the {{Dispatcher#requestJobStatus}} call. The {{RunningJobsRegistry}} was 
only used for leader recovery in 
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
 and when submitting a job through 
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
 in {{{}Dispatcher#submitJob{}}}. If it had an influence on 
{{{}Dispatcher#requestJobStatus{}}}, I would have expected to find 
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of 
{{{}RunningJobsRegistry#getJobSchedulingStatus{}}}.

I don't want to say that [~izeren]'s conclusion is wrong yet; it just doesn't 
match my findings in the code. It could also be that I'm missing a code path 
here.

[~dmvk], do you have something to add? 



> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that fails because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I have 
> verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult =
>     ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>     tableResult.await();
>     JobClient jobClient = tableResult.getJobClient().get();
>     if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>     } else {
>         throw new IOException("Job is failed, "
>                 + jobClient.getJobExecutionResult().get().toString());
>     }
> } catch (InterruptedException e) {
>     throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>     throw new IOException("Flink job is failed", e);
> } {code}
> Zeppelin code: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, and 
> since 1.15 this registry is no longer updated with the FINISHED status prior to 
> the completion of the job result future; see this change in 
> {{JobMasterServiceLeadershipRunner.java}}: 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce in a lightweight 
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink 
> cluster and triggering a SQL insert operation. If I find a way to reproduce it 
> on a small local cluster with a lightweight client, I will update this ticket. 
> I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed, but this issue, if I understand it correctly, should be common to all 
> versions starting with 1.15; therefore it makes sense to address it starting 
> with 1.16: https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for your assistance in Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a JobResult 
> in the result store before returning the job status (if there is a result, the 
> job shouldn't be reported as running, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
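
The check suggested above could look roughly like the following hypothetical Java sketch. The types here ({{StatusWithResultStoreSketch}}, {{ResultStore}}, {{getTerminalStatus}}) are simplified stand-ins, not Flink's actual Dispatcher or JobResultStore APIs: the idea is simply to consult the result store first and fall back to the live status only when no terminal result has been recorded.

```java
import java.util.Optional;

// Hypothetical sketch of the proposed fix (names are illustrative, not
// Flink's real APIs): if a JobResult has already been recorded, never
// report the job as RUNNING, even if the live view lags behind.
class StatusWithResultStoreSketch {
    enum JobStatus { RUNNING, FINISHED }

    // Stand-in for a result store that may hold a terminal status per job.
    interface ResultStore {
        Optional<JobStatus> getTerminalStatus(String jobId);
    }

    private final ResultStore resultStore;

    StatusWithResultStoreSketch(ResultStore resultStore) {
        this.resultStore = resultStore;
    }

    // A recorded terminal result takes precedence over the live status.
    JobStatus requestJobStatus(String jobId, JobStatus liveStatus) {
        return resultStore.getTerminalStatus(jobId).orElse(liveStatus);
    }
}
```

Under this scheme, the race described in the ticket would resolve to FINISHED as soon as the result is persisted, regardless of when the live status view catches up.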



--
This message was sent by Atlassian Jira
(v8.20.10#820010)