I thought about this some more. An important part of the Iceberg sink is knowing whether we have already committed some DataFiles. Currently, this is implemented by writing a (JobId, MaxCheckpointId) tuple to the Iceberg table when committing. When restoring from a failure we check this and discard committables (DataFiles) that we know have already been committed.
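
Roughly, the restore-time check could look something like the following sketch. The snapshot-summary keys and the exact lookup are how I picture it, not necessarily the sink's actual code:

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CommitDedup {
  // Assumed snapshot-summary keys, for illustration only.
  static final String JOB_ID_KEY = "flink.job-id";
  static final String MAX_CHECKPOINT_KEY = "flink.max-committed-checkpoint-id";

  // Walks snapshots from newest to oldest and returns the highest
  // checkpoint id this job has already committed, or -1 if none.
  static long maxCommittedCheckpointId(Table table, String flinkJobId) {
    Snapshot snapshot = table.currentSnapshot();
    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      if (flinkJobId.equals(summary.get(JOB_ID_KEY))
          && summary.containsKey(MAX_CHECKPOINT_KEY)) {
        return Long.parseLong(summary.get(MAX_CHECKPOINT_KEY));
      }
      Long parentId = snapshot.parentId();
      snapshot = parentId == null ? null : table.snapshot(parentId);
    }
    return -1L;
  }
}

On restore, any committable whose checkpoint id is <= the returned value would then be discarded.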

I think this can have some problems, for example when checkpoint ids are not strictly sequential, when we wrap around, or when the JobID changes. The latter will happen when doing a stop/start-from-savepoint cycle, for example.

I think we could fix this by having Flink provide a nonce to the GlobalCommitter where Flink guarantees that this nonce is unique and will not change for repeated invocations of the GlobalCommitter with the same set of committables. The GlobalCommitter could use this to determine whether a set of committables has already been committed to the Iceberg table.
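
As a sketch of what I mean (the committer shape and the summary key are made up for illustration; this is not actual Flink API):

import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class NonceBasedIcebergCommitter {
  // Assumed summary key under which the nonce would be stored.
  static final String NONCE_KEY = "flink.commit-nonce";

  private final Table table;

  NonceBasedIcebergCommitter(Table table) {
    this.table = table;
  }

  // Commits the files unless a snapshot carrying `nonce` already exists.
  void commit(String nonce, List<DataFile> dataFiles) {
    if (alreadyCommitted(nonce)) {
      return; // this exact set of committables was committed before
    }
    AppendFiles append = table.newAppend();
    dataFiles.forEach(append::appendFile);
    append.set(NONCE_KEY, nonce); // nonce goes into the snapshot summary
    append.commit(); // atomic: the files and the nonce land together
  }

  private boolean alreadyCommitted(String nonce) {
    for (Snapshot snapshot : table.snapshots()) {
      if (nonce.equals(snapshot.summary().get(NONCE_KEY))) {
        return true;
      }
    }
    return false;
  }
}

Because the nonce is written in the same atomic snapshot commit as the files, re-running commit() with the same nonce after a failure is a no-op, regardless of JobID changes or non-sequential checkpoint ids.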

It seems very tailor-made to Iceberg for now, but other systems should suffer from the same problem.

Best,
Aljoscha