[ 
https://issues.apache.org/jira/browse/FLINK-38370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021091#comment-18021091
 ] 

Arvid Heise edited comment on FLINK-38370 at 9/18/25 8:57 AM:
--------------------------------------------------------------

Hi [~mxm], did you read up on the respective ticket and understand the 
implications? Basically, there can be multiple final checkpoints, and using 
MAX_VALUE breaks any reliable way to correlate them, which is important for 
STREAMING. That also matters if parts of your pipeline finish before other 
parts of the pipeline, because closed operators also receive the final 
checkpoint.
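To illustrate the correlation problem, here is a minimal sketch (hypothetical 
names, nothing from Flink's actual classes): two parts of the pipeline finish 
at different times, so their final committables belong to different final 
checkpoints.
{code:java}
// Minimal sketch (hypothetical names, not Flink's actual classes) of why
// Long.MAX_VALUE breaks the correlation between a committable and the
// final checkpoint it belongs to.
public class FinalCheckpointSketch {

    // A committable tagged with the checkpoint that produced it.
    record Committable(long checkpointId, String payload) {}

    public static void main(String[] args) {
        // Two parts of the pipeline finish at different times, i.e. under
        // different final checkpoints (say, ids 17 and 23):
        Committable a = new Committable(17, "part-a-data");
        Committable b = new Committable(23, "part-b-data");
        // true: the committer can attribute each one to its checkpoint
        System.out.println(a.checkpointId() != b.checkpointId());

        // If both are tagged Long.MAX_VALUE instead, that attribution is
        // gone: the ids collide and nothing correlates them anymore.
        Committable aMax = new Committable(Long.MAX_VALUE, "part-a-data");
        Committable bMax = new Committable(Long.MAX_VALUE, "part-b-data");
        // false: indistinguishable final committables
        System.out.println(aMax.checkpointId() != bMax.checkpointId());
    }
}
{code}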
I would suggest instead backporting your adjustments to the old Iceberg 
releases and publishing bugfix releases.

Sorry for the breaking changes, but we were basically dealing with data loss 
scenarios in STREAMING. We apparently have a choice here between breaking 
STREAMING in edge cases and breaking BATCH in general, but you already have a 
fix for the latter.


was (Author: arvid):
Hi [~mxm], did you read up on the respective ticket and understand the 
implications? Basically, there are multiple final checkpoints, and using 
MAX_VALUE breaks any reliable way to correlate them, which is important for 
final checkpoints in STREAMING. That is also important if parts of your 
pipeline finish before other parts of the pipeline.
I would suggest instead backporting your adjustments to the old Iceberg 
releases.

> CommitterOperator does not checkpoint all pending committables
> --------------------------------------------------------------
>
>                 Key: FLINK-38370
>                 URL: https://issues.apache.org/jira/browse/FLINK-38370
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.19.3, 1.20.2, 2.0.1
>            Reporter: Maximilian Michels
>            Priority: Major
>             Fix For: 2.0.1, 1.19.4, 1.20.3
>
>
> What is the issue?
> ==============
> In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending 
> committables for batch pipelines, but only those for 
> current_checkpoint_id + 1. This is caused specifically by this line: 
> https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id: 
> https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
> This value has also been commonly used by other connectors in Flink. This 
> behavior was changed in FLINK-37605 / 
> https://github.com/apache/flink/pull/26433.
> Since committables using `Long.MAX_VALUE` won't be committed, the Iceberg 
> table won't be updated on shutdown, which means it will not contain any of 
> the written data! That results in potential data loss.
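> To make the failure mode concrete, here is a simplified sketch of the gating 
> described above (illustrative names only, not the actual CommitterOperator 
> code):
> {code:java}
> import java.util.List;
>
> // Simplified sketch: on shutdown, only committables up to
> // current_checkpoint_id + 1 are committed (the FLINK-37605 behavior).
> public class CommitGateSketch {
>
>     record Committable(long checkpointId, String payload) {}
>
>     public static void main(String[] args) {
>         long currentCheckpointId = 42;
>         long commitUpTo = currentCheckpointId + 1;
>
>         List<Committable> pending = List.of(
>                 new Committable(42, "regular committable"),
>                 new Committable(43, "end-of-input committable"),
>                 new Committable(Long.MAX_VALUE, "Iceberg final committable"));
>
>         for (Committable c : pending) {
>             boolean committed = c.checkpointId() <= commitUpTo;
>             System.out.println(c.payload() + " -> committed=" + committed);
>         }
>         // The Long.MAX_VALUE committable never passes the gate, so the
>         // final Iceberg commit is silently dropped on shutdown.
>     }
> }
> {code}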
> How to fix this issue?
> ================
> For Flink 2.1.0+, Iceberg can generate committables with `checkpoint = 
> last_checkpoint_id + 1`. We already took this approach in 
> https://github.com/apache/iceberg/pull/13714.
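> A sketch of that direction (illustrative names, not the actual Iceberg 
> code): the final committable is tagged with last_checkpoint_id + 1 instead 
> of Long.MAX_VALUE, so it passes the committer's gate sketched above.
> {code:java}
> // Sketch of the fix direction (illustrative names, not actual Iceberg code).
> public class FinalCommittableIdSketch {
>
>     static long finalCheckpointId(long lastCheckpointId) {
>         // Old: return Long.MAX_VALUE;  // skipped since FLINK-37605
>         return lastCheckpointId + 1;    // committed with the final checkpoint
>     }
>
>     public static void main(String[] args) {
>         // Assuming the writer last observed checkpoint 42:
>         System.out.println(finalCheckpointId(42)); // prints 43
>     }
> }
> {code}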
> That approach is fine for the upcoming Iceberg version for Flink 2.1.0, but 
> users who run the current Iceberg version with Flink 1.19.2 or 1.20.1 will 
> be in for a surprise when they upgrade to the latest patch release 1.19.3 or 
> 1.20.2. We should restore the old behavior for the Flink 1.19.4 and 1.20.3 
> releases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
