[ https://issues.apache.org/jira/browse/FLINK-21030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277248#comment-17277248 ]
Till Rohrmann edited comment on FLINK-21030 at 2/2/21, 4:21 PM: ----------------------------------------------------------------

I think the problem can and should be solved in the scope of the {{SchedulerBase}}/{{DefaultScheduler}}. The underlying problem is that calling {{SchedulerBase.stopWithSavepoint}} should put the scheduler into a "stop-with-savepoint" state in which it reacts differently to the available signals. In fact, stop-with-savepoint is a two-stage operation which needs different behaviour depending on the stage:

1. Taking a savepoint
2. Stopping the job gracefully (waiting until the notify-checkpoint-complete messages are sent and the {{Tasks}} shut themselves down)

If a failure occurs in the first stage, then we should fail the stop-with-savepoint operation and recover the job normally. If a failure occurs in the second stage, then it is possible that parts of the {{ExecutionGraph}} have already shut down. Hence, any task failure needs to trigger a global failover where we restart the whole job. At the moment, the second stage is not treated properly, which results in the reported problem.

The easiest solution I can think of is to predicate the stop-with-savepoint operation on the state of the {{Executions}}. Only if all {{Executions}} reach the {{FINISHED}} state can the job reach the {{FINISHED}} state. Hence, if one observes an {{Execution}} that reaches any other terminal state, one knows that the savepoint operation has failed. Depending on whether one is in stage 1 or stage 2, one needs to do nothing or trigger a global job failover.

Alternatively, one could make the state of stop-with-savepoint more explicit by letting the {{SchedulerBase}} store a {{StopWithSavepointOperation}}. This operation would then influence what {{DefaultScheduler.handleTaskFailure}} and {{DefaultScheduler.handleGlobalFailure}} do. If, for example, we are in stage 2, then a single task failure should trigger a global failover.
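The two proposals above could be sketched roughly as follows. This is only an illustration, not actual Flink code: the class shape, method names, and the simplified {{ExecutionState}} enum are assumptions made for the sketch.

```java
import java.util.List;

// Simplified stand-in for Flink's execution states (assumption for this sketch).
enum ExecutionState { RUNNING, FINISHED, FAILED, CANCELED }

// Hypothetical state object the SchedulerBase could store while a
// stop-with-savepoint is in flight. It tracks which of the two stages
// we are in and tells the scheduler how to react to task failures.
class StopWithSavepointOperation {
    enum Stage { TAKING_SAVEPOINT, STOPPING_GRACEFULLY }

    private Stage stage = Stage.TAKING_SAVEPOINT;

    // Stage 1 done: the savepoint exists; now the tasks must receive the
    // notify-checkpoint-complete messages and shut themselves down.
    void onSavepointCompleted() {
        stage = Stage.STOPPING_GRACEFULLY;
    }

    // How DefaultScheduler.handleTaskFailure could consult this object:
    // in stage 2, parts of the ExecutionGraph may already be shut down,
    // so any task failure must restart the whole job.
    boolean requiresGlobalFailover() {
        return stage == Stage.STOPPING_GRACEFULLY;
    }

    // The simpler alternative expressed as a predicate: the job may only
    // transition to FINISHED if every Execution reached FINISHED; any
    // other terminal state means the stop-with-savepoint failed.
    static boolean allExecutionsFinished(List<ExecutionState> states) {
        return states.stream().allMatch(s -> s == ExecutionState.FINISHED);
    }
}
```

The point of the explicit operation object is that the failure-handling decision ("local restart vs. global failover") becomes a function of the stage rather than being inferred indirectly from execution states.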
cc [~zhuzh]
> Broken job restart for job with disjoint graph
> ----------------------------------------------
>
> Key: FLINK-21030
> URL: https://issues.apache.org/jira/browse/FLINK-21030
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.11.2
> Reporter: Theo Diefenthal
> Assignee: Matthias
> Priority: Blocker
> Fix For: 1.11.4, 1.12.2, 1.13.0
>
> Building on top of bugs https://issues.apache.org/jira/browse/FLINK-21028 and https://issues.apache.org/jira/browse/FLINK-21029:
> I tried to stop a Flink application on YARN via savepoint, which didn't succeed due to a possible bug/race condition in shutdown (FLINK-21028). For some reason, Flink attempted to restart the pipeline after the failure in shutdown (FLINK-21029). The bug here:
> As I mentioned, my job graph is disjoint and the pipelines are fully isolated. Let's say the original error occurred in a single task of pipeline1. Flink then restarted the entire pipeline1, but pipeline2 was shut down successfully and switched its state to FINISHED.
> My job thus ended up in kind of an invalid state after the attempted stop: one of the two pipelines was running, the other was FINISHED. I guess this is kind of a bug in the restarting behavior: only the connected component of the graph containing the failure is restarted, but the other components aren't...