Hello,

We are experimenting with implementing position deletes in the Flink connector, to improve interoperability between the components in our stack, since equality deletes are not widely supported yet. I have identified an issue which potentially makes this impossible to do correctly in upsert mode, and I'd like feedback on whether my understanding is correct.
So, let's imagine we create an Iceberg table with a primary key using Flink, and we want to write to this table in upsert mode. If we replace equality deletes with position deletes, I think it is possible to end up with multiple records sharing the same primary key.

Say we have a Flink job 'A' writing to this table in upsert mode, and a job 'B' doing the same thing (or someone manually upserting data) at the same time. If both jobs receive data with the same primary key very close together in time, then when each job scans the table for the rows it needs to position-delete, neither job sees the other's change, because neither commit has happened yet. If there is an older row, both jobs will try to delete it, which is fine; however, both will also write their new data, and now we have two rows with the same primary key.

As far as I understand, this would not happen with equality deletes, because there the equality fields are simply applied as a filter against earlier data files, so we would end up with only one record for that primary key: the one from the job that committed later.

Please let me know what you think.

Thanks,
Tamas
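P.S. To make the race concrete, here is a toy in-memory sketch of the two delete semantics under concurrent upserts. Everything here is hypothetical and illustrative (class and function names are made up, not the real Iceberg/Flink API); it only models the commit behavior described above: position deletes are planned against a stale snapshot, while equality deletes filter by key against all data with a lower sequence number.

```python
# Toy model: NOT the Iceberg/Flink API, just the commit semantics sketched above.
class Table:
    def __init__(self):
        self.rows = []           # (seq, pk, value) appended data rows
        self.pos_deletes = set() # row positions removed by position deletes
        self.eq_deletes = []     # (seq, pk) equality deletes
        self.seq = 0             # commit sequence number

    def next_seq(self):
        self.seq += 1
        return self.seq

    def live_rows(self):
        out = []
        for pos, (seq, pk, val) in enumerate(self.rows):
            if pos in self.pos_deletes:
                continue
            # An equality delete removes matching rows written *before* it.
            if any(dseq > seq and dpk == pk for dseq, dpk in self.eq_deletes):
                continue
            out.append((pk, val))
        return out

def upsert_positional(table, snapshot, pk, value):
    # Plan against the snapshot taken at scan time: position-delete every row
    # with this key that was visible then, and append the new row.
    deletes = {pos for pos, (_, k, _) in snapshot if k == pk}
    seq = table.next_seq()
    table.pos_deletes |= deletes
    table.rows.append((seq, pk, value))

def upsert_equality(table, pk, value):
    # No scan needed: the equality delete filters out any earlier row with
    # this key, including rows committed concurrently with a lower sequence.
    seq = table.next_seq()
    table.eq_deletes.append((seq, pk))
    table.rows.append((seq, pk, value))

# --- position deletes: the race between jobs A and B ---
t_pos = Table()
t_pos.rows.append((t_pos.next_seq(), "id-1", "v0"))
snap_a = list(enumerate(t_pos.rows))  # job A scans
snap_b = list(enumerate(t_pos.rows))  # job B scans before A commits
upsert_positional(t_pos, snap_a, "id-1", "from-A")  # A commits
upsert_positional(t_pos, snap_b, "id-1", "from-B")  # B commits
print(t_pos.live_rows())  # two live rows with pk "id-1" -> duplicate

# --- equality deletes: the later committer wins ---
t_eq = Table()
t_eq.rows.append((t_eq.next_seq(), "id-1", "v0"))
upsert_equality(t_eq, "id-1", "from-A")
upsert_equality(t_eq, "id-1", "from-B")
print(t_eq.live_rows())  # single live row: ("id-1", "from-B")
```

In the position-delete run both writers delete the old row's position, but neither delete covers the row the other writer appended, so both new rows survive; in the equality-delete run B's delete also matches A's row because it was written with a lower sequence number.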
