[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938 ]
Guowei Ma commented on FLINK-2646:
----------------------------------

Sharing some thoughts about who should trigger the last checkpoint in the bounded stream scenario:

The _CheckpointCoordinator_ could trigger the last checkpoint. However, I think letting the task trigger the last checkpoint has some advantages in this scenario. In both options, of course, the _CheckpointCoordinator_ remains responsible for notifying the tasks that the checkpoint has completed.

# There would be fewer RPC calls.
## The first option (the _CheckpointCoordinator_ triggers the last checkpoint):
### Task -> JM: The task is finished.
### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint.
### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint.
## The second option (the task triggers the last checkpoint):
### Task -> _CheckpointCoordinator_: The task acknowledges the last checkpoint when it reaches the end of input.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last checkpoint.
### Task -> JM: The task is finished.
## The second option needs at least one fewer RPC call than the first option. Moreover, I think the flow after step a.i is not intuitive, because the _CheckpointCoordinator_ then sends RPC calls to tasks that are already finished. If Flink introduced another state such as End_Of_Input to the ExecutionState, the first option would need yet another RPC call.
# Releasing the resources is simpler. In the first option, the _CheckpointCoordinator_ has to notify the _Scheduler_ of the completion of the last checkpoint before the _Scheduler_ can release the slot properly. The second option needs nothing special to release the resources properly.
# It is easier to deal with the AM failover scenario. Currently, when a job reaches the FINISHED status, its JobGraph is removed from the JobGraphStore, so a new AM that is granted leadership does not resubmit the job.
In the first option, we should only remove the JobGraph after the _CheckpointCoordinator_ confirms that the notification of the last checkpoint is done. In the second option, nothing special is needed.

In the drain scenario, however, Flink should trigger the checkpoint from the _CheckpointCoordinator_.

> User functions should be able to differentiate between successful close and erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked both in case of proper completion and in case of cancellation or error (to allow for cleanup).
> In certain cases, the user function needs to know why it is closed: whether the task completed in a regular fashion, or was canceled or failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By default, this method calls {{close()}}. The runtime is then changed to call {{close()}} as part of the regular execution and {{closeAfterFailure()}} in case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API breaking.
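The {{closeAfterFailure()}} proposal can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not Flink's actual {{RichFunction}} interface or task runtime. {{ClosableFunction}}, {{runTask}}, {{RecordingFunction}}, {{LegacyFunction}} and {{CloseAfterFailureSketch}} are hypothetical names. Only a failure signalled by an exception from the task body is modeled; cancellation is not.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a rich function; NOT Flink's real RichFunction.
interface ClosableFunction {
    // Called after regular completion (and, by default, after a failure too).
    void close() throws Exception;

    // Proposed hook for irregular exits. It defaults to close(), so existing
    // implementations that only override close() keep today's behavior.
    default void closeAfterFailure() throws Exception {
        close();
    }
}

public class CloseAfterFailureSketch {

    // Hypothetical task driver: runs the body, then picks the close hook
    // depending on whether the body completed normally or threw.
    static void runTask(ClosableFunction fn, Runnable body) throws Exception {
        try {
            body.run();
        } catch (RuntimeException failure) {
            try {
                fn.closeAfterFailure();
            } catch (Exception closeError) {
                // Keep the original failure as the primary exception.
                failure.addSuppressed(closeError);
            }
            throw failure;
        }
        fn.close();
    }

    // Records which hook the driver invoked.
    static class RecordingFunction implements ClosableFunction {
        final List<String> calls = new ArrayList<>();

        @Override
        public void close() {
            calls.add("close");
        }

        @Override
        public void closeAfterFailure() {
            calls.add("closeAfterFailure");
        }
    }

    // Existing-style user code that only overrides close().
    static class LegacyFunction implements ClosableFunction {
        final List<String> calls = new ArrayList<>();

        @Override
        public void close() {
            calls.add("close");
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingFunction ok = new RecordingFunction();
        runTask(ok, () -> {});
        System.out.println(ok.calls); // [close]

        RecordingFunction failed = new RecordingFunction();
        try {
            runTask(failed, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // The failure still propagates to the caller.
        }
        System.out.println(failed.calls); // [closeAfterFailure]

        LegacyFunction legacy = new LegacyFunction();
        try {
            runTask(legacy, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // Same failure path, but this function did not override the new hook.
        }
        System.out.println(legacy.calls); // [close]
    }
}
```

The default method is what keeps the change non-breaking. {{LegacyFunction}} only overrides {{close()}}, so it still receives {{close()}} after a failure, exactly as before.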