[
https://issues.apache.org/jira/browse/FLINK-38370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Maximilian Michels reassigned FLINK-38370:
------------------------------------------
Assignee: Maximilian Michels
> CommitterOperator does not checkpoint all pending committables
> --------------------------------------------------------------
>
> Key: FLINK-38370
> URL: https://issues.apache.org/jira/browse/FLINK-38370
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.19.3, 1.20.2, 2.0.1
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.0.1, 1.19.4, 1.20.3
>
>
> What is the issue?
> ==================
> In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending
> committables for batch pipelines; it commits only those for
> current_checkpoint_id+1. This is caused specifically by this line:
> https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id:
> https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
> This value was also commonly used by other connectors in Flink. The
> CommitterOperator behavior was changed in FLINK-37605 /
> https://github.com/apache/flink/pull/26433.
> Since committables using `Long.MAX_VALUE` are no longer committed, the
> Iceberg table is not updated on shutdown and the data written by the job
> never becomes part of the table. That results in potential data loss.
> How to fix this issue?
> ======================
> For Flink 2.1.0+, Iceberg can generate committables with `checkpoint =
> last_checkpoint_id + 1`. We already took this approach in
> https://github.com/apache/iceberg/pull/13714.
> That is fine for the upcoming Iceberg version for Flink 2.1.0, but users who
> run the current Iceberg version on 1.19.2 or 1.20.1 are in for a surprise
> when they upgrade to the latest patch release, 1.19.3 or 1.20.2. We should
> restore the old behavior for the Flink 1.19.4 and 1.20.3 releases.
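
The failure mode described under "What is the issue?" can be illustrated with a small, self-contained model. This is only a sketch and not the actual CommitterOperator code: the class `CommitUpToSketch`, the method `commitUpTo`, and the string committables are hypothetical names invented for illustration, and the sketch assumes that pending committables are keyed by checkpoint id and committed up to a given bound. Under that assumption, a committable registered under `Long.MAX_VALUE` stays pending when the bound is `last_checkpoint_id + 1`, and is only committed when the bound is `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CommitUpToSketch {

    /**
     * Hypothetical model: removes and returns every pending committable whose
     * checkpoint id is less than or equal to the given bound.
     */
    static List<String> commitUpTo(NavigableMap<Long, List<String>> pending, long upTo) {
        // headMap(..., true) is a live view of all entries with key <= upTo.
        NavigableMap<Long, List<String>> ready = pending.headMap(upTo, true);
        List<String> committed = new ArrayList<>();
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // clearing the view removes those entries from 'pending'
        return committed;
    }

    public static void main(String[] args) {
        long lastCheckpointId = 1L;

        NavigableMap<Long, List<String>> pending = new TreeMap<>();
        pending.put(1L, new ArrayList<>(List.of("data-file-1")));
        // A connector that tags its final committable with Long.MAX_VALUE.
        pending.put(Long.MAX_VALUE, new ArrayList<>(List.of("final-snapshot")));

        // Bound of last_checkpoint_id + 1: the final committable is left behind.
        System.out.println("committed: " + commitUpTo(pending, lastCheckpointId + 1));
        System.out.println("still pending: " + pending.keySet());

        // Bound of Long.MAX_VALUE: everything that is still pending gets committed.
        System.out.println("committed: " + commitUpTo(pending, Long.MAX_VALUE));
        System.out.println("still pending: " + pending.keySet());
    }
}
```

In this model, a connector that emits its final committable with `last_checkpoint_id + 1` instead of `Long.MAX_VALUE` would also be covered by the first call, which corresponds to the approach described for Flink 2.1.0+ above.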