This sounds like a reasonable solution to me. Thanks, Peter!

On Thu, Jul 18, 2024 at 8:28 AM Steven Wu <stevenz...@gmail.com> wrote:

> Regarding unaligned checkpoints: Flink savepoints are always aligned and
> are the recommended way to upgrade Flink versions. We could recommend that
> users take a savepoint to pick up this fix.
>
> I will take a closer look at the PR.
>
> On Thu, Jul 18, 2024 at 6:22 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Qishang Zhong found a bug with Flink Sink [1].
>>
>> In a nutshell:
>> - A failure during a checkpoint could cause duplicate data when the Flink
>> sink is used for CDC writes.
>>
>> In more detail:
>> - If there is a failure after `prepareSnapshotPreBarrier` but before
>> `snapshotState` for CHK1, then the data/delete files are created, but
>> they are not assigned to that checkpoint's snapshot. So when the Flink job
>> restarts, the `IcebergFilesCommitter` will receive the data/delete files
>> for CHK1 along with the files for the new checkpoint (CHK2), and it will
>> commit them in a single Iceberg commit (S1).
>>
>> If there is an equality delete record in CHK2 that should delete a
>> record created by CHK1, then the table becomes corrupted (the record is
>> duplicated): both the data file for CHK1 and the equality delete file
>> for CHK2 are committed in S1, and because an equality delete only applies
>> to data files with a strictly lower sequence number, the delete is not
>> applied to that data file.
>>
>> To fix this, we need to assign the checkpointId to the `WriteResult`
>> immediately after it is created on the `IcebergStreamWriter` side, and send
>> the checkpointId along with the `WriteResult`. Since `WriteResult` is part
>> of the Iceberg API, we cannot change it; instead, we need to wrap it in a
>> new object and use that for the communication between the writer and the
>> committer. This would break upgrading Flink jobs that use unaligned
>> checkpoints [2], since the in-flight records stored in those checkpoints
>> would still have the old type.
>>
>> I would like to understand how widely unaligned checkpoints are used in
>> the community, and I propose accepting this fix if there are no objections.
>>
>> Thanks,
>> Peter
>>
>> [1] - https://github.com/apache/iceberg/pull/10526
>> [2] - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>>
>

-- 
Ryan Blue
Databricks
