[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282449#comment-17282449 ]
Stephan Ewen commented on FLINK-21133:
--------------------------------------

It looks to me like "stop with savepoint" is itself incompletely implemented. The current implementation works by simply "breaking the legacy source thread out of its loop". That's why it only works for those specific sources.

I think what stop-with-savepoint should really do is tell the mailbox to take a savepoint and then exit (as if the input was empty). That is the definition of stopping, and it should cover both chained and unchained sources without any other changes.

"Stop()" is like a graceful "cancel()", where the main differences are:
  - not interrupting threads, because we don't need to be "as fast as possible"
  - calling close() on the operators for graceful shutdown and finalization.

If we add a "stop()" method to the sources, we are solving this in the wrong place, in my opinion. The sources are designed such that the calling thread (mailbox) can decide when to read and when to stop. The sources only give indications about availability to help the thread make that decision. A "stop()" command should go to the thread (mailbox) to tell it to stop its mailbox loop.

That way, there is also a natural distinction between the case where the source reaches the end of the stream (and we need to emit a MAX_WATERMARK, as implemented in the source operator) and the case where we just stop reading ("stop()"). Otherwise we need weird bookkeeping to understand whether, once the source tells us END_OF_INPUT, we should advance the watermark or not.
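
To make the distinction above concrete, here is a minimal toy sketch of a mailbox-style loop in which the stop request goes to the loop rather than to the source. This is not Flink code: MailboxStopSketch, ToyTask, ExitReason and stopWithSavepoint() are hypothetical names invented for illustration, and the "savepoint" is only a log entry. It assumes a single-threaded loop that drains control actions before each read.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Toy model only; none of these types are Flink classes.
final class MailboxStopSketch {

    // Why the loop exited. The loop knows this itself; the source is never asked.
    enum ExitReason { END_OF_INPUT, STOPPED }

    static final class ToyTask {
        final Iterator<String> source;                       // stands in for a source reader
        final Queue<Runnable> mailbox = new ArrayDeque<>();  // pending control actions
        final List<String> log = new ArrayList<>();          // what the task did, in order
        boolean stopRequested = false;

        ToyTask(Iterator<String> source) {
            this.source = source;
        }

        // Hypothetical stop-with-savepoint: enqueue a control action for the loop.
        // Nothing is called on the source.
        void stopWithSavepoint() {
            mailbox.add(() -> {
                log.add("savepoint");
                stopRequested = true;
            });
        }

        ExitReason run() {
            ExitReason reason;
            while (true) {
                // Drain control actions before deciding whether to read again.
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
                if (stopRequested) {
                    // Stopped by request: exit without a final watermark.
                    reason = ExitReason.STOPPED;
                    break;
                }
                if (!source.hasNext()) {
                    // Genuine end of stream: this is where MAX_WATERMARK belongs.
                    log.add("watermark:MAX");
                    reason = ExitReason.END_OF_INPUT;
                    break;
                }
                log.add("record:" + source.next());
            }
            // Both exits close gracefully; no thread is interrupted.
            log.add("close");
            return reason;
        }
    }
}
```

In this sketch a task that runs to the end of its input logs its records, then "watermark:MAX", then "close", while a task that received stopWithSavepoint() logs "savepoint" and "close" with no final watermark. The loop needs no extra bookkeeping to tell the two cases apart, because it is the component that decided to exit. In a real task the stop request would typically arrive from another thread while records are being read; the sketch sidesteps that concurrency for brevity.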

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)