[ 
https://issues.apache.org/jira/browse/FLINK-38370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021091#comment-18021091
 ] 

Arvid Heise commented on FLINK-38370:
-------------------------------------

Hi [~mxm], did you read the respective ticket and understand the 
implications? Basically, there are multiple final checkpoints, and using 
MAX_VALUE breaks any reliable way to correlate them, which is important for 
final checkpoints in STREAMING. It also matters if parts of your pipeline 
finish before other parts of the pipeline.
I would rather suggest backporting your adjustments to the old Iceberg releases.

> CommitterOperator does not checkpoint all pending committables
> --------------------------------------------------------------
>
>                 Key: FLINK-38370
>                 URL: https://issues.apache.org/jira/browse/FLINK-38370
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.19.3, 1.20.2, 2.0.1
>            Reporter: Maximilian Michels
>            Priority: Major
>             Fix For: 2.0.1, 1.19.4, 1.20.3
>
>
> What is the issue?
> ==================
> In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending 
> committables for batch pipelines, but only those for 
> current_checkpoint_id+1. This is caused specifically by this line: 
> https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id: 
> https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79
> This value was generally used by other connectors in Flink. This behavior 
> was changed in FLINK-37605 / https://github.com/apache/flink/pull/26433.
> Since committables using `Long.MAX_VALUE` won't be committed, the Iceberg 
> table won't be updated on shutdown, which means it will not contain any data! 
> That results in potential data loss.
>
> How to fix this issue?
> ======================
> For Flink 2.1.0+, Iceberg can generate committables with `checkpoint = 
> last_checkpoint_id + 1`. We already took this approach in 
> https://github.com/apache/iceberg/pull/13714.
> That is fine for the upcoming Iceberg version for Flink 2.1.0, but users who 
> use the current Iceberg version with Flink 1.19.2 or 1.20.1 will be in for a 
> surprise when they upgrade to the latest patch release 1.19.3 or 1.20.2. We 
> should restore the old behavior for the Flink 1.19.4 and 1.20.3 releases.
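
For illustration only, here is a minimal Java sketch of the failure mode described in the quoted issue. It is not Flink's or Iceberg's actual code: the class `FinalCheckpointSketch`, the methods `commitUpTo` and `pendingWithFinalId`, and the use of strings as committables are all hypothetical stand-ins. The sketch only assumes what the issue states, namely that at end of input only committables up to `last_checkpoint_id + 1` are committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model, NOT the real CommitterOperator.
final class FinalCheckpointSketch {

    // Commits (removes and returns) all committables whose checkpoint id
    // is less than or equal to the given checkpoint id.
    static List<String> commitUpTo(TreeMap<Long, List<String>> pending, long checkpointId) {
        List<String> committed = new ArrayList<>();
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // the view is backed by 'pending', so this removes the entries
        return committed;
    }

    // Pending state: one regular committable at checkpoint 3 and one final
    // committable registered under 'finalId' (chosen by the connector).
    static TreeMap<Long, List<String>> pendingWithFinalId(long finalId) {
        TreeMap<Long, List<String>> pending = new TreeMap<>();
        pending.put(3L, new ArrayList<>(List.of("c3")));
        pending.put(finalId, new ArrayList<>(List.of("final")));
        return pending;
    }

    public static void main(String[] args) {
        long lastCheckpointId = 3L;

        // Connector uses Long.MAX_VALUE for the final committable.
        TreeMap<Long, List<String>> legacy = pendingWithFinalId(Long.MAX_VALUE);
        System.out.println(commitUpTo(legacy, lastCheckpointId + 1)); // [c3]
        System.out.println(legacy.keySet()); // [9223372036854775807], never committed

        // Connector uses last_checkpoint_id + 1 for the final committable.
        TreeMap<Long, List<String>> adjusted = pendingWithFinalId(lastCheckpointId + 1);
        System.out.println(commitUpTo(adjusted, lastCheckpointId + 1)); // [c3, final]
        System.out.println(adjusted.isEmpty()); // true
    }
}
```

In this model, the committable keyed by `Long.MAX_VALUE` stays pending after the end-of-input commit, while the one keyed by `last_checkpoint_id + 1` is committed together with the rest.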



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
