[ 
https://issues.apache.org/jira/browse/FLINK-38883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-38883:
----------------------------------
    Description: 
We noticed an issue where the REST API reported a job as globally terminated 
({{FAILED}}) while the JRS (JobResultStore) entry hadn't been created (due to 
some object store problems). An external monitor marked the job as terminal 
based on the REST API response, but the job recovered because no JRS entry 
existed and the job data wasn't cleaned up, i.e. during recovery of the 
JobManager the job was picked up again.
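
To illustrate the monitor's side of the race, here is a minimal sketch of such 
a poll against the standard {{/jobs/<jobid>}} REST endpoint (the base URL and 
job id are placeholders, and the JSON handling is reduced to a substring check 
to keep the sketch dependency-free):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of the external-monitor pattern that triggers the race: poll the job
// via REST and treat a globally terminal state as final. At the moment the
// monitor sees FAILED, the JRS entry may not have been written yet, so the job
// can still be resurrected on JobManager failover.
public final class JobStatusProbe {
    public static void main(String[] args) throws Exception {
        String restBase = "http://localhost:8081";         // placeholder
        String jobId = "00000000000000000000000000000000"; // placeholder
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(restBase + "/jobs/" + jobId)).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        // The job details response contains e.g. "state":"FAILED".
        if (body.contains("\"state\":\"FAILED\"")) {
            System.out.println("Monitor would (prematurely) treat the job as terminal.");
        }
    }
}
{code}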

Conceptually, the problem stems from the fact that the JRS entry is only 
written after the job has reached the globally terminal state (which is 
reported via the REST API). Instead, it should be written before reaching that 
state (i.e. as part of the CANCELLING, FAILING, and FINISHING job states, 
where the latter doesn't even exist in the job state transition graph).
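
As a sketch of the intended ordering (with hypothetical stand-ins, not Flink's 
real internal API), the dirty JRS write has to complete before the terminal 
state becomes observable:

{code:java}
import java.util.concurrent.CompletableFuture;

// Minimal sketch of the intended ordering; persistDirtyJrsEntry and
// transitionToGloballyTerminal are hypothetical stand-ins.
final class TerminalTransitionSketch {
    static CompletableFuture<Void> persistDirtyJrsEntry(String jobId) {
        return CompletableFuture.completedFuture(null); // stands in for the object-store write
    }

    static void transitionToGloballyTerminal(String jobId) {
        System.out.println(jobId + " -> FAILED (now visible via the REST API)");
    }

    public static void main(String[] args) {
        String jobId = "job-1"; // placeholder
        // Only expose the terminal state once the JRS write has succeeded.
        persistDirtyJrsEntry(jobId)
                .thenRun(() -> transitionToGloballyTerminal(jobId))
                .join();
    }
}
{code}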

There are different options to handle that problem:
* Extend the JobDetails endpoint to include a flag that states whether the JRS 
entry was written for the job. The logic would live in the Dispatcher, and 
this might be the least invasive option. This REST API endpoint would then 
need to become the only means of determining whether a job actually terminated 
globally. This is worth mentioning because of the stopWithSavepoint feature, 
whose result might be misinterpreted as the job being finished even though it 
doesn't include any information about the JRS entry.
  * The JRS lookup would need to be cached somehow because the REST endpoints 
would be accessed far more frequently than what the JRS was initially meant to 
handle (a lookup during job termination); see the caching sketch after this 
list.
* Derive the JobStatus from whether the JRS entry was written, in all 
JobStatus-reporting REST endpoints on the Dispatcher side.
  * Would require some additional logic to store the job's previous status.
  * Downside: The API is not necessarily aligned with the Flink logs
* Handle the dirty JRS entry write in the scheduler before the job's 
termination is finalized. For the AdaptiveScheduler, we could handle such a 
callback in `AdaptiveScheduler#onFinished`. For the `DefaultScheduler`, the 
implementation is a bit trickier because the DefaultScheduler is closely 
coupled to the ExecutionGraph: the JobStatus transitions to `FINISHED` in 
`DefaultExecutionGraph#jobFinished`.
  * An easier approach is to use a wrapping class that owns its own 
terminationFuture and forwards the job's terminationFuture in 
`DefaultScheduler`. That way, we could inject the dirty JRS entry write before 
actually completing the wrapper class's termination future; see the wrapper 
sketch after this list.
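
The caching sketch referenced above: a minimal positive-result cache in front 
of the JRS lookup (the delegate `Predicate` is a hypothetical stand-in for the 
actual store query). It assumes that an entry, once written, doesn't disappear 
while the job is still being queried, so positive answers can be cached 
indefinitely while negative answers must always fall through to the store:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch of a positive-result cache in front of an expensive JRS lookup
// (object store access). Only positive answers are cached, since caching a
// negative answer could again misreport a terminated job.
final class CachedJrsLookup {
    private final Predicate<String> delegate; // hypothetical stand-in for the real JRS query
    private final Map<String, Boolean> knownWritten = new ConcurrentHashMap<>();

    CachedJrsLookup(Predicate<String> delegate) {
        this.delegate = delegate;
    }

    boolean hasJobResultEntry(String jobId) {
        // Serve cached positives; re-query the store for anything else.
        if (Boolean.TRUE.equals(knownWritten.get(jobId))) {
            return true;
        }
        boolean written = delegate.test(jobId);
        if (written) {
            knownWritten.put(jobId, Boolean.TRUE);
        }
        return written;
    }
}
{code}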
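
The wrapper sketch referenced above: own a separate termination future and 
complete it only after the dirty JRS entry has been persisted (the `JobStatus` 
enum and the write callback are simplified stand-ins, not the real Flink 
types):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch of the wrapper idea: the externally visible terminationFuture only
// completes once the dirty JRS entry write has finished, so observers can no
// longer see a terminal job without a JRS entry.
final class TerminationWrapper {
    enum JobStatus { FINISHED, FAILED, CANCELED }

    private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>();

    TerminationWrapper(
            CompletableFuture<JobStatus> jobTerminationFuture,
            Function<JobStatus, CompletableFuture<Void>> writeDirtyJrsEntry) {
        // Inject the JRS write between the job's internal termination and the
        // externally visible termination signal.
        jobTerminationFuture
                .thenCompose(status ->
                        writeDirtyJrsEntry.apply(status).thenApply(ignored -> status))
                .whenComplete((status, failure) -> {
                    if (failure != null) {
                        terminationFuture.completeExceptionally(failure);
                    } else {
                        terminationFuture.complete(status);
                    }
                });
    }

    CompletableFuture<JobStatus> getTerminationFuture() {
        return terminationFuture;
    }
}
{code}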

  was:
We noticed an issue where the REST API reported a job being globally terminated 
({{FAILED}}) but the JRS entry wasn't created (due to some object store 
problems). The external monitor marked that as terminal due to the REST API 
call but the job recovered because no JRS entry existed and the job data wasn't 
cleaned up, i.e. during recovery of the JobManager the job was picked up again.

Conceptually, the problem stems from the fact that the JRS entry is only 
written after the job reached the globally terminal state (which is reported 
via the REST API). Instead, it should be written before reaching that state.


> Race condition of REST API and JRS entry might lead to inconsistent state
> -------------------------------------------------------------------------
>
>                 Key: FLINK-38883
>                 URL: https://issues.apache.org/jira/browse/FLINK-38883
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.0.1, 1.20.3, 2.2.0, 2.1.1
>            Reporter: Matthias Pohl
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
