[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288340#comment-17288340
 ] 

Till Rohrmann commented on FLINK-21133:
---------------------------------------

Thanks for driving this discussion. I think a lot of things are coming together 
here because shutdown/closing semantics of operators are tightly coupled to how 
sources and sinks must work and it should ideally work for bounded and 
unbounded streams similarly.

I think it is a good idea to think about what functionality we want to offer to 
our users as [~becket_qin] did it before diving into the concrete 
implementation details:

1. *A user wants to cancel a bounded/unbounded job and does not care about the 
output correctness*
This is what we currently support with {{Task.cancelExecution}}. Cancel should 
forcefully terminate the operators and we don't care whether records are 
properly sent to downstream tasks or written to a sink.

2. *A user wants to suspend a bounded/unbounded job and wants to be able to 
resume it later*
This is what we currently call **stop-with-savepoint**. Here the user wants to 
create a savepoint representing the current state of the computation which he 
can use to resume the job at a later point. The user does not care about 
whether the job shuts down orderly or not. Hence, we should be able to simply 
cancel all tasks after taking the savepoint.

3. *A user wants to terminate a bounded/unbounded job and wants all of its 
buffered data to be flushed to external systems*
This is what we currently call **stop-with-savepoint --drain**. Here the user 
is not so much interested in the savepoint as in materializing the job's 
state/result to external systems. The only use case for the created savepoint 
here I could think of is to get access to Flink's state after the job has 
terminated. Resuming the job from such a savepoint does not make much sense 
because the results will be affected by the {{MAX_WATERMARK}} we have sent. 
Maybe **stop-with-savepoint --drain** is a misnomer.

4. *A bounded job reaches its end and in order to guarantee correctness needs 
to flush als its buffered data*
This effectively the same as terminating a bounded/unbounded job just that it 
is not induced by the user but by the sources reaching the end of data.

5. *A user wants to gracefully stop its job w/o creating a savepoint*
The only case I can think of is that a user wants to stop an at-least-once job 
in such a way that all records up to the point of issuing the command will get 
processed. Other than that cancel might already be good enough.

Conceptually, 3. and 4. should use the same mechanism because semantically, 
there is no difference [~gaoyunhaii].

I agree with [~kezhuw] and [~sewen] that 2. should not require the complexity 
of shutting the job gracefully down. E.g. by allowing the state transition 
{{RUNNING -> CANCELLED}} on the {{JobMaster}} could allow the {{Tasks}} to 
simply cancel themselves after receiving {{notifyCheckpointComplete}}. 
Alternatively, {{Tasks}} could acknowledge the {{notifyCheckpointComplete}} but 
this adds complexity to the already complex {{CheckpointCoordinator}}.

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to