[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689624#comment-17689624
 ] 

Fabian Paul commented on FLINK-30238:
-------------------------------------

Sorry for joining late; I was on PTO.

[~gaoyunhaii] 
 # I think your analysis is not fully correct. endOfInput() is also called on 
stop-with-savepoint; the only difference with drain is that it additionally 
advances the watermark to the maximum. So the special marker {checkpoint id = 
MAX_VALUE} is emitted from the SinkWriters and stored in the Committer state 
(only committables with lower checkpoint ids are committed and forwarded). When 
a new pipeline recovers from that state, the marker is restored but never 
committed. The problem starts when you try to take another savepoint, because 
the committer receives the marker again, and currently we do not allow two 
summaries with the same checkpoint id.
 # The problem here is more about a possible migration story. So far, when 
users of the sink want to close all pending transactions, we advise stopping 
the job with a savepoint, which should finalize all open transactions (except 
for data still held in some windows if drain is not used). For sinks with a 
post-commit topology, that doesn't entirely work because it essentially needs 
two notifyCheckpointComplete calls: the first to flush the committer and the 
second to flush the downstream operator, for example, the global committer.
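Point 1 can be illustrated with a deliberately simplified Java model of the committer state. This is not Flink's CommitterOperator or CommittableCollector code: the class CommitterStateSketch, its methods, and modeling the marker as a summary without committables are all hypothetical. It only assumes what is described above, namely that a completed checkpoint commits summaries whose checkpoint id is at most the completed id, and that a second summary for an already present checkpoint id is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of the committer state; not Flink's actual classes.
class CommitterStateSketch {
    // Checkpoint id of the special marker emitted from endOfInput().
    static final long END_OF_INPUT_ID = Long.MAX_VALUE;

    // Pending summaries keyed by checkpoint id; stands in for the operator state.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Committables that were committed (and would be forwarded downstream).
    private final List<String> committed = new ArrayList<>();

    // Mirrors the current rule: two summaries with the same checkpoint id are rejected.
    void addSummary(long checkpointId, List<String> committables) {
        if (pending.containsKey(checkpointId)) {
            throw new IllegalStateException("Duplicate summary for checkpoint " + checkpointId);
        }
        pending.put(checkpointId, new ArrayList<>(committables));
    }

    // Commits only summaries whose checkpoint id is <= the completed checkpoint id.
    void notifyCheckpointComplete(long completedId) {
        // Live view of all entries with key <= completedId; clearing it removes them from pending.
        NavigableMap<Long, List<String>> ready = pending.headMap(completedId, true);
        ready.values().forEach(committed::addAll);
        ready.clear();
    }

    // Simulates a new pipeline restoring the pending state from a savepoint.
    CommitterStateSketch restoreCopy() {
        CommitterStateSketch restored = new CommitterStateSketch();
        pending.forEach((id, c) -> restored.pending.put(id, new ArrayList<>(c)));
        return restored;
    }

    boolean hasPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        // First job: savepoint 5, plus the endOfInput() marker (modeled without committables).
        CommitterStateSketch first = new CommitterStateSketch();
        first.addSummary(5L, List.of("txn-5"));
        first.addSummary(END_OF_INPUT_ID, List.of());
        first.notifyCheckpointComplete(5L);
        System.out.println(first.committed());                 // [txn-5]
        System.out.println(first.hasPending(END_OF_INPUT_ID)); // true: the marker stays in state

        // Second job restores from that savepoint and is stopped with a savepoint again.
        CommitterStateSketch second = first.restoreCopy();
        second.addSummary(7L, List.of("txn-7"));
        try {
            second.addSummary(END_OF_INPUT_ID, List.of());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Duplicate summary for checkpoint 9223372036854775807
        }
    }
}
```

In this model, the first savepoint commits txn-5 while the marker remains pending, and the restored job fails as soon as a second marker with the same id arrives.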

 

We should first concentrate on the first issue because it currently blocks all 
sinks running on Flink 1.15+, and figure out how to improve the situation for 
the second one later.

> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * The last committableSummary has checkpoint id Long.MAX_VALUE and is never 
> cleared from the state, so stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint.
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
