[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287105#comment-17287105
 ] 

Piotr Nowojski commented on FLINK-21133:
----------------------------------------

[~becket_qin] I think you are missing the *Stop-with-Savepoint, with pipeline 
draining* case, which [~sewen] mentioned. There, we are emitting 
{{MAX_WATERMARK}} with {{advanceToEndOfEventTime}} case. In that case we 
probably should follow the standard clean shutdown procedure. I don't know if 
we should invoke {{endOfInput}} on the operators in that case or not... I would 
guess probably yes. If we are flushing window operators, we would also want to 
flush all buffered records. And if we are never intending to resume this job, 
{{endOfInput}} makes kind of sense for the downstream operators?

[~sewen], generally speaking I like the idea of changing stop with savepoint 
(without the drain), to cancel with savepoint. As me and [~roman_khachatryan] 
mentioned previously, we would like to avoid controlling the flow with 
exceptions. But that should be as easy to replace throwing 
`CancelTaskException` with just `StreamTask#cancel` call. However I still do 
not see how one would solve the problem of fully/completely backpressured 
legacy source task.

Secondly [~sewen] as we discussed offline. We would have to make sure that 
downstream/upstream task would cancel correctly, without mis-leading error 
messages, if they receive network connection closed before processing 
{{notifyCheckpointComplete()}}. I haven't looked how this would affect the 
code, maybe that's not an issue and it would work as it is, but it has to be 
verified.


> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to