Hi,

When working on https://github.com/apache/iceberg/pull/13714, Steven and I noticed a behavior change in the CommitterOperator in Flink 2.1.0, which is also present in Flink 1.20.2 and 1.19.3.

I don't see a problem pushing this change to Flink 2.1.0, since it is a minor release. However, I noticed that 1.19.3 and 1.20.2 are now subject to data loss with Iceberg.

What is the issue?
==============

In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending committables for batch pipelines, but only those for current_checkpoint_id + 1. This is caused specifically by this line: https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154

In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id: https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79. This value has also generally been used by other connectors in Flink.

The behavior was changed in https://issues.apache.org/jira/browse/FLINK-37605 / https://github.com/apache/flink/pull/26433. Since committables using `Long.MAX_VALUE` won't be committed, the Iceberg table won't be updated on shutdown, which means it will not contain any data! That results in potential data loss.

How to fix this issue?
================

For Flink 2.1.0+, we can generate committables with `checkpoint = last_checkpoint_id + 1`. We already took this approach in https://github.com/apache/iceberg/pull/13714 (see the sketch at the end of this mail). That is fine for the upcoming Iceberg version for Flink 2.1.0, but users who run the current Iceberg version with 1.19.2 or 1.20.1 will be in for a surprise when they upgrade to the latest patch release 1.19.3 or 1.20.2: they will experience data loss in batch pipelines.

While we may want to keep the other changes in FLINK-37605, I think we need to include a fix in the Flink 1.19.4 and Flink 1.20.3 releases to ensure we commit all pending committables in batch pipelines. I'm curious to hear whether you think there is another way to resolve this.

Beyond this issue
=============

The overarching question behind all of this is: why are we pushing these kinds of invasive changes to patch releases? I think the change could have used another pair of eyes to assess its potential impact. Another thing we could have done is to run the Iceberg tests during release testing, which would have uncovered this bug. I'll definitely check that for the next release.

Cheers and thanks,
Max
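
P.S. To make the fix direction a bit more concrete, here is a minimal, self-contained sketch. The class and method names are hypothetical and are not the actual Iceberg or Flink code; it only illustrates the difference between tagging the final committables with the `Long.MAX_VALUE` sentinel and deriving the end-of-input checkpoint id from the last completed checkpoint, which is conceptually what https://github.com/apache/iceberg/pull/13714 does.

```java
// Hypothetical sketch only: these class/method names are illustrative and do not
// exist in Iceberg or Flink. It contrasts the two ways of choosing the checkpoint
// id for the final (end-of-input) committables in a batch pipeline.
public final class EndOfInputCheckpointIdSketch {

  private EndOfInputCheckpointIdSketch() {}

  // Old approach: tag the final committables with a sentinel checkpoint id.
  // After FLINK-37605, the CommitterOperator no longer commits committables
  // tagged with this sentinel when the batch pipeline shuts down.
  static long sentinelCheckpointId() {
    return Long.MAX_VALUE;
  }

  // New approach (conceptually what the Iceberg PR does): derive the final
  // checkpoint id from the last completed checkpoint, so the committables fall
  // inside the range the CommitterOperator still commits.
  static long endOfInputCheckpointId(long lastCompletedCheckpointId) {
    return lastCompletedCheckpointId + 1;
  }

  public static void main(String[] args) {
    long lastCompletedCheckpointId = 41L; // illustrative value
    System.out.println("old sentinel id: " + sentinelCheckpointId());
    System.out.println("end-of-input id: " + endOfInputCheckpointId(lastCompletedCheckpointId));
  }
}
```

The point is simply that the derived id stays within the range the CommitterOperator still processes, while the `Long.MAX_VALUE` sentinel no longer does.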
