[ https://issues.apache.org/jira/browse/FLINK-38370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021133#comment-18021133 ]

Maximilian Michels commented on FLINK-38370:
--------------------------------------------

[~arvid]
{quote}Hi Maximilian Michels, did you read up on the respective ticket and 
understand the implications?
{quote}
Yes! I think the change, all in all, makes a lot of sense. Please let me know 
if I misunderstood anything.
{quote}Basically, there can be multiple final checkpoints and using MAX_VALUE 
breaks any reliable way to correlate them which is important for STREAMING.
{quote}
Maybe I didn't make it clear, but this is a batch-only issue; STREAMING is not 
affected. The new logic makes perfect sense for streaming.
{quote}That's also important if parts of your pipeline finish before other 
parts of the pipeline. Because closed operators also receive the final 
checkpoint.
{quote}
That's fine for streaming, but in batch there is only one "checkpoint", issued 
when we call the finish() method. After the change in FLINK-37605, when users 
upgrade to the latest patch version, the V2 sink framework will suddenly commit 
no data at all for their batch pipelines. That does not seem right.
{quote}I suggest rather backporting your adjustments to the old Iceberg 
releases and releasing bugfix releases.
{quote}
Frankly, I don't think that is an option. How would we even detect which Flink 
version has which behavior? I'm fine with the change in minor versions, but 
patch releases must retain the current behavior. Again, I'm not against the 
general approach, but we must fix the issue described in this JIRA, which 
affects batch pipelines only.
{quote}Sorry for the breaking changes, but we basically dealt with data-loss 
scenarios in STREAMING. We apparently have a choice here between breaking 
STREAMING in edge cases or BATCH in general, but you already have a fix for 
the latter.
{quote}
The fix in FLINK-37605 is OK, and I understand why it was necessary, but it 
introduces a new issue that needs to be fixed for the next patch release.

[~sxnan]
{quote}Hi Maximilian Michels, I am preparing the 1.20.3 release in a few days. 
It seems you are proposing to restore the old behavior for Flink 1.20.3. If it 
can be fixed in a day or two, we can wait for the fix.
{quote}
That would be great; I think I can provide a fix. [~arvid] can probably help 
review it.

> CommitterOperator does not checkpoint all pending committables
> --------------------------------------------------------------
>
>                 Key: FLINK-38370
>                 URL: https://issues.apache.org/jira/browse/FLINK-38370
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.19.3, 1.20.2, 2.0.1
>            Reporter: Maximilian Michels
>            Priority: Major
>             Fix For: 2.0.1, 1.19.4, 1.20.3
>
>
> What is the issue?
> ==============
> In a nutshell, the CommitterOperator (V2 Sink) no longer commits all pending 
> committables for batch pipelines, but only those up to 
> current_checkpoint_id + 1. This is caused specifically by this line: 
> https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id: 
> https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
>  This value was also commonly used by other connectors in Flink. This behavior 
> was changed in FLINK-37605 / https://github.com/apache/flink/pull/26433.
> Since committables using `Long.MAX_VALUE` won't be committed, the Iceberg 
> table won't be updated on shutdown, which means it will not contain any data! 
> That results in potential data loss.
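> A minimal, self-contained sketch of the effect (plain Java; the predicate is 
> a hypothetical stand-in for the linked line, not the actual Flink source):
> {code:java}
> // Standalone illustration -- not the actual Flink code. After FLINK-37605,
> // the committer only commits committables whose checkpoint id is at most
> // lastCompletedCheckpointId + 1.
> public class CommitFilterSketch {
>     public static void main(String[] args) {
>         long lastCompletedCheckpointId = 0L;      // batch: no checkpoints ever complete
>         long finalCommittableId = Long.MAX_VALUE; // emitted by IcebergWriteAggregator
> 
>         // Hypothetical stand-in for the check in CommitterOperator:
>         boolean committed = finalCommittableId <= lastCompletedCheckpointId + 1;
> 
>         // Prints "committed = false": the final committable is skipped, so
>         // the Iceberg table is never updated on shutdown.
>         System.out.println("committed = " + committed);
>     }
> }
> {code}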
> How to fix this issue?
> ================
> For Flink 2.1.0+, Iceberg can generate committables with `checkpoint = 
> last_checkpoint_id + 1`. We already took this approach in 
> https://github.com/apache/iceberg/pull/13714.
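> A hedged sketch of that approach (names are illustrative, not the exact code 
> from the PR above): track the highest checkpoint id the aggregator has seen 
> and derive the final committable's id from it instead of using 
> `Long.MAX_VALUE`.
> {code:java}
> // Illustrative sketch only -- not the actual IcebergWriteAggregator code.
> public class FinalCheckpointIdSketch {
>     private long maxObservedCheckpointId = 0L;
> 
>     // Called for every committable the aggregator receives.
>     void onCommittable(long checkpointId) {
>         maxObservedCheckpointId = Math.max(maxObservedCheckpointId, checkpointId);
>     }
> 
>     // Called once on finish(): the final id is last_checkpoint_id + 1, which
>     // the committer accepts, instead of Long.MAX_VALUE, which it drops.
>     long finalCheckpointId() {
>         return maxObservedCheckpointId + 1;
>     }
> 
>     public static void main(String[] args) {
>         FinalCheckpointIdSketch sketch = new FinalCheckpointIdSketch();
>         sketch.onCommittable(3L);
>         System.out.println(sketch.finalCheckpointId()); // prints 4
>     }
> }
> {code}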
> That is fine for the upcoming Iceberg version for Flink 2.1.0, but users who 
> run the current Iceberg version with 1.19.2 or 1.20.1 will be in for a 
> surprise when they upgrade to the latest patch release, 1.19.3 or 1.20.2. We 
> should restore the old behavior for the Flink 1.19.4 and 1.20.3 releases.


