Hi,

When working on https://github.com/apache/iceberg/pull/13714, Steven
and I noticed changed behavior in Flink 2.1.0 with the
CommiterOperator, which is also present in Flink 1.20.2 and 1.19.3.

I don't see a problem pushing this change to Flink 2.1.0, since it is
a minor release. However, I noticed that 1.19.3 and 1.20.2 are now
subject to data loss with Iceberg.

What is the issue?
==============

In a nutshell, the CommiterOperator (V2 Sink) does not commit all
pending committables for batch pipelines anymore, but only for
current_checkpoint_id+1. This is caused specifically by this line:
https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154

In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id:
https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
This value was generally used by other connectors in Flink. This
behavior was changed in
https://issues.apache.org/jira/browse/FLINK-37605 /
https://github.com/apache/flink/pull/26433.

Since committables using `Long.MAX_VALUE` won't be committed, the
Iceberg table won't be updated on shutdown, which means it will not
contain any data! That results in potential data loss.

How to fix this issue?
================

For Flink 2.1.0+, we can generate committables with `checkpoint =
last_checkpoint_id + 1`. We already took this approach in
https://github.com/apache/iceberg/pull/13714.

That is fine for the upcoming Iceberg version for Flink 2.1.0, but
users who use the current Iceberg version with 1.19.2 or 1.20.1 will
be up for a surprise when they upgrade to the latest patch release
1.19.3 or 1.20.2. They will experience data loss in batch pipelines.
While we may want to keep the other changes in FLINK-37605, I think we
need to include a fix for the Flink 1.19.4 and Flink 1.20.3 release to
ensure we commit all pending committables in batch pipelines.

I'm curious to hear if you think there is another way to resolve this.

Beyond this issue
=============

The overarching question of all of this is, why are we pushing these
types of invasive changes to patch releases? I think the change could
have used another pair of eyes to assess the potential impact.

Another thing which we could have done is to run the Iceberg tests
during release testing, which would have uncovered this bug. I'll
definitely check that for the next release.

Cheers and thanks,
Max

Reply via email to