[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640461#comment-17640461 ]
Piotr Nowojski edited comment on FLINK-29856 at 11/29/22 8:29 AM:
------------------------------------------------------------------
Yes, this is the intended behaviour.
{quote}
I think the reason why Flink doesn't call notifyCheckpointComplete() anymore on "intermediate" savepoints is purely for recovery reasons, where we want to ensure that we only call notifyCheckpointComplete() once. Arguably, this isn't really the case, because we can fail directly after restoring from a checkpoint, but at least we will then only commit the already committed data and not any older data, as would be the case when a savepoint had been committing the data before we fell back to an earlier checkpoint.
{quote}
That's not entirely true. Imagine this scenario:
# triggering and completing {{chk42}}
# triggering and completing a savepoint
# job failover

The job cannot recover to the savepoint, because there is no guarantee that it still exists (savepoints are always owned by the user). It has to fail over to {{chk42}}. However, if we had committed the savepoint, recovery to {{chk42}} would create either duplicate results (if the job is deterministic) or inconsistent results (if the job is non-deterministic).

[~mason6345], think of savepoints as either state backups of your job, or terminal states after stopping the job (also a kind of backup that outlives your job). Ideally, neither of those should contain any pointers to external state (like Kafka transactions), but this is not yet implemented (FLINK-30070). As it is, during recovery from savepoints it's recommended to drop the external state by changing the transactional operators' {{uid}}s (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints):
{quote}
If one wants to be safe in those scenarios, we advise dropping the state of transactional sinks by changing the sinks' uids.
{quote}
Generally speaking, recovery to savepoints is always tricky, as in many cases it can violate exactly-once guarantees. Users have to take that into account, or use only stop-with-savepoint, to make sure that the original job won't be able to make any progress while we are doing something with that savepoint.
{quote}
Without notifyCheckpointComplete, it isn't clear what the benefit of an intermediate savepoint is (need to re-read the FLIP). From the user perspective, triggering an intermediate savepoint has the benefit of being able to commit data to external systems like Kafka/Iceberg on demand for operational procedures. Perhaps the eventual solution is to replace that operational procedure with triggering a checkpoint via https://issues.apache.org/jira/browse/FLINK-29634, since that would match the effects (notifyCheckpointComplete, etc.).
{quote}
[~mason6345], can you explain what the actual problem is that you are experiencing, and why you are using intermediate savepoints? It sounds to me like you are not using them the way they were intended. On-demand committing of data to external systems sounds like a use case for FLINK-27101.
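The failover scenario above can be sketched with a toy model. This is plain Java, not Flink's actual classes; the sink and its behaviour are illustrative only. It shows why committing on an intermediate savepoint, then falling back to the earlier {{chk42}}, would make the replayed records visible twice in the external system:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only (not Flink internals): a toy transactional sink
// that buffers records and commits them to an "external system" when
// notifyCheckpointComplete fires.
public class SavepointCommitSketch {
    static class ToySink {
        final List<String> pending = new ArrayList<>();   // uncommitted records
        final List<String> external = new ArrayList<>();  // committed, visible downstream
        long lastCommitted = -1;

        void write(String record) { pending.add(record); }

        // Called on checkpoint completion.
        void notifyCheckpointComplete(long checkpointId) {
            external.addAll(pending);
            pending.clear();
            lastCommitted = checkpointId;
        }
    }

    public static void main(String[] args) {
        ToySink sink = new ToySink();

        // 1. chk42 completes and commits records "a", "b".
        sink.write("a"); sink.write("b");
        sink.notifyCheckpointComplete(42);

        // 2. An intermediate savepoint completes. Suppose it *also* committed:
        sink.write("c");
        sink.notifyCheckpointComplete(43); // hypothetical savepoint commit of "c"

        // 3. Failover: the savepoint is user-owned and may already be gone, so
        //    the job rolls back to chk42 and replays everything after it,
        //    including "c".
        sink.write("c");
        sink.notifyCheckpointComplete(44);

        // "c" is now visible twice downstream: duplicate results.
        System.out.println(sink.external); // [a, b, c, c]
    }
}
```

Skipping the commit on the intermediate savepoint (Flink's actual behaviour) means step 2 leaves "c" pending, and the replay after rollback commits it exactly once.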
> Triggering savepoint does not trigger operator notifyCheckpointComplete
> -----------------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two sources (KafkaSource and MultiClusterKafkaSource) do not invoke notifyCheckpointComplete. This is easily reproducible in a simple pipeline (e.g. KafkaSource -> print). In this case, the savepoint is complete and successful, which is verified by the Flink Checkpoint UI tab and the jobmanager logs, e.g. `Triggering checkpoint 3 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL})`
>
> However, when the checkpoint occurs via the interval, I do see the sources checkpointing properly and the expected logs in the output.
>
> After the ticket was initially filed, I also checked with other stateful UDFs and observed the same behavior.
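The uid-change recommendation from the comment above can be illustrated with a minimal sketch. This is plain Java, not Flink internals; the uids and state labels are made up. The point is that savepoint state is matched to operators by uid, so a renamed transactional sink finds no state to restore and (together with Flink's allowNonRestoredState option) starts fresh, leaving the old pending transactions behind:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only (not Flink internals): savepoint state is keyed by
// operator uid; restoring with a changed sink uid means the old sink state
// (e.g. pending Kafka transactions) is not matched and is simply dropped.
public class UidChangeSketch {
    // Look up an operator's state in the savepoint; an unknown uid gets fresh state.
    static String restore(Map<String, String> savepoint, String uid) {
        return savepoint.getOrDefault(uid, "fresh state (old external state dropped)");
    }

    public static void main(String[] args) {
        // State captured in the savepoint, keyed by operator uid (made-up names).
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("kafka-source", "partition offsets");
        savepoint.put("kafka-sink", "pending transactions");

        // Restore with the sink uid changed from "kafka-sink" to "kafka-sink-v2".
        System.out.println("kafka-source  -> " + restore(savepoint, "kafka-source"));
        System.out.println("kafka-sink-v2 -> " + restore(savepoint, "kafka-sink-v2"));
    }
}
```

The source keeps its offsets, while the renamed sink no longer points at the stale transactions recorded in the savepoint.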