[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938
 ] 

Guowei Ma commented on FLINK-2646:
----------------------------------

Here are some thoughts on who should trigger the last checkpoint in the bounded 
stream scenario:

The _CheckpointCoordinator_ could trigger the last checkpoint. However, I think 
there are some advantages to having the task trigger the last checkpoint in this 
scenario. Of course, in both options the _CheckpointCoordinator_ remains 
responsible for sending the notification that the checkpoint has completed.
 # There would be fewer RPC calls
 ## The first option (_CheckpointCoordinator_ triggers the last checkpoint)
 ### Task -> JM: Task is finished.
 ### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint.
 ### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint.
 ### _CheckpointCoordinator_ -> Task: Confirm the completion of the last 
checkpoint.
 ## The second option (Task triggers the last checkpoint)
 ### Task -> _CheckpointCoordinator_: Task acknowledges the last checkpoint 
when it reaches the end of input.
 ### _CheckpointCoordinator_ -> Task: Confirm the completion of the last 
checkpoint
 ### Task -> JM: Task is finished.
 ## The second option needs at least one fewer RPC call than the first option. 
Actually, I think step a.i is not intuitive, because the 
_CheckpointCoordinator_ would send an RPC call to tasks that have already 
finished. If Flink introduced another state such as End_Of_Input into the 
ExecutionState, there would be yet another RPC call.
 # Releasing resources is simpler. In the first option, the 
_CheckpointCoordinator_ has to notify the _Scheduler_ of the completion of the 
last checkpoint so that the _Scheduler_ can release the slot properly. The 
second option does not need to do anything special to release the resource 
properly.
 # It is easier to deal with the AM failover scenario. Currently, if a job 
reaches the FINISHED status, its JobGraph is removed from the JobGraphStore, so 
when a new AM is granted leadership it does not resubmit the job. In the first 
option, we should remove the JobGraph only after the _CheckpointCoordinator_ 
confirms that the notification of the last checkpoint is done. In the second 
option, we do not need to do anything.
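
The RPC comparison in the first point can be written down as a small model. 
This is only a sketch for counting messages, not Flink code: the class name 
{{LastCheckpointRpcSketch}} and its methods are hypothetical, and the 
JM -> _CheckpointCoordinator_ -> Task hop is counted as a single call, as in 
the list above.

```java
import java.util.List;

/** Hypothetical model of the two message sequences; not part of Flink. */
final class LastCheckpointRpcSketch {

    /** Option 1: the CheckpointCoordinator triggers the last checkpoint. */
    static List<String> coordinatorTriggered() {
        return List.of(
            "Task -> JM: task is finished",
            "JM -> CheckpointCoordinator -> Task: trigger the last checkpoint",
            "Task -> CheckpointCoordinator: acknowledge the last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion of the last checkpoint");
    }

    /** Option 2: the task triggers the last checkpoint at end of input. */
    static List<String> taskTriggered() {
        return List.of(
            "Task -> CheckpointCoordinator: acknowledge the last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion of the last checkpoint",
            "Task -> JM: task is finished");
    }

    /** Number of calls saved by option 2 under this simplified model. */
    static int savedCalls() {
        return coordinatorTriggered().size() - taskTriggered().size();
    }

    public static void main(String[] args) {
        System.out.println("option 1: " + coordinatorTriggered().size() + " RPC calls");
        System.out.println("option 2: " + taskTriggered().size() + " RPC calls");
        System.out.println("saved:    " + savedCalls());
    }
}
```

Under these assumptions the model gives 4 calls for the first option and 3 for 
the second. An additional End_Of_Input state would add one more entry to the 
first list.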

In the drain scenario, Flink should trigger the checkpoint from the 
_CheckpointCoordinator_.

> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked both after 
> proper completion and when the task is canceled because of an error (to allow 
> for cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API 
> breaking.
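
The proposal quoted above could look roughly like the following. This is a 
minimal sketch, assuming a stand-in interface rather than Flink's actual 
{{RichFunction}}: {{ClosableFunctionSketch}}, {{LegacyFunction}}, 
{{FailureAwareFunction}} and {{CloseSketch.shutDown}} are hypothetical names, 
and {{closeAfterFailure()}} exists only as a suggestion in this issue. Flink's 
real {{close()}} declares {{throws Exception}}, which is omitted here for 
brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the proposed interface; not Flink's RichFunction. */
interface ClosableFunctionSketch {
    void close();

    /** Proposed hook: delegates to close() by default, so old code keeps working. */
    default void closeAfterFailure() {
        close();
    }
}

/** Existing-style function that only knows about close(). */
final class LegacyFunction implements ClosableFunctionSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void close() {
        calls.add("close");
    }
}

/** Function that wants to tell a regular exit from an irregular one. */
final class FailureAwareFunction implements ClosableFunctionSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void close() {
        calls.add("close");
    }

    @Override
    public void closeAfterFailure() {
        calls.add("closeAfterFailure");
    }
}

final class CloseSketch {
    /** Hypothetical runtime helper: picks the hook based on how the task ended. */
    static void shutDown(ClosableFunctionSketch fn, boolean failed) {
        if (failed) {
            fn.closeAfterFailure();
        } else {
            fn.close();
        }
    }

    public static void main(String[] args) {
        LegacyFunction legacy = new LegacyFunction();
        shutDown(legacy, true);
        System.out.println(legacy.calls); // default hook falls back to close()

        FailureAwareFunction aware = new FailureAwareFunction();
        shutDown(aware, true);
        System.out.println(aware.calls); // overridden hook sees the failure
    }
}
```

Because the default method falls back to {{close()}}, a function written 
before the change behaves the same on both exit paths, which is the sense in 
which the proposal is not API breaking.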



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
