[ 
https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640461#comment-17640461
 ] 

Piotr Nowojski edited comment on FLINK-29856 at 11/29/22 8:31 AM:
------------------------------------------------------------------

Yes, this is the intended behaviour.
{quote}
I think the reason why Flink doesn't call notifyCheckpointComplete() anymore on 
"intermediate" savepoints is purely for recovery reasons where we want to 
ensure that we only call notifyCheckpointComplete() once. Arguably, this isn't 
really the case because we can fail directly after restoring from a checkpoint 
but at least we will then only commit the already committed data and not any 
older data, as would be the case when a savepoint had been committing the data 
before we fell back to an earlier checkpoint.
{quote}
The real rationale behind the current behaviour becomes clear if you imagine this scenario:
# triggering and completing {{chk42}}
# triggering and completing savepoint 
# job failover

The job can not recover to the savepoint, because there are no guarantees that it 
still exists (savepoints are always owned by the user). It has to fail over to 
{{chk42}}. However, if we had committed the savepoint, recovery to {{chk42}} 
would create either duplicate results (if the job is deterministic) or 
inconsistent results (if the job is non-deterministic). 
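The scenario above can be sketched with a toy model (plain Python, not Flink APIs; the records and commit points are made up) showing how a committing intermediate savepoint plus recovery to {{chk42}} duplicates output:

```python
# Toy model: a transactional sink commits its pending output when a
# checkpoint/savepoint completes (notifyCheckpointComplete). If the
# intermediate savepoint also committed, recovery to the last checkpoint
# replays and re-commits the same records.

external = []  # what the external system (e.g. Kafka) has durably committed

def run_from(records, commit_points):
    """Process records; commit pending output at each commit point."""
    pending = []
    for r in records:
        pending.append(r)
        if r in commit_points:
            external.extend(pending)  # notifyCheckpointComplete fires
            pending.clear()

# Original run: chk42 completes after "a"; the intermediate savepoint
# completes after "b" and (hypothetically) also commits.
run_from(["a", "b"], commit_points={"a", "b"})
assert external == ["a", "b"]

# Failover: the savepoint is user-owned and may be gone, so the job
# restores chk42 and replays everything after "a". A deterministic job
# re-produces "b", which gets committed a second time.
run_from(["b"], commit_points={"b"})
print(external)  # ['a', 'b', 'b'] -> duplicate result
```

A non-deterministic job would instead re-produce something other than "b" on the second run, giving inconsistent rather than duplicated output.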

[~mason6345], think of savepoints as either state backups of your job, or as 
terminal states after stopping the job (also a kind of backup that outlives 
the job). Ideally, neither of those should contain any pointers to external 
state (like Kafka transactions), but this is not yet implemented 
(FLINK-30070). As it is, during recovery from a savepoint it's recommended to 
drop the external state by replacing the transactional operators' {{uid}}s 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).
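The effect of changing a sink's {{uid}} can be sketched with a toy restore step (plain Python, not Flink code; the uids {{kafka-sink}} / {{kafka-sink-v2}} and the state contents are illustrative):

```python
# Toy model of savepoint restore keyed by operator uid. The sink's state
# holds a pointer to an open external transaction (the "external state"
# we want to drop on recovery).
savepoint = {
    "source":     {"offset": 42},
    "kafka-sink": {"open_txn_id": "txn-17"},  # pointer to external state
}

def restore(job_uids, state, allow_non_restored=False):
    """Assign each operator the state stored under its uid, if any."""
    unmatched = set(state) - set(job_uids)
    if unmatched and not allow_non_restored:
        raise RuntimeError(f"state for unknown uids: {unmatched}")
    return {uid: state.get(uid) for uid in job_uids}

# Restarting with the same uids re-attaches the dangling transaction:
same = restore(["source", "kafka-sink"], savepoint)
assert same["kafka-sink"] == {"open_txn_id": "txn-17"}

# Changing the sink's uid (and allowing non-restored state) drops it, so
# the job starts with a fresh sink and no stale transaction to commit:
fresh = restore(["source", "kafka-sink-v2"], savepoint,
                allow_non_restored=True)
assert fresh["kafka-sink-v2"] is None
```

This mirrors why the restore has to be started with non-restored state allowed: the old sink uid's state is deliberately left unmatched.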
 
{quote}
If one wants to be safe in those scenarios, we advise dropping the state of 
transactional sinks, by changing the sinks' uids.
{quote}
Generally speaking, recovery to savepoints is always tricky, as in many cases 
it can violate exactly-once guarantees. Users have to take that into account, 
or use only stop-with-savepoint, to make sure that the original job can't make 
any progress while they are doing something with that savepoint. 

{quote}
Without notifyCheckpointComplete, it isn't clear what the benefit of an 
intermediate Savepoint is (need to re-read the FLIP). From the user 
perspective, trigger intermediate Savepoint has the benefit to be able to 
commit data to external systems like Kafka/Iceberg on demand for their 
operational procedures. Perhaps, the eventual solution is to replace 
operational procedure with triggering checkpoint with 
https://issues.apache.org/jira/browse/FLINK-29634 since that would match the 
effects (notifyCheckpointComplete, etc).
{quote}
[~mason6345], can you explain what actual problem you are experiencing and why 
you are using intermediate savepoints? It sounds to me like you are using them 
in a way they were not intended. On-demand committing of data to external 
systems sounds like a use case for FLINK-27101. 



> Triggering savepoint does not trigger operator notifyCheckpointComplete
> -----------------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified for two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke 
> notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and 
> successful, which is verified by the Flink Checkpoint UI tab and the 
> jobmanager logs, e.g.
> {{Triggering checkpoint 3 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL})}}
>  
> However, when a checkpoint occurs via the regular interval, I do see the 
> sources checkpointing properly and the expected logs in the output.
> After the ticket was initially filed, I also checked other stateful UDFs 
> and observed the same behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
