Hello,

We are experimenting with implementing position deletes in the Flink
connector, to get better interoperability between the components in our
stack, since equality deletes are not widely supported yet. I identified
an issue which (potentially) makes this impossible to do correctly in
upsert mode, and I'd like feedback on whether my understanding is correct.

So, let's imagine we create an Iceberg table with a primary key using
Flink and write to it in upsert mode. If we replace equality deletes with
position deletes, I think we can end up with multiple records sharing the
same primary key. Say a Flink job 'A' writes to this table in upsert mode,
and a job 'B' (or someone upserting manually) does the same at roughly the
same time. If both jobs receive data with the same primary key close
together in time, then when each job scans the table for the rows it needs
to position-delete, neither sees the other job's change, because neither
commit has happened yet. If an older row exists, both jobs will delete it,
which is fine; but both will also append their new row, and now we have
two rows with the same primary key. As far as I understand, this cannot
happen with equality deletes, because an equality delete applies the
equality fields as a filter to all previously committed data files; in
that case the table ends up with a single record for that primary key,
the one from the job that committed later.
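To make the race concrete, here is a toy model of the two delete flavors
(this is NOT the Iceberg API; the file names, keys, and sequence numbers
are made up). Position deletes target fixed (file, row) coordinates, so
they cannot affect a row committed concurrently; equality deletes apply
to every data file with a smaller sequence number, so a later commit's
delete also covers the earlier commit's append:

```python
# Toy model of Iceberg-style deletes (hypothetical, not the real API).
# Data files are named lists of (primary_key, value) rows.

def scan_with_position_deletes(data_files, position_deletes):
    """Return live rows; position_deletes is a set of (file_id, row_index)."""
    return [
        row
        for fid, rows in data_files.items()
        for i, row in enumerate(rows)
        if (fid, i) not in position_deletes
    ]

def scan_with_equality_deletes(data_files, equality_deletes):
    """Return live rows; data_files maps file_id -> (seq_num, rows) and
    equality_deletes is a list of (seq_num, pk). A delete only removes
    rows from data files committed with a smaller sequence number."""
    return [
        row
        for _, (seq, rows) in data_files.items()
        for row in rows
        if not any(dseq > seq and dpk == row[0] for dseq, dpk in equality_deletes)
    ]

# Jobs A and B both snapshot the table while it contains only f0, so both
# plan a position delete of (f0, row 0), then each appends its own file.
pos_files = {"f0": [("pk1", "v0")], "fA": [("pk1", "vA")], "fB": [("pk1", "vB")]}
pos_deletes = {("f0", 0)}
print(scan_with_position_deletes(pos_files, pos_deletes))
# -> two live rows with pk1: [('pk1', 'vA'), ('pk1', 'vB')]

# With equality deletes, B's delete (higher sequence number than A's
# append) also removes A's row, so only the later upsert survives.
eq_files = {"f0": (0, [("pk1", "v0")]),
            "fA": (1, [("pk1", "vA")]),
            "fB": (2, [("pk1", "vB")])}
eq_deletes = [(1, "pk1"), (2, "pk1")]
print(scan_with_equality_deletes(eq_files, eq_deletes))
# -> single live row: [('pk1', 'vB')]
```

Under this model, both commits succeed in the position-delete case because
neither delete file conflicts with the other at the metadata level, yet the
resulting table violates the primary-key expectation.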

Please let me know what you think.

Thanks,
Tamas
