[ 
https://issues.apache.org/jira/browse/FLINK-20396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239774#comment-17239774
 ] 

Stephan Ewen edited comment on FLINK-20396 at 11/27/20, 4:43 PM:
-----------------------------------------------------------------

In the first version, I would add the {{subtaskReset()}} method in addition to 
the {{subtaskFailed()}} method for two reasons:
  - {{subtaskFailed()}} can potentially do slightly faster cleanup (for example 
unregistering readers)
  - It is complex to communicate a failure cause to {{subtaskReset()}}. If we 
want to make actions dependent on exception types, we need to handle that in 
the {{subtaskFailed()}} method.

We can decide to remove {{subtaskFailed()}} in the future, if we feel we don't 
need the slightly faster notification, or the failure reason.

If we want to retain the failure reason, but do not care about the slightly 
faster notification, we can consolidate the two into a single method 
{{subtaskReset(int subtask, long checkpointId, Throwable failureCause);}}.
The {{OperatorCoordinatorHolder}} can remember the exceptions per subtask 
between failure and restore to pass them to the coordinator.
Because that potentially needs noticeably more heap memory (it retains many 
exceptions with stack traces), I would suggest making that change only if 
needed, not immediately.
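
A minimal sketch of that consolidation, to make the memory trade-off concrete. 
The names {{ResettableCoordinator}} and {{FailureCauseBuffer}} are hypothetical 
and only stand in for the bookkeeping the {{OperatorCoordinatorHolder}} would 
do; only the {{subtaskReset(int, long, Throwable)}} signature comes from the 
proposal above. It assumes a restore without a recorded failure passes a null 
cause.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consolidated callback, using the signature proposed above.
// This is not an existing Flink interface.
interface ResettableCoordinator {
    void subtaskReset(int subtask, long checkpointId, Throwable failureCause);
}

// Hypothetical stand-in for the per-subtask bookkeeping in the holder.
final class FailureCauseBuffer {
    // Every entry keeps a full exception (with stack trace) alive until restore.
    private final Map<Integer, Throwable> pendingCauses = new HashMap<>();
    private final ResettableCoordinator coordinator;

    FailureCauseBuffer(ResettableCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    // On failure: only remember the cause, do not notify the coordinator yet.
    void onSubtaskFailed(int subtask, Throwable cause) {
        pendingCauses.put(subtask, cause);
    }

    // On restore: hand the remembered cause over and release it.
    // Assumption: the cause is null if no failure was recorded for the subtask.
    void onSubtaskRestored(int subtask, long checkpointId) {
        Throwable cause = pendingCauses.remove(subtask);
        coordinator.subtaskReset(subtask, checkpointId, cause);
    }

    int retainedCauses() {
        return pendingCauses.size();
    }
}

final class FailureCauseBufferDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        FailureCauseBuffer buffer = new FailureCauseBuffer(
                (subtask, checkpointId, cause) -> log.add(
                        subtask + "@" + checkpointId + ":"
                                + (cause == null ? "none" : cause.getMessage())));
        buffer.onSubtaskFailed(3, new RuntimeException("reader lost"));
        buffer.onSubtaskRestored(3, 42L);
        System.out.println(log); // prints [3@42:reader lost]
    }
}
```

The coordinator hears nothing between failure and restore in this variant, 
which is the "slightly faster notification" that would be given up.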


> Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"
> ----------------------------------------------------------------------
>
>                 Key: FLINK-20396
>                 URL: https://issues.apache.org/jira/browse/FLINK-20396
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.12.0, 1.11.3
>
>
> There are no strong order guarantees between 
> {{OperatorCoordinator.subtaskFailed()}} and 
> {{OperatorCoordinator.notifyCheckpointComplete()}}.
> It can happen that a checkpoint completes after the notification for task 
> failure is sent:
>   - {{OperatorCoordinator.checkpoint()}}
>   - {{OperatorCoordinator.subtaskFailed()}}
>   - {{OperatorCoordinator.checkpointComplete()}}
> When it receives the subtask failure notification here, the coordinator does 
> not know whether the previous checkpoint completed or not. It cannot decide 
> what state the subtask will be in after recovery.
> There is no easy fix right now to strictly guarantee the order of the method 
> calls, so alternatively we need to provide the necessary information to 
> reason about the status of tasks.
> We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with 
> {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That 
> way, implementations get the explicit checkpoint ID for the subtask recovery 
> and can align it with the IDs of checkpoints that were taken.
> It is still (in rare cases) possible that for a specific checkpoint C, 
> {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before 
> {{OperatorCoordinator.checkpointComplete(C)}}.
> h3. Background
> The checkpointing procedure is partially asynchronous on the {{JobManager}} / 
> {{CheckpointCoordinator}}: After all subtasks have acknowledged the 
> checkpoint, the finalization (writing out metadata and registering the 
> checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint 
> completes after that.
> This sequence of events can happen:
>   - task acks checkpoint
>   - checkpoint fully acknowledged, finalization starts
>   - task fails
>   - task failure notification is dispatched
>   - checkpoint completes.
> For task failures and checkpoint completion, no order is defined.
> However, for task restore and checkpoint completion, the order is well 
> defined: When a task is restored, pending checkpoints are either canceled or 
> complete. None can be within finalization. That is currently guaranteed with 
> a lock in the {{CheckpointCoordinator}}.
> (An implication of that is that restores can be blocking operations in the 
> scheduler, which is not ideal from the perspective of making the scheduler 
> async/non-blocking, but it is currently essential for correctness.)
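
A rough sketch of how a coordinator could align the checkpoint ID passed to 
{{subtaskRestored()}} with the checkpoints it has taken. The class 
{{CheckpointAlignedTracker}}, the {{recordProgress()}} and {{positionOf()}} 
helpers, and the simplified method signatures are all hypothetical, not 
Flink's API. It assumes the coordinator keeps one snapshot per checkpoint ID 
from the time the checkpoint is taken, so the restore lookup does not depend 
on whether the completion notification has already arrived.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical coordinator-side bookkeeping, keyed by checkpoint ID.
final class CheckpointAlignedTracker {
    // checkpoint ID -> (subtask index -> position recorded at that checkpoint)
    private final TreeMap<Long, Map<Integer, Long>> snapshots = new TreeMap<>();
    // subtask index -> latest known position
    private final Map<Integer, Long> current = new HashMap<>();

    void recordProgress(int subtask, long position) {
        current.put(subtask, position);
    }

    // Snapshot the state under the checkpoint ID, before completion is known.
    void checkpoint(long checkpointId) {
        snapshots.put(checkpointId, new HashMap<>(current));
    }

    // Assumption: once a checkpoint completes, strictly older snapshots
    // are no longer restore targets and can be dropped.
    void checkpointComplete(long checkpointId) {
        snapshots.headMap(checkpointId, false).clear();
    }

    // Works whether or not checkpointComplete(checkpointId) was seen first.
    // Assumption: an unknown checkpoint ID means "no recorded progress".
    void subtaskRestored(int subtask, long checkpointId) {
        Map<Integer, Long> snapshot = snapshots.get(checkpointId);
        if (snapshot == null || !snapshot.containsKey(subtask)) {
            current.remove(subtask);
        } else {
            current.put(subtask, snapshot.get(subtask));
        }
    }

    Long positionOf(int subtask) {
        return current.get(subtask);
    }
}

final class CheckpointAlignedTrackerDemo {
    public static void main(String[] args) {
        CheckpointAlignedTracker tracker = new CheckpointAlignedTracker();
        tracker.recordProgress(0, 10L);
        tracker.checkpoint(1L);
        tracker.recordProgress(0, 25L);
        tracker.checkpoint(2L);
        tracker.recordProgress(0, 40L);
        // The rare ordering: restore to checkpoint 2 arrives before completion.
        tracker.subtaskRestored(0, 2L);
        tracker.checkpointComplete(2L);
        System.out.println(tracker.positionOf(0)); // prints 25
    }
}
```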



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
