[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022873#comment-17022873
 ] 

Guowei Ma commented on FLINK-2646:
----------------------------------

Thanks for your detailed comments. I think unifying the sink for bounded and 
unbounded jobs is a very cool idea.

Here is what I understand about sinks and committing; correct me if I am 
wrong:
 # EndOfInput (TaskFinish) is very similar to triggering a checkpoint. One 
difference is that this checkpoint is not triggered by the 
_CheckpointCoordinator_. So maybe Flink could notify the UDF to snapshot its 
state when it receives the _EndOfInput_.
 # JobFinish is very similar to checkpoint completion. So maybe Flink could 
also call _notifyCheckpointComplete_ on the UDF when the job is finished.

So the sink does not need to assume that the input is bounded or unbounded. It 
only depends on the checkpoint mechanism to achieve exactly-once on its side.
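
To make this concrete, here is a minimal sketch of such a sink. It is plain 
Java with no Flink dependency, and every name in it (_BufferingCommitSink_, 
_snapshotState_, _notifyComplete_) is hypothetical rather than an existing 
Flink API. It assumes the runtime would call _snapshotState_ on end of input 
and _notifyComplete_ when the job finishes, exactly as it would for a regular 
checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sink: stages records at snapshot time, commits on completion. */
class BufferingCommitSink {
    private List<String> pending = new ArrayList<>();
    // Staged but not yet committed batches, keyed by snapshot id.
    private final TreeMap<Long, List<String>> staged = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        pending.add(record);
    }

    /** Called for a regular checkpoint or (assumption) by the runtime on end of input. */
    void snapshotState(long snapshotId) {
        staged.put(snapshotId, pending);
        pending = new ArrayList<>();
    }

    /** Called on checkpoint completion or (assumption) when the job finishes. */
    void notifyComplete(long snapshotId) {
        // Commit every batch staged up to and including snapshotId.
        Map<Long, List<String>> ready = staged.headMap(snapshotId, true);
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // headMap is a view, so this also removes the batches from staged
    }

    List<String> committed() {
        return committed;
    }
}

class BufferingCommitSinkDemo {
    public static void main(String[] args) {
        BufferingCommitSink sink = new BufferingCommitSink();
        sink.invoke("a");
        sink.snapshotState(1L);               // regular checkpoint
        sink.notifyComplete(1L);              // checkpoint complete
        sink.invoke("b");
        sink.snapshotState(Long.MAX_VALUE);   // end of input treated as a final snapshot
        sink.notifyComplete(Long.MAX_VALUE);  // job finished treated as completion
        System.out.println(sink.committed()); // prints [a, b]
    }
}
```

The sink code path is the same in both cases; only the caller of the two 
hooks differs between the bounded and the unbounded case.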

 

I have a few small questions and thoughts. I would like to get on the same 
page with you by thinking through these problems.
 # When does Flink notify a task of CheckpointComplete if a job has both 
bounded and unbounded sources? Because the job never finishes, its finished 
tasks could never be notified of _JobFinished_. One option is for Flink to 
support triggering checkpoints for a job that has finished tasks, and 
notifying those tasks when the checkpoint completes.
 # When can a slot be released for other tasks to use? If I understand 
correctly, all resources (including managed memory) should be released in the 
_dispose_ stage in the new design. So a task could not release any resources, 
even after it reports to the JM that it is finished, if it still needs to be 
notified of _JobFinish_. As far as I know, the JM can release a slot when all 
the tasks in it are finished. This might lead to inconsistency. I am not sure 
whether there are specific cases where this happens, but I think it is a 
potential risk in theory.
 # Flink needs to guarantee that the JobFinish event is received by all the 
tasks. Flink might not receive the acknowledgment of the JobFinish event from 
a task. There could be two situations. (The same might apply to drain in some 
situations.)
 ## The JobFinished request/response is lost. Retrying the JobFinished 
notification might resolve this problem.
 ## The task fails while handling the JobFinished event, so Flink does not 
receive the acknowledgment. Flink could use the normal failover strategy and 
restart the task with the state that was snapshotted at the moment of 
_endOfInput_. This could trigger another round of _endOfInput_ and 
_JobFinished_. But I think this only works for sources that support 
checkpointing. (The JM may also fail over while notifying the tasks of the 
JobFinish event. The new JM should then notify all the tasks of the 
JobFinished event.)
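
The retry case in the last point can be sketched roughly as follows. This is 
plain Java with hypothetical names (_JobFinishedNotifier_, _FinishedTask_), 
not existing Flink code. It assumes the task handles the JobFinished 
notification idempotently, so a retry after a lost response does not commit 
twice.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical JM-side helper: resend the notification until it is acknowledged. */
class JobFinishedNotifier {
    /** Returns true if an ack arrived within maxAttempts, false otherwise. */
    static boolean notifyWithRetry(BooleanSupplier sendAndAwaitAck, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sendAndAwaitAck.getAsBoolean()) {
                return true;
            }
        }
        return false; // the caller could fall back to the normal failover strategy here
    }
}

/** Hypothetical task side: duplicate notifications caused by retries commit only once. */
class FinishedTask {
    private boolean committed = false;
    private int commitCount = 0;

    boolean onJobFinished() {
        if (!committed) {
            commitCount++;
            committed = true;
        }
        return true; // acknowledge in every case
    }

    int commitCount() {
        return commitCount;
    }
}

class JobFinishedRetryDemo {
    public static void main(String[] args) {
        FinishedTask task = new FinishedTask();
        int[] calls = {0};
        // Simulated channel: the task handles every request, but the first two acks are lost.
        boolean acked = JobFinishedNotifier.notifyWithRetry(() -> {
            boolean handled = task.onJobFinished();
            return handled && ++calls[0] > 2;
        }, 5);
        System.out.println(acked + " " + task.commitCount()); // prints true 1
    }
}
```

If the attempts are exhausted, the sketch simply returns false. Restarting the 
task from the _endOfInput_ snapshot, as described above, would be the next 
step.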

> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and in case of canceling in case of error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API 
> breaking.
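
For reference, the proposal quoted above could look roughly like the 
following. This is a simplified, hypothetical interface (_ClosableFunction_), 
not the actual _RichFunction_, and the runtime helper _TaskRuntimeSketch_ is 
made up for illustration. The point is that a default method keeps existing 
functions working unchanged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the close-related part of a rich function. */
interface ClosableFunction {
    void close();

    /** By default an irregular exit falls back to close(), so existing code is unaffected. */
    default void closeAfterFailure() {
        close();
    }
}

/** Existing function that only knows about close(). */
class LegacyFunction implements ClosableFunction {
    final List<String> log = new ArrayList<>();

    @Override
    public void close() {
        log.add("close");
    }
}

/** Function that distinguishes a successful close from an erroneous one. */
class FailureAwareFunction implements ClosableFunction {
    final List<String> log = new ArrayList<>();

    @Override
    public void close() {
        log.add("commit");
    }

    @Override
    public void closeAfterFailure() {
        log.add("rollback");
    }
}

/** Hypothetical runtime hook: picks the method based on how the task ended. */
class TaskRuntimeSketch {
    static void shutDown(ClosableFunction fn, boolean failed) {
        if (failed) {
            fn.closeAfterFailure();
        } else {
            fn.close();
        }
    }
}

class CloseAfterFailureDemo {
    public static void main(String[] args) {
        LegacyFunction legacy = new LegacyFunction();
        FailureAwareFunction aware = new FailureAwareFunction();
        TaskRuntimeSketch.shutDown(legacy, true);
        TaskRuntimeSketch.shutDown(aware, true);
        System.out.println(legacy.log + " " + aware.log); // prints [close] [rollback]
    }
}
```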



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
