[ https://issues.apache.org/jira/browse/FLINK-21030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277248#comment-17277248 ]

Till Rohrmann edited comment on FLINK-21030 at 2/2/21, 4:21 PM:
----------------------------------------------------------------

I think the problem can and should be solved in the scope of the 
{{SchedulerBase}}/{{DefaultScheduler}}. The underlying problem is that calling 
{{SchedulerBase.stopWithSavepoint}} should move the scheduler into a 
"stop-with-savepoint" state in which it reacts differently to the incoming 
signals. In fact, stop-with-savepoint is a two-stage operation which needs 
different behaviour depending on the stage:

1. Taking a savepoint
2. Stopping the job gracefully (waiting until the notify-checkpoint-complete 
messages have been sent and the {{Tasks}} have shut themselves down)
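
To make the two-stage sequencing concrete, here is a minimal, self-contained 
sketch (plain Java with placeholder method names; this is not Flink's actual 
scheduler or checkpointing API):

{code:java}
import java.util.concurrent.CompletableFuture;

// Minimal sketch of the two-stage sequencing; all method names are placeholders.
// Stage 1 takes the savepoint, stage 2 waits for the graceful shutdown of the tasks.
final class TwoStageStopWithSavepointSketch {

    CompletableFuture<String> stopWithSavepoint(String targetDirectory) {
        return triggerSavepoint(targetDirectory)              // stage 1
                .thenCompose(savepointPath ->
                        waitForTasksToFinishGracefully()      // stage 2
                                .thenApply(ignored -> savepointPath));
    }

    private CompletableFuture<String> triggerSavepoint(String targetDirectory) {
        // Placeholder: in Flink this would go through the checkpointing machinery.
        return CompletableFuture.completedFuture(targetDirectory);
    }

    private CompletableFuture<Void> waitForTasksToFinishGracefully() {
        // Placeholder: completes once the notify-checkpoint-complete messages
        // were sent and all Tasks have shut themselves down.
        return CompletableFuture.completedFuture(null);
    }
}
{code}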

If a failure occurs in the first stage, then we should fail the 
stop-with-savepoint operation and recover the job normally. If a failure occurs 
in the second stage, then it is possible that parts of the {{ExecutionGraph}} 
have already shut down. Hence, any task failure needs to trigger a global 
failover where we restart the whole job.

At the moment, the second stage is not handled properly, which results in the 
reported problem.

The easiest solution I can think of is to predicate the stop-with-savepoint 
operation on the state of the {{Executions}}. Only if all {{Executions}} reach 
the {{FINISHED}} state can the job reach the {{FINISHED}} state. Hence, if one 
observes an {{Execution}} which reaches any other terminal state, then one 
knows that the stop-with-savepoint operation has failed. Depending on whether 
one is in stage 1 or stage 2, one needs to either do nothing or trigger a 
global job failover, as sketched below.
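
As a rough illustration of that predicate, the following self-contained sketch 
checks the terminal states reached by the executions ({{TerminalState}} and the 
surrounding class are simplified stand-ins, not Flink's real {{ExecutionState}} 
or scheduler types):

{code:java}
import java.util.Collection;

// Sketch of the "predicate on the Executions' state" idea; TerminalState is a
// simplified stand-in for Flink's ExecutionState.
final class ExecutionStatePredicateSketch {

    enum TerminalState { FINISHED, CANCELED, FAILED }

    enum Outcome { STILL_STOPPING, JOB_FINISHED, STOP_WITH_SAVEPOINT_FAILED }

    /**
     * The job may only reach FINISHED if every Execution reaches FINISHED.
     * Observing any other terminal state means the stop-with-savepoint
     * operation has failed.
     */
    static Outcome evaluate(
            Collection<TerminalState> reachedTerminalStates, int totalExecutions) {
        for (TerminalState state : reachedTerminalStates) {
            if (state != TerminalState.FINISHED) {
                return Outcome.STOP_WITH_SAVEPOINT_FAILED;
            }
        }
        return reachedTerminalStates.size() == totalExecutions
                ? Outcome.JOB_FINISHED
                : Outcome.STILL_STOPPING;
    }
}
{code}

Depending on the stage, {{STOP_WITH_SAVEPOINT_FAILED}} would then translate 
into doing nothing (stage 1) or triggering a global failover (stage 2).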

Alternatively, one could make the state of stop-with-savepoint more explicit by 
letting the {{SchedulerBase}} store a {{StopWithSavepointOperation}}. This 
operation would then influence what {{DefaultScheduler.handleTaskFailure}} and 
{{DefaultScheduler.handleGlobalFailure}} do. If, for example, we are in the 
second stage, then a single task failure should trigger a global failover.
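
A rough sketch of how such an explicit operation could hook into the failure 
handling (the classes and methods below are hypothetical and only mirror the 
names mentioned above; they are not the existing {{DefaultScheduler}} API):

{code:java}
import java.util.Optional;

// Hypothetical sketch: the scheduler stores an optional stop-with-savepoint
// operation and consults it before applying its regular failover handling.
final class SchedulerFailureHandlingSketch {

    /** Stand-in for a StopWithSavepointOperation stored by the SchedulerBase. */
    static final class StopWithSavepointOperation {
        private final boolean savepointCompleted; // true once stage 2 has started

        StopWithSavepointOperation(boolean savepointCompleted) {
            this.savepointCompleted = savepointCompleted;
        }

        boolean inSecondStage() {
            return savepointCompleted;
        }
    }

    private Optional<StopWithSavepointOperation> stopWithSavepoint = Optional.empty();

    /** Illustrative analogue of DefaultScheduler.handleTaskFailure. */
    void handleTaskFailure(Throwable cause) {
        boolean inSecondStage =
                stopWithSavepoint.map(StopWithSavepointOperation::inSecondStage).orElse(false);
        if (inSecondStage) {
            // In the second stage a single task failure escalates to a global failover.
            handleGlobalFailure(cause);
        } else {
            // Otherwise the regular (local) failover strategy applies.
            restartFailedTask(cause);
        }
    }

    /** Illustrative analogue of DefaultScheduler.handleGlobalFailure. */
    void handleGlobalFailure(Throwable cause) {
        // Placeholder: restart the whole job.
    }

    private void restartFailedTask(Throwable cause) {
        // Placeholder: restart only the affected part of the job.
    }
}
{code}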

cc [~zhuzh]


> Broken job restart for job with disjoint graph
> ----------------------------------------------
>
>                 Key: FLINK-21030
>                 URL: https://issues.apache.org/jira/browse/FLINK-21030
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.2
>            Reporter: Theo Diefenthal
>            Assignee: Matthias
>            Priority: Blocker
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> Building on top of the bugs
> https://issues.apache.org/jira/browse/FLINK-21028
> and https://issues.apache.org/jira/browse/FLINK-21029:
> I tried to stop a Flink application on YARN via a savepoint, which didn't 
> succeed due to a possible bug/race condition during shutdown (FLINK-21028). 
> For some reason, Flink then attempted to restart the pipeline after the 
> failure in shutdown (FLINK-21029). The bug here:
> As I mentioned, my job graph is disjoint and the pipelines are fully 
> isolated. Let's say the original error occurred in a single task of 
> pipeline1. Flink then restarted the entire pipeline1, but pipeline2 was shut 
> down successfully and switched its state to FINISHED.
> My job was thus in kind of an invalid state after the attempt to stop it: 
> one of the two pipelines was running, the other was FINISHED. I guess the 
> bug in the restart behaviour is that only the connected component of the 
> graph in which the failure occurred is restarted, while the other components 
> aren't...


