[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052090#comment-17052090 ]

Zhu Zhu commented on FLINK-16357:
---------------------------------

I see. Yes, I think we can achieve this by adding a parameter like "restoreGlobally" to {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}}, so that it invokes {{OperatorCoordinator#resetToCheckpoint(...)}} only if the parameter is true. This may need some small adjustments in {{DefaultScheduler}} and {{SchedulerBase}}.

> Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16357
>                 URL: https://issues.apache.org/jira/browse/FLINK-16357
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>            Reporter: Stephan Ewen
>            Priority: Major
>             Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire
> execution graph) and "regional failure" (recovering a region with transient
> pipelined data exchanges).
> The latter is for common failover; the former is a safety net to
> handle unexpected failures or inconsistencies (a full reset of the ExecutionGraph
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global
> failover" case. In the "regional failover" case, they are only notified of
> the tasks that are reset; they keep their internal state and adjust it for the
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about
> whether we are restoring from a "regional failure" or from a "global failure".

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
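A minimal sketch of the proposed flag, written in Java since that is Flink's language. Everything here is hypothetical: `SketchCoordinator`, `CountingSketchCoordinator`, `SketchCheckpointRestore`, and the integer task IDs are simplified stand-ins, not Flink's actual `OperatorCoordinator` or `CheckpointCoordinator` signatures. It only illustrates the gating: task state is restored in both cases, but coordinators are rewound only when `restoreGlobally` is true.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for an OperatorCoordinator: only the reset hook is modelled.
interface SketchCoordinator {
    void resetToCheckpoint(byte[] checkpointData);
}

// Test double that just counts how often it was reset.
final class CountingSketchCoordinator implements SketchCoordinator {
    int resets;

    @Override
    public void resetToCheckpoint(byte[] checkpointData) {
        resets++;
    }
}

// Hypothetical restore entry point; this is NOT Flink's actual CheckpointCoordinator signature.
final class SketchCheckpointRestore {
    private final List<SketchCoordinator> coordinators;
    private final byte[] latestCoordinatorState;
    private final Set<Integer> restoredTasks = new HashSet<>();

    SketchCheckpointRestore(List<SketchCoordinator> coordinators, byte[] latestCoordinatorState) {
        this.coordinators = coordinators;
        this.latestCoordinatorState = latestCoordinatorState;
    }

    /**
     * Restores task state for the given tasks from the latest checkpoint. Coordinators are
     * rewound to that checkpoint only if restoreGlobally is true; after a regional failover
     * they keep their internal state (failed subtasks are reported to them through a
     * separate subtaskFailed(...) notification, which is not modelled here).
     */
    void restoreLatestCheckpointedState(Set<Integer> tasks, boolean restoreGlobally) {
        restoredTasks.addAll(tasks); // placeholder for assigning checkpointed state to the tasks
        if (restoreGlobally) {
            for (SketchCoordinator coordinator : coordinators) {
                coordinator.resetToCheckpoint(latestCoordinatorState);
            }
        }
    }

    Set<Integer> restoredTasks() {
        return restoredTasks;
    }
}
```

In this sketch the scheduler would pass `true` only when the restart was triggered by a global failure; how {{DefaultScheduler}} and {{SchedulerBase}} would thread that value through is not shown.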
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17051965#comment-17051965 ]

Stephan Ewen commented on FLINK-16357:
--------------------------------------

Invoking {{OperatorCoordinator#subtaskFailed(...)}} is the "common case" for failover handling. For example, it tells the OperatorCoordinator that any splits assigned to that task since the last checkpoint need to be added back to the pool of unassigned splits.

{{OperatorCoordinator#resetToCheckpoint(...)}} is primarily needed when restoring from a savepoint and when the JobManager fails and recovers. I wanted to call it additionally in the case of a Global Failover as a safety net, exactly as you say. The ExecutionGraph's strategy is to do a global failover whenever any inconsistency is suspected, and in that case I think it would be good to do a global failover on the OperatorCoordinator as well.
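To illustrate the "common case" described above, here is a toy split-assigning coordinator. It is an assumption-laden sketch: `ToySplitCoordinator`, `assignNext`, and `checkpointComplete` are made-up names, and the real source/enumerator interfaces look different. The point is only that `subtaskFailed` returns to the pool just the splits handed to the failed subtask after the last checkpoint; splits assigned before the checkpoint are assumed to be part of that subtask's own restored state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy coordinator; not Flink's SplitEnumerator or OperatorCoordinator API.
final class ToySplitCoordinator {
    private final Deque<String> unassigned = new ArrayDeque<>();
    // Splits handed to each subtask since the last completed checkpoint.
    private final Map<Integer, List<String>> assignedSinceCheckpoint = new HashMap<>();

    ToySplitCoordinator(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Hands the next split to a subtask, or returns null if none are left. */
    String assignNext(int subtask) {
        String split = unassigned.poll();
        if (split != null) {
            assignedSinceCheckpoint.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** A checkpoint completed: from now on the subtasks' own state records these splits. */
    void checkpointComplete() {
        assignedSinceCheckpoint.clear();
    }

    /** Regional failover: put back only what the failed subtask received after the checkpoint. */
    void subtaskFailed(int subtask) {
        List<String> lost = assignedSinceCheckpoint.remove(subtask);
        if (lost != null) {
            unassigned.addAll(lost);
        }
    }

    List<String> unassignedSplits() {
        return new ArrayList<>(unassigned);
    }
}
```

For example, if subtask 0 receives `s1` before a checkpoint and `s2` after it, a later `subtaskFailed(0)` puts only `s2` back into the pool, while the rest of the coordinator's state is left untouched.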
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049831#comment-17049831 ]

Guowei Ma commented on FLINK-16357:
-----------------------------------

I'd like to understand the reason for distinguishing Global Failover from Region Failover. Is it to avoid inconsistencies between the OperatorCoordinator and the operator (such as a source)? For example, with splits: if every failover reset the OperatorCoordinator's state, a source might receive duplicate splits.
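The duplicate-split concern can be reproduced with a small simulation. This is a hypothetical toy (two subtasks, two splits, no real Flink classes; `DuplicateSplitDemo` is a made-up name): subtask 0 fails while subtask 1 keeps running, and the method counts how many splits end up owned by more than one subtask, depending on whether the shared coordinator is rewound or only notified of the failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy: why rewinding a shared coordinator on a *regional* failover can duplicate splits.
final class DuplicateSplitDemo {
    /** Returns how many splits end up owned by more than one subtask. */
    static int duplicatesAfterRegionalFailover(boolean resetCoordinator) {
        List<String> checkpointedPool = List.of("s1", "s2"); // coordinator state at the checkpoint
        Deque<String> pool = new ArrayDeque<>(checkpointedPool);
        Map<Integer, List<String>> owned = new HashMap<>();

        // After the checkpoint, subtask 0 gets s1 and subtask 1 gets s2.
        owned.computeIfAbsent(0, k -> new ArrayList<>()).add(pool.poll());
        owned.computeIfAbsent(1, k -> new ArrayList<>()).add(pool.poll());

        // Subtask 0 fails; subtask 1 keeps running (regional failover) and still holds s2.
        List<String> lost = owned.remove(0);
        if (resetCoordinator) {
            pool = new ArrayDeque<>(checkpointedPool); // rewinds: s2 looks unassigned again
        } else {
            pool.addAll(lost);                         // subtaskFailed-style: only s1 returns
        }

        // The restarted subtask 0 drains the pool.
        while (!pool.isEmpty()) {
            owned.computeIfAbsent(0, k -> new ArrayList<>()).add(pool.poll());
        }

        // Count splits that now have more than one owner.
        Map<String, Integer> owners = new HashMap<>();
        for (List<String> splits : owned.values()) {
            for (String split : splits) {
                owners.merge(split, 1, Integer::sum);
            }
        }
        int duplicates = 0;
        for (int count : owners.values()) {
            if (count > 1) {
                duplicates++;
            }
        }
        return duplicates;
    }
}
```

With `resetCoordinator = true` the rewound pool hands out `s2` a second time while subtask 1 still holds it, so the method returns 1; with the `subtaskFailed`-style handling it returns 0.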
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048882#comment-17048882 ]

Zhu Zhu commented on FLINK-16357:
---------------------------------

In the case of a global failure, what's the difference between
* invoking {{OperatorCoordinator#subtaskFailed(...)}} for all execution vertices of an {{ExecutionJobVertex}}, and
* invoking {{OperatorCoordinator#resetToCheckpoint(...)}}?

Is {{resetToCheckpoint(...)}} another safety net?
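One way to see the difference asked about here is a toy coordinator whose bookkeeping can be made inconsistent on purpose. All names are hypothetical (`GlobalFailoverToy`, `assign`, `forgetBookkeeping`), and the injected inconsistency simply stands in for "some bug we did not anticipate". If the bookkeeping is correct, calling `subtaskFailed` for every subtask and calling `resetToCheckpoint` leave the same unassigned pool; if it is not, only the reset recovers the lost split, which is the safety-net role Stephan Ewen describes in his reply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical toy coordinator for comparing the two options in a global failover.
final class GlobalFailoverToy {
    private final List<String> checkpointedPool;                  // unassigned splits at the last checkpoint
    private final Map<Integer, List<String>> sinceCheckpoint = new HashMap<>();
    private List<String> pool;                                    // currently unassigned splits

    GlobalFailoverToy(List<String> splits) {
        this.checkpointedPool = new ArrayList<>(splits);
        this.pool = new ArrayList<>(splits);
    }

    /** Assigns the next split; forgetBookkeeping simulates an internal inconsistency (a bug). */
    void assign(int subtask, boolean forgetBookkeeping) {
        String split = pool.remove(0);
        if (!forgetBookkeeping) {
            sinceCheckpoint.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
    }

    /** Option 1: return what this subtask received since the checkpoint, per the bookkeeping. */
    void subtaskFailed(int subtask) {
        List<String> lost = sinceCheckpoint.remove(subtask);
        if (lost != null) {
            pool.addAll(lost);
        }
    }

    /** Option 2: discard all current state and go back to the checkpointed state. */
    void resetToCheckpoint() {
        pool = new ArrayList<>(checkpointedPool);
        sinceCheckpoint.clear();
    }

    TreeSet<String> unassigned() {
        return new TreeSet<>(pool);
    }
}
```

In the toy, option 1 can only be as correct as the coordinator's own bookkeeping, whereas option 2 does not depend on it at all.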
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048870#comment-17048870 ]

Stephan Ewen commented on FLINK-16357:
--------------------------------------

Yes, {{OperatorCoordinator#resetToCheckpoint(...)}} is expected to be invoked in {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}}, iff a failure/recovery came from {{ExecutionGraph.failGlobal()}} or {{SchedulerNG.handleGlobalFailure()}}.

Currently, if we called {{OperatorCoordinator#resetToCheckpoint(...)}} within {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}}, we would also restore it on every regional failover, if I read the code correctly. The {{OperatorCoordinator}} exists once per {{ExecutionJobVertex}}, not once per {{ExecutionVertex}}.
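Because there is one OperatorCoordinator per ExecutionJobVertex, the rule stated above can be sketched as a per-job-vertex decision. The types are hypothetical simplifications (`CoordinatorRecoveryPlan` and `SubtaskRef` are made up; `SubtaskRef` stands in for an ExecutionVertex, and plans are plain strings for readability): a global failure resets every coordinator, while a regional failure only reports the failed subtask indices, so a coordinator's state for its still-running subtasks is not rewound.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: which coordinator callback to use, per job vertex, after a failure.
final class CoordinatorRecoveryPlan {
    /** One parallel subtask of a job vertex (stand-in for an ExecutionVertex). */
    static final class SubtaskRef {
        final String jobVertex;
        final int subtaskIndex;

        SubtaskRef(String jobVertex, int subtaskIndex) {
            this.jobVertex = jobVertex;
            this.subtaskIndex = subtaskIndex;
        }
    }

    /**
     * Returns a readable plan keyed by job vertex (one coordinator each). A global failure
     * resets every coordinator; a regional failure only reports the failed subtasks of the
     * affected job vertices, so those coordinators keep their state.
     */
    static Map<String, String> plan(boolean globalFailure, Set<String> jobVertices,
                                    List<SubtaskRef> failedSubtasks) {
        Map<String, String> plan = new TreeMap<>();
        if (globalFailure) {
            for (String jobVertex : jobVertices) {
                plan.put(jobVertex, "resetToCheckpoint");
            }
            return plan;
        }
        // Group failed subtasks by their job vertex, i.e. by coordinator.
        Map<String, Set<Integer>> failedByJobVertex = new TreeMap<>();
        for (SubtaskRef ref : failedSubtasks) {
            failedByJobVertex.computeIfAbsent(ref.jobVertex, k -> new TreeSet<>()).add(ref.subtaskIndex);
        }
        for (Map.Entry<String, Set<Integer>> entry : failedByJobVertex.entrySet()) {
            plan.put(entry.getKey(), "subtaskFailed" + entry.getValue());
        }
        return plan;
    }
}
```

For instance, a regional failure of subtasks 0 and 2 of a "source" job vertex yields `{source=subtaskFailed[0, 2]}` and leaves other coordinators alone, whereas a global failure yields `resetToCheckpoint` for every job vertex.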
[ https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048828#comment-17048828 ]

Zhu Zhu commented on FLINK-16357:
---------------------------------

Is OperatorCoordinator#resetToCheckpoint(...) expected to be invoked in CheckpointCoordinator#restoreLatestCheckpointedState(...)? If so, it seems there is no need to tell the CheckpointCoordinator whether it is a global or a regional failure; we could instead pass the set of execution vertices affected by the failure, namely by changing the param {{tasks}} of CheckpointCoordinator#restoreLatestCheckpointedState(...) from Set to Set.

In the new scheduler (DefaultScheduler), the logic of global failure recovery and regional failure recovery is almost the same, except for how the ExecutionVertices to restart are calculated. So it does not differentiate between global and regional failures in the stage that restores task states and reschedules the tasks. There is always a set of ExecutionVertices to restart, which can be passed to CheckpointCoordinator#restoreLatestCheckpointedState(...).
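The scheduler flow described above can be sketched as follows. This is a hypothetical simplification, not {{DefaultScheduler}} code: `RestartFlowSketch` is a made-up name, vertices are strings, regions are given as a precomputed map, and the `Consumer` stands in for `CheckpointCoordinator#restoreLatestCheckpointedState(...)`. Only the computation of the restart set depends on the failure kind.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical sketch: global and regional failures differ only in how the set of
// vertices to restart is computed; the restore step that follows is shared.
final class RestartFlowSketch {
    static Set<String> handleFailure(boolean globalFailure,
                                     String failedVertex,
                                     Map<String, Set<String>> regionOfVertex,
                                     Set<String> allVertices,
                                     Consumer<Set<String>> restoreLatestCheckpointedState) {
        // Global failure: every vertex restarts. Regional failure: only the failed vertex's region.
        Set<String> toRestart = globalFailure
                ? new TreeSet<>(allVertices)
                : new TreeSet<>(regionOfVertex.get(failedVertex));
        // From here on both failure kinds take the same path (restore state, then reschedule).
        restoreLatestCheckpointedState.accept(toRestart);
        return toRestart;
    }
}
```

Note that in this sketch the restore step receives only a set of vertices, so it cannot tell a global failure apart from a regional one whose region happens to cover all vertices; that is the information a "restoreGlobally" flag, as discussed elsewhere in this thread, would carry.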