[
https://issues.apache.org/jira/browse/FLINK-38370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021133#comment-18021133
]
Maximilian Michels commented on FLINK-38370:
--------------------------------------------
[~arvid]
{quote}Hi Maximilian Michels, did you read up on the respective ticket and
understand the implications?
{quote}
Yes! I think the change, all in all, makes a lot of sense. Please let me know
if I misunderstood anything.
{quote}Basically, there can be multiple final checkpoints, and using MAX_VALUE
breaks any reliable way to correlate them, which is important for STREAMING.
{quote}
Maybe I didn't make it clear, but this is a batch-only issue; STREAMING is not
affected, and the new logic makes perfect sense there.
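To illustrate the streaming concern with a toy example (made-up names, not
Flink code): subtasks can finish at different checkpoints, and tagging every
final committable with Long.MAX_VALUE erases which checkpoint actually covers
it.

{code:java}
// Toy illustration (made-up names, not Flink code) of the correlation problem
// in STREAMING: two subtasks finish at different checkpoints, but both tag
// their final committables with Long.MAX_VALUE, so the ids can no longer be
// matched to the checkpoint that actually covers them.
import java.util.Map;

public class CorrelationSketch {
    public static void main(String[] args) {
        // subtask index -> checkpoint at which that subtask finished
        Map<Integer, Long> finishedAt = Map.of(0, 7L, 1, 9L);

        finishedAt.forEach((subtask, checkpoint) -> {
            long taggedId = Long.MAX_VALUE; // legacy convention for "final"
            System.out.printf("subtask %d finished at checkpoint %d, tagged %d%n",
                    subtask, checkpoint, taggedId);
            // Both committables carry the same id; a restore from checkpoint 8
            // cannot tell which of them was already covered.
        });
    }
}
{code}

In batch there is no such ambiguity, because there is exactly one final
"checkpoint".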
{quote}That's also important if parts of your pipeline finish before other
parts of the pipeline. Because closed operators also receive the final
checkpoint.
{quote}
That's fine for streaming, but in batch there is only one "checkpoint",
produced when the finish() method is called. After the change in FLINK-37605,
when a user upgrades to the latest patch version, batch pipelines will
suddenly have no data committed by the V2 sink framework. That does not seem
right.
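To make the batch failure mode concrete, here is a deliberately simplified
model of the filtering (none of these names exist in Flink; this is not the
actual CommitterOperator code):

{code:java}
// Simplified, hypothetical model of the committer's filtering, only to show
// why batch committables tagged with Long.MAX_VALUE are skipped after
// FLINK-37605. None of these names exist in Flink.
import java.util.List;

public class CommitFilterSketch {

    record Committable(long checkpointId, String payload) {}

    // Pre-FLINK-37605 (sketch): everything pending at the final checkpoint is
    // committed, including entries tagged with Long.MAX_VALUE.
    static boolean oldBehaviorCommits(Committable c, long completedCheckpointId) {
        return true;
    }

    // Post-FLINK-37605 (sketch): only committables up to
    // completedCheckpointId + 1 are committed, so Long.MAX_VALUE is skipped.
    static boolean newBehaviorCommits(Committable c, long completedCheckpointId) {
        return c.checkpointId() <= completedCheckpointId + 1;
    }

    public static void main(String[] args) {
        // Batch: a single final "checkpoint"; Iceberg tags its committables
        // with Long.MAX_VALUE.
        List<Committable> pending = List.of(new Committable(Long.MAX_VALUE, "data-files"));
        long completed = 1;

        for (Committable c : pending) {
            System.out.println("old: " + oldBehaviorCommits(c, completed)); // true
            System.out.println("new: " + newBehaviorCommits(c, completed)); // false -> nothing committed
        }
    }
}
{code}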
{quote}I suggest rather backporting your adjustments to the old Iceberg
releases and releasing bugfix releases.
{quote}
Frankly, I don't think that is an option. How would we even detect which Flink
version has which behavior? I'm fine with the change in minor versions, but
patch releases must retain the current behavior. Again, I'm not against the
general approach, but we must fix the issue described in this JIRA, which
affects batch pipelines only.
{quote}Sorry for the breaking changes, but we basically dealt with data-loss
scenarios in STREAMING. We apparently have a choice here to break STREAMING in
edge cases or BATCH in general, but you already have a fix for the latter.
{quote}
The fix in FLINK-37605 is fine, and I understand why it was necessary, but it
introduces a new issue that needs to be addressed for the next patch release.
[~sxnan]
{quote}Hi Maximilian Michels, I am preparing the 1.20.3 release in a few days.
Seems like you are proposing to restore the old behavior for Flink 1.20.3. If
it can be fixed in a day or two, we can wait for the fix.
{quote}
That would be great; I think I can provide a fix. [~arvid] can probably help
review it.
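For illustration, the shape of the restoration I have in mind, purely as an
untested sketch with made-up names rather than the real CommitterOperator
internals:

{code:java}
// Untested sketch of restoring the pre-FLINK-37605 batch behavior; all names
// are illustrative, not actual Flink internals.
public class RestoreSketch {

    static boolean shouldCommit(long committableCheckpointId,
                                long completedCheckpointId,
                                boolean endOfInput) {
        // FLINK-37605 rule: commit only up to completedCheckpointId + 1.
        boolean withinRange = committableCheckpointId <= completedCheckpointId + 1;
        // Restored batch rule: at end of input, also accept the legacy
        // Long.MAX_VALUE tag so batch pipelines commit on shutdown again.
        boolean legacyFinal = endOfInput && committableCheckpointId == Long.MAX_VALUE;
        return withinRange || legacyFinal;
    }

    public static void main(String[] args) {
        System.out.println(shouldCommit(Long.MAX_VALUE, 1, true));  // true: batch commits again
        System.out.println(shouldCommit(5, 3, false));              // false: streaming stays strict
    }
}
{code}

This would keep the stricter FLINK-37605 semantics for streaming while
restoring the batch behavior that older Iceberg releases rely on.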
> CommitterOperator does not checkpoint all pending committables
> --------------------------------------------------------------
>
> Key: FLINK-38370
> URL: https://issues.apache.org/jira/browse/FLINK-38370
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.19.3, 1.20.2, 2.0.1
> Reporter: Maximilian Michels
> Priority: Major
> Fix For: 2.0.1, 1.19.4, 1.20.3
>
>
> What is the issue?
> ==============
> In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending
> committables for batch pipelines, but only those for
> current_checkpoint_id + 1. This is caused specifically by this line:
> https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id:
> https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
> This value was commonly used by other connectors in Flink as well. This
> behavior was changed in FLINK-37605 / https://github.com/apache/flink/pull/26433.
> Since committables using `Long.MAX_VALUE` won't be committed, the Iceberg
> table won't be updated on shutdown, which means it will not contain any data!
> That results in potential data loss.
> How to fix this issue?
> ================
> For Flink 2.1.0+, Iceberg can generate committables with `checkpoint =
> last_checkpoint_id + 1`. We already took this approach in
> https://github.com/apache/iceberg/pull/13714.
> That is fine for the upcoming Iceberg version for Flink 2.1.0, but users who
> run the current Iceberg version with 1.19.2 or 1.20.1 will be in for a
> surprise when they upgrade to the latest patch release 1.19.3 or 1.20.2. We
> should restore the old behavior for the Flink 1.19.4 and 1.20.3 releases.