[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689624#comment-17689624 ]
Fabian Paul commented on FLINK-30238:
-------------------------------------

Sorry for joining late; I was on PTO.

[~gaoyunhaii]
# I think your analysis is not fully correct. endOfInput() is also called on stop-with-savepoint; the only difference with drain is that the watermark is advanced to the maximum. So the special marker {checkpoint id = MAX_VALUE} is emitted by the SinkWriters and stored in the Committer state (only committables with lower checkpoint ids are committed and forwarded). When a new pipeline recovers from that state, the marker is restored as well, but it is never committed. The problem starts when you try to take another savepoint: the committer receives the marker again, and currently we do not allow two summaries with the same checkpoint id.
# The second problem is more about a possible migration story. So far, when users use the sink and want to close all pending transactions, we advise stopping the job with a savepoint, which should finalize all open transactions (except for some windows if drain is not used). For sinks with a post-commit topology, this does not entirely work, because it essentially needs two notifyCheckpointComplete calls: the first to flush the committer and the second to flush the downstream operator, for example a global committer.

We should concentrate on the first issue first because it currently blocks all sinks running on Flink 1.15+, and figure out how to improve the situation for the second one later.

> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending
> committables on notifyCheckpointComplete.
> This has several downsides.
> * The last committableSummary has checkpoint id Long.MAX_VALUE and is never cleared
> from the state, so stop-with-savepoint does not work when the
> pipeline recovers from a savepoint.
> * While the committables are committed during stop-with-savepoint, they are
> not forwarded to the post-commit topology, potentially losing data and preventing
> open transactions from being closed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
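The first problem in the comment can be reproduced with a small state model. This is a hypothetical, simplified sketch, not Flink's actual committer code: the class CommitterStateModel, its methods, the integer "committable count" payload, and checkpoint id 42 are all invented for illustration. It only assumes what the comment states: summaries are keyed by checkpoint id, notifyCheckpointComplete commits summaries up to the completed checkpoint id, the endOfInput() marker uses Long.MAX_VALUE, and a second summary with an existing checkpoint id is rejected.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of a sink committer's state (NOT Flink's real classes).
 * Summaries are keyed by checkpoint id, and a second summary for an existing id is rejected.
 */
class CommitterStateModel {
    /** Hypothetical marker id for the final committables emitted on endOfInput(). */
    static final long END_OF_INPUT = Long.MAX_VALUE;

    // Pending summaries: checkpoint id -> number of committables (illustrative payload only).
    private final TreeMap<Long, Integer> pending = new TreeMap<>();

    /** Stores a summary; throws if one already exists for the same checkpoint id. */
    public void addSummary(long checkpointId, int committables) {
        if (pending.putIfAbsent(checkpointId, committables) != null) {
            throw new IllegalStateException("Duplicate summary for checkpoint " + checkpointId);
        }
    }

    /** Commits and removes all summaries with id <= checkpointId; returns how many were committed. */
    public int notifyCheckpointComplete(long checkpointId) {
        Map<Long, Integer> committable = pending.headMap(checkpointId, true);
        int count = committable.size();
        committable.clear(); // clearing the view also removes the entries from 'pending'
        return count;
    }

    /** Copy of the state that would be written into the savepoint. */
    public TreeMap<Long, Integer> snapshot() {
        return new TreeMap<>(pending);
    }

    /** Creates a committer that starts from previously snapshotted state. */
    public static CommitterStateModel restore(Map<Long, Integer> state) {
        CommitterStateModel model = new CommitterStateModel();
        model.pending.putAll(state);
        return model;
    }

    public boolean hasPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    public static void main(String[] args) {
        // First run: stop-with-savepoint calls endOfInput(), so the marker summary is emitted.
        CommitterStateModel first = new CommitterStateModel();
        first.addSummary(END_OF_INPUT, 3);
        // The savepoint (checkpoint 42 here) completes; the marker id is larger, so it stays pending.
        first.notifyCheckpointComplete(42);
        TreeMap<Long, Integer> savepoint = first.snapshot();

        // Second run: recover from the savepoint, then stop-with-savepoint again.
        CommitterStateModel second = restore(savepoint);
        try {
            second.addSummary(END_OF_INPUT, 1);
            System.out.println("second savepoint succeeded");
        } catch (IllegalStateException e) {
            System.out.println("second savepoint fails: " + e.getMessage());
        }
    }
}
```

Running main prints a line starting with "second savepoint fails", because the restored state still holds the Long.MAX_VALUE marker when the second stop-with-savepoint emits it again. The TreeMap keeps summaries ordered by checkpoint id, so headMap(checkpointId, true) selects exactly the summaries a completed checkpoint is allowed to commit, which is why the marker is never cleared in this model.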