[ 
https://issues.apache.org/jira/browse/FLINK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23471:
-----------------------------
    Summary: Try best to ensure all operators and state manager handle the 
checkpoint notification complete  (was: Try best to ensure all operators and 
state manager handle the checkpoint notification)

> Try best to ensure all operators and state manager handle the checkpoint 
> notification complete
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23471
>                 URL: https://issues.apache.org/jira/browse/FLINK-23471
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Task
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.14.0
>
>
> The current implementation of 
> {{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is shown below:
> {code:java}
>     @Override
>     public void notifyCheckpointComplete(
>             long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
>             throws Exception {
>         if (!isRunning.get()) {
>             LOG.debug(
>                     "Ignoring notification of complete checkpoint {} for not-running task {}",
>                     checkpointId,
>                     taskName);
>         } else if (operatorChain.isFinishedOnRestore()) {
>             LOG.debug(
>                     "Ignoring notification of complete checkpoint {} for finished on restore task {}",
>                     checkpointId,
>                     taskName);
>         } else {
>             LOG.debug(
>                     "Notification of completed checkpoint {} for task {}", checkpointId, taskName);
>             for (StreamOperatorWrapper<?, ?> operatorWrapper :
>                     operatorChain.getAllOperators(true)) {
>                 operatorWrapper.notifyCheckpointComplete(checkpointId);
>             }
>         }
>         env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
>     }
> {code}
> If one operator in the operator chain throws an exception, the remaining 
> operators and the {{TaskStateManager}} never receive the notification.
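
One possible way to make the notification "best effort" is sketched below. It is an illustration only, not the change made for this issue: the names BestEffortNotifier, Listener, and notifyAllListeners are hypothetical stand-ins, not Flink classes. Every listener is called even if an earlier one throws; the first exception is kept, later ones are attached to it as suppressed, and it is rethrown after the loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names are part of Flink. */
final class BestEffortNotifier {

    /** Stand-in for an operator or state manager that receives notifications. */
    interface Listener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /**
     * Calls every listener, even when an earlier one fails. The first
     * exception is remembered, later ones are added to it as suppressed,
     * and it is rethrown only after all listeners have been called.
     */
    static void notifyAllListeners(long checkpointId, List<Listener> listeners)
            throws Exception {
        Exception firstFailure = null;
        for (Listener listener : listeners) {
            try {
                listener.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        List<Listener> listeners = List.of(
                id -> notified.add("first"),
                id -> { throw new IllegalStateException("operator failed"); },
                id -> notified.add("last"));
        try {
            notifyAllListeners(1L, listeners);
        } catch (Exception e) {
            System.out.println("Rethrown after notifying everyone: " + e.getMessage());
        }
        System.out.println("Notified: " + notified);
    }
}
```

In this sketch the listener after the failing one is still notified, which is the behaviour the issue title asks for. Whether the collected exception should fail the task or only be logged is a separate design decision that the sketch does not settle.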



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
