I thought about this some more. One of the important parts of the
Iceberg sink is to know whether we have already committed some
DataFiles. Currently, this is implemented by writing a (JobId,
MaxCheckpointId) tuple to the Iceberg table when committing. When
restoring from a failure we check this and discard committables
(DataFiles) that we know have already been committed.
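To make the discussion concrete, here is a rough sketch of that
de-duplication check, assuming a (JobId, MaxCheckpointId) marker stored
with the table; all names (CommittedMarker, Committable,
discardAlreadyCommitted) are illustrative, not the actual sink code:

```java
import java.util.*;
import java.util.stream.*;

public class CommitDedup {
    // What the table remembers from the last successful commit.
    record CommittedMarker(String jobId, long maxCheckpointId) {}

    // A pending committable (stand-in for a set of DataFiles).
    record Committable(String jobId, long checkpointId, String dataFile) {}

    static List<Committable> discardAlreadyCommitted(
            CommittedMarker marker, List<Committable> restored) {
        return restored.stream()
                // Only filters when the job id matches; if the JobID changed
                // (e.g. stop/start-from-savepoint), everything passes through,
                // which is one of the failure modes described above.
                .filter(c -> !(c.jobId().equals(marker.jobId())
                        && c.checkpointId() <= marker.maxCheckpointId()))
                .collect(Collectors.toList());
    }
}
```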
I think this scheme can have some problems, for example when checkpoint
ids are not strictly sequential, when they wrap around, or when the
JobID changes. The latter will happen when doing a
stop/start-from-savepoint cycle, for example.
I think we could fix this by having Flink provide a nonce to the
GlobalCommitter where Flink guarantees that this nonce is unique and
will not change for repeated invocations of the GlobalCommitter with the
same set of committables. The GlobalCommitter could use this to
determine whether a set of committables has already been committed to
the Iceberg table.
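A sketch of what such a nonce-based commit could look like; the names
(commitWithNonce, the in-memory stand-in for the table) are hypothetical
and not part of any actual Flink or Iceberg API:

```java
import java.util.*;

public class NonceCommitter {
    // Stand-in for the Iceberg table: remembers which nonces were committed.
    // In reality the nonce would be persisted atomically with the snapshot.
    private final Set<String> committedNonces = new HashSet<>();
    private final List<String> committedFiles = new ArrayList<>();

    // Idempotent commit: a repeated invocation with the same nonce is a
    // no-op, independent of job ids or checkpoint id semantics.
    public boolean commitWithNonce(String nonce, List<String> dataFiles) {
        if (committedNonces.contains(nonce)) {
            return false; // this set of committables was already committed
        }
        committedFiles.addAll(dataFiles);
        committedNonces.add(nonce);
        return true;
    }

    public List<String> committedFiles() {
        return committedFiles;
    }
}
```

The key point is that the guarantee ("unique, and stable across repeated
invocations with the same committables") comes from Flink, so the
committer only needs a set-membership check.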
It seems very tailor-made to Iceberg for now, but other systems should
suffer from the same problem.
Best,
Aljoscha