[
https://issues.apache.org/jira/browse/FLINK-38883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Pohl updated FLINK-38883:
----------------------------------
Description:
We noticed an issue where the REST API reported a job being globally terminated
({{FAILED}}) but the JRS entry wasn't created (due to some object store
problems). The external monitor marked the job as terminal based on the REST
API response, but the job recovered because no JRS entry existed and the job
data wasn't cleaned up, i.e. during recovery of the JobManager the job was
picked up again.
Conceptually, the problem stems from the fact that the JRS entry is only
written after the job reached the globally terminal state (which is reported
via the REST API). Instead, it should be written before reaching that state
(i.e. as part of the CANCELLING, FAILING and FINISHING job states; the last of
these doesn't even exist in the job state transition graph).
There are different options to handle that problem:
* Extend the JobDetails endpoint to include a flag that states whether the JRS
entry was written for the job. The logic would live in the Dispatcher and might
be the least invasive option. The REST API endpoint would need to become the
only means to determine whether a job actually terminated globally. This is
mentioned because of the stopWithSavepoint feature, whose result might be
misinterpreted as the job having finished even though it doesn't include any
information about the JRS entry.
  * The JRS lookup needs to be cached somehow because the REST endpoints would
  be accessed far more frequently than what the JRS was initially meant to
  handle (lookup during job termination).
* Retrieve JobStatus based on whether the JRS entry was written in all the
JobStatus-including REST endpoints on the Dispatcher side.
  * Would require some additional logic to store the job's previous status.
  * Downside: The API is not necessarily aligned with the Flink logs.
* Handle the dirty JRS entry write in the scheduler before finishing the job.
For the AdaptiveScheduler, we could handle such a callback in
`AdaptiveScheduler#onFinished`. For the `DefaultScheduler`, the implementation
is a bit trickier because the DefaultScheduler is closely coupled to the
ExecutionGraph: The JobStatus transitions to `FINISHED` in
`DefaultExecutionGraph#jobFinished`.
  * An easier approach is to use a wrapping class that owns its own
  terminationFuture, which forwards the job's terminationFuture in
  `DefaultScheduler`. That way, we could inject the JRS dirty entry writing
  before actually completing the termination future of the wrapper class.
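
The wrapper idea from the last option could look roughly like the sketch below. It is only an illustration built on plain `java.util.concurrent.CompletableFuture`: the class name `DirtyEntryFirstTermination` and the `writeDirtyEntry` callback are hypothetical and do not exist in Flink, and the real wiring into `DefaultScheduler` and the JobResultStore would differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical wrapper (not a Flink class): owns its own termination future and
 * completes it only after the dirty JRS entry write has finished.
 *
 * @param <S> type of the terminal status carried by the futures
 */
final class DirtyEntryFirstTermination<S> {

    // The future that callers (e.g. the code backing the REST API) would observe.
    private final CompletableFuture<S> terminationFuture = new CompletableFuture<>();

    DirtyEntryFirstTermination(
            CompletableFuture<S> schedulerTerminationFuture,
            // Assumed callback that persists the dirty entry for the given status.
            Function<S, CompletableFuture<Void>> writeDirtyEntry) {
        schedulerTerminationFuture
                // Start the write once the scheduler terminated, then pass the status on.
                .thenCompose(status ->
                        writeDirtyEntry.apply(status).thenApply(ignored -> status))
                // Forward the outcome, success or failure, to the wrapper's own future.
                .whenComplete((status, failure) -> {
                    if (failure != null) {
                        terminationFuture.completeExceptionally(failure);
                    } else {
                        terminationFuture.complete(status);
                    }
                });
    }

    CompletableFuture<S> getTerminationFuture() {
        return terminationFuture;
    }
}
```

In this sketch a failed write fails the wrapper's future instead of reporting a terminal status, so an observer would never see a terminal job without a corresponding entry. Whether failing, retrying or blocking is the right reaction to a failed write is an open design question that the sketch does not answer.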
was:
We noticed an issue where the REST API reported a job being globally terminated
({{FAILED}}) but the JRS entry wasn't created (due to some object store
problems). The external monitor marked that as terminal due to the REST API
call but the job recovered because no JRS entry existed and the job data wasn't
cleaned up, i.e. during recovery of the JobManager the job was picked up again.
Conceptually, the problem stems from the fact that the JRS entry is only
written after the job reached the globally terminal state (which is reported
via the REST API). Instead, it should be written before reaching that state.
> Race condition of REST API and JRS entry might lead to inconsistent state
> -------------------------------------------------------------------------
>
> Key: FLINK-38883
> URL: https://issues.apache.org/jira/browse/FLINK-38883
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.0.1, 1.20.3, 2.2.0, 2.1.1
> Reporter: Matthias Pohl
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)