[ 
https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638394#comment-17638394
 ] 

Mason Chen edited comment on FLINK-29856 at 11/24/22 6:44 PM:
--------------------------------------------------------------

[~masteryhx] [~mxm] [~gaoyunhaii] I double checked that snapshotState 
*{*}is{*}* called, but notifyCheckpointComplete *{*}is not{*}*–this is 
observable from FLIP 27 sources and custom stateful UDFs. It looks like an 
intended design decision like Max mentioned.

> i.e. calling notifyCheckpointComplete() again for already processed 
> records/state.

Yes, I noticed this odd side effect when I tried to fix this issue, but it 
could be alleviated by adding the check for if the job is restoring or not.

---

Without notifyCheckpointComplete, it isn't clear what the benefit of an 
intermediate Savepoint is (need to re-read the FLIP). From the user 
perspective, trigger intermediate Savepoint has the benefit to be able to 
commit data to external systems like Kafka/Iceberg on demand for their 
operational procedures. Perhaps, the eventual solution is to replace 
operational procedure with triggering checkpoint with 
https://issues.apache.org/jira/browse/FLINK-29634 since that would match the 
effects (notifyCheckpointComplete, etc).

---

For the purposes of being backward compatible, does it make sense to enable the 
notifyCheckpointComplete behavior in intermediate savepoints for 1.16 and 
remove it when https://issues.apache.org/jira/browse/FLINK-29634 is released in 
1.17?


was (Author: mason6345):
[~masteryhx] [~mxm] [~gaoyunhaii] I double checked that snapshotState **is** 
called, but notifyCheckpointComplete **is not**–this is observable from FLIP 27 
sources and custom stateful UDFs. It looks like an intended design decision 
like Max mentioned.

> i.e. calling notifyCheckpointComplete() again for already processed 
> records/state.

Yes, I noticed this odd side effect when I tried to fix this issue, but it 
could be alleviated by adding the check for if the job is restoring or not.

---

Without notifyCheckpointComplete, it isn't clear what the benefit of an 
intermediate Savepoint is (need to re-read the FLIP). From the user 
perspective, it was benefit to be able to commit data to external systems like 
Kafka/Iceberg on demand for their operational procedures. Perhaps, the eventual 
solution is to replace operational procedure with triggering checkpoint with 
https://issues.apache.org/jira/browse/FLINK-29634 since that would match the 
effects (notifyCheckpointComplete, etc).

> Triggering savepoint does not trigger operator notifyCheckpointComplete
> -----------------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified for two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke 
> notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and 
> successful, which is verified by the Flink Checkpoint UI tab and the 
> jobmanager logs. e.g. `
> Triggering checkpoint 3 (type=SavepointType\{name='Savepoint', 
> postCheckpointAction=NONE, formatType=CANONICAL})`
>  
> However, when the checkpoint occurs via the interval, I do see the sources 
> checkpointing properly and expected logs in the output.
> After the ticket was initially filed, I also checked with other stateful UDFs 
> and observed the same behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to