[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722858#comment-17722858
 ] 

Aleksandr Iushmanov commented on FLINK-32069:
---------------------------------------------

Thanks a lot [~mapohl] for this detailed analysis. I have only one question 
left around this dispatcher method that is responsible for job status:
{code:java}
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
    List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails =
            queryJobMastersForInformation(
                    jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout));

    CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails =
            FutureUtils.combineAll(individualOptionalJobDetails);

    CompletableFuture<Collection<JobDetails>> combinedJobDetails =
            optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);

    final Collection<JobDetails> completedJobDetails =
            executionGraphInfoStore.getAvailableJobDetails();

    return combinedJobDetails.thenApply(
            (Collection<JobDetails> runningJobDetails) -> {
                final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>();

                completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));

                return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
            });
}
{code}

Apologies if this is naive, but could the following sequence happen?
1. {{optionalCombinedJobDetails}} contains the job with status RUNNING.
2. In parallel, the job transitions to FINISHED, and {{completedJobDetails}} 
already holds the correct execution graph with FINISHED state.
3. Both results are merged into {{deduplicatedJobs}}, but {{runningJobDetails}} 
is put into the map last, so it takes priority over {{completedJobDetails}} and 
overwrites the FINISHED entry.
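
The merge order in step 3 can be illustrated in isolation. The following is only a sketch: {{Status}}, {{Details}} and {{mergeLikeDispatcher}} are hypothetical stand-ins I made up for illustration, not Flink classes, and it only shows what the map does once a stale RUNNING entry and a FINISHED entry for the same job meet. It does not prove that this interleaving actually occurs in the dispatcher.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: Status, Details and mergeLikeDispatcher are hypothetical
// stand-ins, not Flink classes.
class JobDetailsMergeSketch {

    enum Status { RUNNING, FINISHED }

    static final class Details {
        final String jobId;
        final Status status;

        Details(String jobId, Status status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    // Same put order as the lambda in requestMultipleJobDetails:
    // completed entries first, running entries second, so the second put wins.
    static Map<String, Status> mergeLikeDispatcher(List<Details> completed, List<Details> running) {
        Map<String, Status> deduplicated = new HashMap<>();
        completed.forEach(job -> deduplicated.put(job.jobId, job.status));
        running.forEach(job -> deduplicated.put(job.jobId, job.status));
        return deduplicated;
    }

    public static void main(String[] args) {
        // Step 1 (hypothesized): the job details were captured while the job was RUNNING.
        List<Details> running = Collections.singletonList(new Details("job-1", Status.RUNNING));
        // Step 2 (hypothesized): the store already lists the same job as FINISHED.
        List<Details> completed = Collections.singletonList(new Details("job-1", Status.FINISHED));
        // Step 3: the stale RUNNING entry overwrites the FINISHED one.
        System.out.println(mergeLikeDispatcher(completed, running).get("job-1"));
    }
}
```

If the hypothesis holds, swapping the two {{forEach}} calls would let the completed entry win, but whether that is the right fix depends on which source is authoritative.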

I will continue to look into this issue on my side: I will try to intercept the 
JM in the {{onJobCompletion}} method and request the job status in parallel.

If this scenario is not possible, or is not what causes the issue, I will pivot 
and look at the InsertResultProvider side of things to check whether another 
promise is broken (namely, that an insert operation has only one row in its 
result).



> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition that 
> leads to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that fails because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
>     checkState(tableResult.getJobClient().isPresent());
>     try {
>       tableResult.await();
>       JobClient jobClient = tableResult.getJobClient().get();
>       if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>       } else {
>         throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>       }
>     } catch (InterruptedException e) {
>       throw new IOException("Flink job is interrupted", e);
>     } catch (ExecutionException e) {
>       throw new IOException("Flink job is failed", e);
>     } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like a race condition that is hard to reproduce on a lightweight 
> setup. I reproduced it by running a Zeppelin notebook with a remote Flink 
> cluster and triggering an SQL insert operation. If I find a smaller setup that 
> reproduces it on a small local cluster with a lightweight client, I will 
> update this ticket. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
> fixed. If I understand correctly, however, this issue is common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for your assistance on Slack. I have created this ticket 
> to back our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to add a check for the presence of a 
> JobResult in the result store before returning jobStatus (if there is a 
> result, the job shouldn't be reported as running, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
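
The check proposed at the end of the quoted description could look roughly like the sketch below. Everything in it is an assumption for illustration: {{effectiveStatus}} is a hypothetical helper, and the plain map stands in for the result store, so no real Flink API is used or implied.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a plain map stands in for the result store, and
// effectiveStatus is a hypothetical helper, not a Flink method.
class ResultStoreCheckSketch {

    enum Status { RUNNING, FINISHED, FAILED }

    // If a terminal result is already stored for the job, report that status
    // instead of whatever the (possibly stale) status query returned.
    static Status effectiveStatus(String jobId, Status reported, Map<String, Status> storedResults) {
        Status stored = storedResults.get(jobId);
        return stored != null ? stored : reported;
    }

    public static void main(String[] args) {
        Map<String, Status> storedResults = new HashMap<>();
        storedResults.put("job-1", Status.FINISHED);

        // job-1 has a stored result, so the stale RUNNING answer is not returned.
        System.out.println(effectiveStatus("job-1", Status.RUNNING, storedResults));
        // job-2 has no stored result, so the reported status is passed through.
        System.out.println(effectiveStatus("job-2", Status.RUNNING, storedResults));
    }
}
```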



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
