[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052090#comment-17052090
 ] 

Zhu Zhu commented on FLINK-16357:
-

I see. Yes, I think we can achieve it by adding a param like "restoreGlobally" 
to {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}} which only 
invokes {{OperatorCoordinator#resetToCheckpoint(...)}} if the param is true. 
This may need some small adjustment in {{DefaultScheduler}} and 
{{SchedulerBase}}.

> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-05 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17051965#comment-17051965
 ] 

Stephan Ewen commented on FLINK-16357:
--

Invoking {{OperatorCoordinator #subtaskFailed(...)}} is the "common case" for 
failover handling. This for example tells the OperatorCoordinator that any 
splits assigned to that task since the last checkpoint need to be added back to 
the pool of unassigned splits.

{{OperatorCoordinator#resetToCheckpoint(...)}} is primarily needed to restore 
from a savepoint and when the JobManager fails and recovers.
I wanted to call it additionally in the case of a Global Failover as a safety 
net, exactly like you say. The execution graph has the strategy to to a global 
failover when any inconsistency is suspected, and in that case, I think it 
would be good to have a global failover on the OperatorCoordinator as well.


> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-02 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049831#comment-17049831
 ] 

Guowei Ma commented on FLINK-16357:
---

I want to figure out the reason for distinguishing Global Failover from Region 
Failover. Is it to avoid inconsistencies between OperatorCoordinator and 
Operator (such as source)? For example, for Split, if any failover resets the 
Operator Coordinator state, the source may receive duplicate splits.

> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-02 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048882#comment-17048882
 ] 

Zhu Zhu commented on FLINK-16357:
-

In the case of a global failure, what's the difference between
 * invoking {{OperatorCoordinator #subtaskFailed(...)}} for all execution 
vertices of an {{ExecutionJobVertex}}, and
 * invoking {{OperatorCoordinator#resetToCheckpoint(...)}}

Is {{resetToCheckpoint(...)}} another safety net?

> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-02 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048870#comment-17048870
 ] 

Stephan Ewen commented on FLINK-16357:
--

Yes, {{OperatorCoordinator#resetToCheckpoint(...)}} is expected to be invoked 
in {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}}, iff a 
failure/recovery came from {{ExecutionGraph.failGlobal()}} or 
{{SchedulerNG.handleGlobalFailure()}}.

Currently, if we would call  {{OperatorCoordinator#resetToCheckpoint(...)}} 
within {{CheckpointCoordinator#restoreLatestCheckpointedState(...)}} we would 
restore it on every regional failover as well, if I read the code correctly.

The {{OperatorCoordinator}} exists once per {{ExecutionJobVertex}}, not per 
each {{ExecutionVertex}}.

> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16357) Extend Checkpoint Coordinator to differentiate between "regional restore" and "full restore".

2020-03-01 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048828#comment-17048828
 ] 

Zhu Zhu commented on FLINK-16357:
-

Is OperatorCoordinator#resetToCheckpoint(...) expected to be invoked in 
CheckpointCoordinator#restoreLatestCheckpointedState(...) ? If so, seems there 
is not need to tell the CheckpointCoordinator it is a global failure or a 
regional failure, but can just be a set of execution vertices which are 
affected by the failure, namely changing the param {{tasks}} of 
CheckpointCoordinator#restoreLatestCheckpointedState(...) from 
Set to Set.

In the new scheduler (DefaultScheduler), the logics of global failure recovery 
and regional failure recovery are almost the same except for the logic to 
calculate the ExecutionVertex to restart. So it does not differentiate global 
failure nor regional failure in the stage to restore task states and reschedule 
the tasks. And there would always be a set of ExecutionVertex to restart which 
can be passed to the CheckpointCoordinator#restoreLatestCheckpointedState(...).

> Extend Checkpoint Coordinator to differentiate between "regional restore" and 
> "full restore".
> -
>
> Key: FLINK-16357
> URL: https://issues.apache.org/jira/browse/FLINK-16357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.11.0
>
>
> The {{ExecutionGraph}} has the notion of "global failure" (failing the entire 
> execution graph) and "regional failure" (recover a region with transient 
> pipelined data exchanges).
> The latter one is for common failover, the former one is a safety net to 
> handle unexpected failures or inconsistencies (full reset of ExecutionGraph 
> recovers most inconsistencies).
> The OperatorCoordinators should only be reset to a checkpoint in the "global 
> failover" case. In the "regional failover" case, they are only notified of 
> the tasks that are reset and keep their internal state and adjust it for the 
> failed tasks.
> To implement that, the ExecutionGraph needs to forward the information about 
> whether we are restoring from a "regional failure" or from a "global failure".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)