[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282449#comment-17282449 ]

Stephan Ewen commented on FLINK-21133:
--------------------------------------

It looks to me like "stop with savepoint" is itself incompletely implemented.
The current implementation works by simply "breaking the legacy source thread
out of its loop", which is why it only works for those specific sources.

I think what stop-with-savepoint should really do is tell the mailbox to take a
savepoint and then exit (as if the input were empty).
That is the definition of stopping, and it should cover both chained and
unchained sources without any other changes.
"Stop()" is like a graceful "cancel()"; the main differences are:
  - not interrupting threads, because it doesn't need to be "as fast as possible"
  - calling close() on the operators for graceful shutdown and finalization.
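The proposed semantics can be illustrated with a minimal, single-threaded sketch. `MailboxTaskSketch` and all of its members are hypothetical names, not Flink classes: the stop request is modeled as an ordinary mail that takes the savepoint and then leaves the loop, after which `close()` runs, while `cancel()` skips `close()`. Thread interruption is not modeled, and the `stopAfter` parameter only simulates when the stop request arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a task's mailbox thread; these are not Flink classes.
class MailboxTaskSketch {
    final List<String> log = new ArrayList<>();        // what the task did, in order
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Queue<Integer> input;
    private final int stopAfter;                        // simulates when the stop request arrives (0 = never)
    private boolean running = true;
    private boolean cancelled = false;

    MailboxTaskSketch(List<Integer> records, int stopAfter) {
        this.input = new ArrayDeque<>(records);
        this.stopAfter = stopAfter;
    }

    /** Stop-with-savepoint is just another mail: the thread is never interrupted. */
    void requestStopWithSavepoint() {
        mailbox.add(() -> {
            log.add("savepoint");   // take the savepoint first ...
            running = false;        // ... then leave the loop as if the input were empty
        });
    }

    /** Cancel is the "as fast as possible" path; a real task would also interrupt the thread. */
    void cancel() {
        cancelled = true;
        running = false;
    }

    void run() {
        int processed = 0;
        while (running) {
            // Control actions (checkpoints, stop) are processed between records.
            Runnable mail;
            while (running && (mail = mailbox.poll()) != null) {
                mail.run();
            }
            if (!running) {
                break;
            }
            // Default action: process one record, or exit when the input is exhausted.
            Integer record = input.poll();
            if (record == null) {
                break;
            }
            log.add("record " + record);
            if (++processed == stopAfter) {
                requestStopWithSavepoint();
            }
        }
        if (!cancelled) {
            log.add("close");       // graceful shutdown: operators are closed and finalized
        }
    }
}
```

With records `1, 2, 3` and `stopAfter = 2`, the log reads `record 1`, `record 2`, `savepoint`, `close`: the third record is never read, yet the operators are still closed.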

If we add a "stop()" method to the sources, we are solving this in the wrong 
place, in my opinion. The sources are designed such that the calling thread 
(mailbox) can decide when to read and when to stop. The sources only give 
indications about availability to help the thread make that decision. A 
"stop()" command should go to the thread (mailbox) to tell it to stop its 
mailbox loop.
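A rough sketch of this split of responsibilities, assuming a simplified polling interface. The status names mirror Flink's `InputStatus` values, but `PollingSource`, `fromList`, and `MailboxLoop` are hypothetical: the source only emits records and reports a status, and the stop command is addressed to the loop, never to the source.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch; these are not Flink classes. The status names mirror
// Flink's InputStatus enum, but the interfaces are simplified.
class SourceDrivenByMailbox {
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** A source only emits records and reports availability; it has no stop(). */
    interface PollingSource {
        InputStatus pollNext(Consumer<Integer> output);
    }

    /** In-memory source used for illustration. */
    static PollingSource fromList(List<Integer> records) {
        Queue<Integer> remaining = new ArrayDeque<>(records);
        return output -> {
            Integer next = remaining.poll();
            if (next == null) {
                return InputStatus.END_OF_INPUT;
            }
            output.accept(next);
            return InputStatus.MORE_AVAILABLE;
        };
    }

    /** The mailbox loop alone decides when to read and when to stop. */
    static final class MailboxLoop {
        private volatile boolean stopRequested = false;

        /** The stop command goes to the loop, not to the source. */
        void stop() {
            stopRequested = true;
        }

        void run(PollingSource source, Consumer<Integer> output) {
            while (!stopRequested) {
                InputStatus status = source.pollNext(output);
                if (status == InputStatus.END_OF_INPUT) {
                    return;
                }
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    // A real loop would suspend here until the source signals availability.
                    Thread.onSpinWait();
                }
            }
        }
    }
}
```

For example, a consumer that calls `loop.stop()` after the second record leaves the source untouched; the loop simply stops polling it.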

That way, there is also a natural distinction between the case where the source
reaches the end of the stream (and we need to emit a MAX_WATERMARK, as
implemented in the source operator) and the case where we just stop reading
("stop()"). Otherwise we need weird bookkeeping to work out whether we should
advance the watermark once the source reports END_OF_INPUT.
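A minimal sketch of that distinction, assuming the loop reports why it exited. `ExitReasonSketch`, `runLoop`, and `finalWatermarks` are hypothetical names; `MAX_WATERMARK_TS` stands in for the timestamp of Flink's `Watermark.MAX_WATERMARK` (`Long.MAX_VALUE`). Because the reason is known at the point where the loop exits, deciding whether to emit the final watermark needs no extra state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; not Flink classes. MAX_WATERMARK_TS mirrors the timestamp
// of Flink's Watermark.MAX_WATERMARK (Long.MAX_VALUE).
class ExitReasonSketch {
    enum ExitReason { END_OF_INPUT, STOPPED }

    static final long MAX_WATERMARK_TS = Long.MAX_VALUE;

    /** Runs until the input is exhausted or a stop is requested, and reports which one happened. */
    static ExitReason runLoop(Queue<Integer> input, List<Integer> output, BooleanSupplier stopRequested) {
        while (true) {
            if (stopRequested.getAsBoolean()) {
                return ExitReason.STOPPED;        // we stopped reading; the stream is not over
            }
            Integer record = input.poll();
            if (record == null) {
                return ExitReason.END_OF_INPUT;   // the source reached the true end of the stream
            }
            output.add(record);
        }
    }

    /** Watermarks emitted on shutdown: only a real end of input advances event time to the end. */
    static List<Long> finalWatermarks(ExitReason reason) {
        List<Long> watermarks = new ArrayList<>();
        if (reason == ExitReason.END_OF_INPUT) {
            watermarks.add(MAX_WATERMARK_TS);      // fires all remaining event-time timers
        }
        // STOPPED: the job may later resume from the savepoint, so event time must not jump.
        return watermarks;
    }
}
```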





> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
