[ https://issues.apache.org/jira/browse/FLINK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-23471:
-----------------------------
    Summary: Try best to ensure all operators and state manager handle the checkpoint notification complete  (was: Try best to ensure all operators and state manager handle the checkpoint notification)

> Try best to ensure all operators and state manager handle the checkpoint notification complete
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23471
>                 URL: https://issues.apache.org/jira/browse/FLINK-23471
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Task
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.14.0
>
>
> The current {{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is implemented as follows:
> {code:java}
>     @Override
>     public void notifyCheckpointComplete(
>             long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
>             throws Exception {
>         if (!isRunning.get()) {
>             LOG.debug(
>                     "Ignoring notification of complete checkpoint {} for not-running task {}",
>                     checkpointId,
>                     taskName);
>         } else if (operatorChain.isFinishedOnRestore()) {
>             LOG.debug(
>                     "Ignoring notification of complete checkpoint {} for finished on restore task {}",
>                     checkpointId,
>                     taskName);
>         } else {
>             LOG.debug(
>                     "Notification of completed checkpoint {} for task {}", checkpointId, taskName);
>             for (StreamOperatorWrapper<?, ?> operatorWrapper :
>                     operatorChain.getAllOperators(true)) {
>                 operatorWrapper.notifyCheckpointComplete(checkpointId);
>             }
>         }
>         env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
>     }
> {code}
> If one operator in the operator chain throws an exception, the remaining operators and the {{TaskStateManager}} never receive the notification.
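
One possible shape for the "try best" behavior described above, as a minimal plain-Java sketch rather than the actual Flink change: notify every listener, remember the first failure, attach later failures as suppressed exceptions, and rethrow only after the state manager has also been notified. The names CheckpointListenerSketch, BestEffortNotification, notifyAllBestEffort, and firstOrSuppressed are hypothetical stand-ins introduced for illustration. They are not the Flink classes quoted in the issue, and the sketch covers only the running-task branch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator wrapper or the task state manager. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

final class BestEffortNotification {

    /**
     * Notifies every operator and then the state manager, even if some of them throw.
     * The first failure is rethrown at the end; later failures are attached to it
     * as suppressed exceptions.
     */
    static void notifyAllBestEffort(
            long checkpointId,
            List<CheckpointListenerSketch> operators,
            CheckpointListenerSketch stateManager)
            throws Exception {
        Exception firstFailure = null;
        for (CheckpointListenerSketch operator : operators) {
            try {
                operator.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                // Remember the failure but keep notifying the remaining operators.
                firstFailure = firstOrSuppressed(e, firstFailure);
            }
        }
        try {
            stateManager.notifyCheckpointComplete(checkpointId);
        } catch (Exception e) {
            firstFailure = firstOrSuppressed(e, firstFailure);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns the earliest exception, recording any later one as suppressed. */
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Scenario: the second of three operators fails. Returns what was observed, in order. */
    static List<String> demo() {
        List<String> notified = new ArrayList<>();
        List<CheckpointListenerSketch> operators = new ArrayList<>();
        operators.add(id -> notified.add("op-1"));
        operators.add(id -> { throw new IllegalStateException("op-2 failed"); });
        operators.add(id -> notified.add("op-3"));
        try {
            notifyAllBestEffort(42L, operators, id -> notified.add("state-manager"));
        } catch (Exception e) {
            notified.add("rethrown: " + e.getMessage());
        }
        return notified;
    }

    public static void main(String[] args) {
        // prints [op-1, op-3, state-manager, rethrown: op-2 failed]
        System.out.println(demo());
    }
}
```

In this sketch the failure from op-2 still reaches the caller, so the task can react to it, but op-3 and the state manager are notified first. Whether the real change rethrows, logs, or handles the failure differently is not stated in this message.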