[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722788#comment-17722788
 ] 

Aleksandr Iushmanov commented on FLINK-32069:
---------------------------------------------

I can provide more details on the code path used in my tests:

On zeppelin side of things it goes to:
ClusterClientJobClientAdapter -> RestClusterClient -> getJobStatus -> 
getJobDetails
getJobDetails is a rest API call to /jobs/:jobid
 
On flink cluster side:
this request is handled by JobOwerviewHandler and processed with 
Dispatcher::requestMultipleJobDetails
this method requests both info on running and completed jobs 
for completed jobs it relies on ExecutionGraphInfoStore, in my case it is 
FileExecutionGraphInfoStore::getAvailableJobDetails()
Which is local file jobDetailCache
if I get it right, this cache is updated in Dispatcher::jobReachedTerminalState
 
For job result it goes to /jobs/:jobid/execution-result 
Dispatcher::requestJobResult checks if job is in jobManagerRunnerRegistry and 
if it is there it returns result future based on job instance in registry, 
otherwise it picks up result from executionGraphInfoStore (which should be the 
same as for status)
 
My line of thinking was that job result future is returned based on registry 
and job status based on result store and it it is not in sync same way as it 
used to be. 
 
I am not 100% insisting though that bug is there. It can also be in 
InsertResultProvider. I was originally looking at TableResult impl and I have 
noticed that "firstRow" logic is not new. Also for insert operations you get 
"Program execution finished" when you request result. I would expect job to 
have status finished in this case (as per documentation). But it is true that 
this can be on ResultProvider side as well (if it doesn't ensure that job 
status is finished)

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns running even after job has finished. I have 
> verified that same failover can happen if I use 
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
>     checkState(tableResult.getJobClient().isPresent());
>     try {
>       tableResult.await();
>       JobClient jobClient = tableResult.getJobClient().get();
>       if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>       } else {
>         throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>       }
>     } catch (InterruptedException e) {
>       throw new IOException("Flink job is interrupted", e);
>     } catch (ExecutionException e) {
>       throw new IOException("Flink job is failed", e);
>     } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like as race condition that is hard to reproduce on lightweight 
> setup. I was reproducing this running zeppelin notebook with remote flink 
> cluster and triggering SQL insert operation. If I find a smaller setup to 
> reproduce on small local cluster with lightweight client, I will update this 
> ticket when I have more input. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket because Flink 1.15 is not going to be 
> fixed but this issue if I understand it correctly should be common for all 
> versions starting 1.15, therefore it makes sense to address this starting 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], Thank you for assistance in slack, I have created this ticket to 
> back our  conversation, could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to have additional check for presence of 
> JobResult in Result store before returning jobStatus (if there is a result, 
> job shouldn't be reported as running based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to