Hi Tamas,

You're right about the problem that needs to be solved; the same issue exists for this task in Spark. Currently, we ensure correctness by running validations at commit time that detect these cases and then retry or recover.

When committing using `RowDelta`, both jobs would set a conflict detection filter (covering the portion of the table being modified) and a starting snapshot, and call `validateNoConflictingDataFiles`. That instructs Iceberg to check whether any data files were added that could conflict with the operation being committed. Whichever process commits second would refresh the table and then validate that no conflicting data files were added since the starting point in time (which is the same for both jobs). It would find the data file written by the first committer, detect that it may contain records that were modified in the second commit, and throw an exception saying the commit can't proceed and needs to be fixed.
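To make that concrete, here's a rough sketch of the commit-time validation against the `RowDelta` API. The `commitUpsert` method, the `scanSnapshotId`/`dataFile`/`positionDeleteFile` handles, and the `id_bucket` predicate are placeholders for whatever the writer actually has on hand; the chained validation calls are the real Iceberg API:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class UpsertCommit {
  // scanSnapshotId is the snapshot the job read from when it planned which
  // rows to position-delete; in this scenario both jobs share that starting
  // point. dataFile and positionDeleteFile were already written by the job.
  void commitUpsert(Table table, long scanSnapshotId,
                    DataFile dataFile, DeleteFile positionDeleteFile) {
    // the portion of the table this write touches (hypothetical predicate)
    Expression conflictFilter = Expressions.equal("id_bucket", 42);

    try {
      table.newRowDelta()
          .addRows(dataFile)
          .addDeletes(positionDeleteFile)
          .validateFromSnapshot(scanSnapshotId)
          .conflictDetectionFilter(conflictFilter)
          // fail if any data file matching the filter was committed after
          // scanSnapshotId -- this catches the concurrent upsert
          .validateNoConflictingDataFiles()
          .commit();
    } catch (ValidationException e) {
      // a conflicting data file was committed first; recover or retry
      // (restart from the checkpoint, or write deletes for the upserted IDs)
    }
  }
}
```

In your scenario, job A's commit succeeds, and job B's `validateNoConflictingDataFiles` check then sees A's new data file (which may contain the same primary key) and throws, rather than silently producing the duplicate row you described.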
If the validation fails, you'd need to take some action to recover or retry. You could restart from the last checkpoint, but that discards all of the work you've already done. Another option is to keep track of the IDs that were upserted and write deletes against those IDs for the new data files; that would probably be better than retrying the entire checkpoint.

From Iceberg's perspective, recovery choices are left up to the engine. Flink might recover as described above, but engines like Spark just retry the entire write since that's simpler.

Ryan

On Wed, Mar 8, 2023 at 7:39 AM Tamas Sule <[email protected]> wrote:

> Hello,
>
> We are experimenting with implementing position deletes in the Flink
> connector to provide better support between components in our stack, as
> equality deletes are not widely supported yet. I identified an issue
> which (potentially) makes this impossible to do correctly in upsert mode,
> and I'd like to get some feedback on whether my understanding is correct.
>
> Let's imagine we create an Iceberg table using Flink with a primary key
> and we want to write to this table in upsert mode. If we replace equality
> deletes with position deletes, I think there is a possibility that we end
> up with multiple records with the same primary key. Say we have a Flink
> job 'A' which writes to this table in upsert mode, and a job 'B' which
> does a similar thing (or someone manually upserts some data at the same
> time). If both jobs receive data with the same primary key very close
> together in time, then when each scans the table for the rows it wants to
> position-delete, neither job sees the change from the other because the
> commits haven't happened yet. So if there is an older row, both jobs will
> try to delete that row, which is fine; however, they will both also write
> the new data, and now we have 2 rows with the same primary key. As far as
> I understand, this would not happen with equality deletes, because those
> just apply the equality fields as a filter to previous data files, so we
> would end up with only 1 record with that primary key: the one from the
> job that committed later.
>
> Please let me know what you think.
>
> Thanks,
> Tamas

--
Ryan Blue
Tabular
