Hi Tamas,

You're right about the problem that needs to be solved. The same problem
exists for the equivalent operation in Spark. Currently, we ensure correctness by running
validations at commit time that detect these cases and then retry or
recover.

When committing using `RowDelta`, both jobs would set a conflict detection
filter (the portion of the table being modified), set the starting snapshot,
and call `validateNoConflictingDataFiles`. That instructs Iceberg to check
whether there are data files that can conflict with the operation being
committed. The second process committing would refresh the table and then
need to validate that no conflicting data files were added since the
starting point in time (which is the same for both jobs). It would then
find the data file written by the first committer and detect that it may
contain records that were modified in the second commit. That will cause
Iceberg to throw an exception saying that the commit can't proceed, and the
conflict then has to be resolved by the caller.
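
To make the sequence concrete, here is a rough sketch of that check as a toy
Python model. It is not the Iceberg API: `ToyTable`, `DataFile`,
`ValidationError`, and the use of a set of primary keys as the "conflict
detection filter" are all hypothetical stand-ins, and the real validation
works on file metadata rather than exact key sets.

```python
from dataclasses import dataclass, field


class ValidationError(Exception):
    """The commit conflicts with data added since the starting snapshot."""


@dataclass(frozen=True)
class DataFile:
    path: str
    keys: frozenset  # primary keys the file may contain (hypothetical stand-in)


@dataclass
class ToyTable:
    # snapshots[i] lists the data files added by commit i; index 0 is the empty table
    snapshots: list = field(default_factory=lambda: [[]])

    def current_snapshot_id(self):
        return len(self.snapshots) - 1

    def commit(self, new_files, starting_snapshot_id, conflict_keys):
        # Check every data file added after the starting snapshot.
        for added in self.snapshots[starting_snapshot_id + 1:]:
            for data_file in added:
                # A file that may contain a key we modified is a conflict.
                if data_file.keys & conflict_keys:
                    raise ValidationError(
                        f"found conflicting data file: {data_file.path}")
        self.snapshots.append(list(new_files))
        return self.current_snapshot_id()


table = ToyTable()
start = table.current_snapshot_id()  # both jobs plan against the same snapshot

# Job A commits first; nothing was added since `start`, so validation passes.
table.commit([DataFile("a.parquet", frozenset({1}))], start, frozenset({1}))

# Job B commits second with the same starting snapshot and an overlapping filter.
try:
    table.commit([DataFile("b.parquet", frozenset({1}))], start, frozenset({1}))
    second_commit_rejected = False
except ValidationError as err:
    second_commit_rejected = True
    print(err)
```

In this model the second commit is rejected because the file added by the
first committer overlaps the keys the second job modified. A commit whose
filter does not overlap would still go through.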

After that, you'd need to take some action to recover or retry. You could
restart from the last checkpoint, but that discards all of the work you've
already done. Another option is to keep track of the IDs that were upserted
and write deletes against those IDs for the new data files. That would
probably be better than retrying the entire checkpoint. From Iceberg's
perspective, recovery choices are left up to the engine. Flink might
recover as I described, but engines like Spark just retry the entire write
since that's simpler.
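
The second recovery option could look roughly like the toy Python sketch
below. It assumes the job remembers which IDs it upserted and can read the
data files the other committer added. The helper names
(`position_deletes_for`, `live_rows`) and the in-memory file layout are
hypothetical and only illustrate the idea, not how an engine would implement
it.

```python
def position_deletes_for(conflicting_files, upserted_ids):
    """Return (path, position) pairs for rows in the conflicting files
    whose ID was also upserted by this job."""
    deletes = []
    for path, rows in conflicting_files.items():
        for pos, (row_id, _value) in enumerate(rows):
            if row_id in upserted_ids:
                deletes.append((path, pos))
    return deletes


def live_rows(files, deletes):
    """Read all rows, skipping any position that has been deleted."""
    deleted = set(deletes)
    return [
        row
        for path, rows in files.items()
        for pos, row in enumerate(rows)
        if (path, pos) not in deleted
    ]


# Job A committed first; job B wrote its file from the same starting snapshot.
committed_by_a = {"a.parquet": [(1, "from-A"), (2, "from-A")]}
written_by_b = {"b.parquet": [(1, "from-B"), (3, "from-B")]}
upserted_by_b = {1, 3}  # IDs job B kept track of while writing

# Instead of redoing the checkpoint, job B adds deletes against A's new file.
extra_deletes = position_deletes_for(committed_by_a, upserted_by_b)

all_files = {**committed_by_a, **written_by_b}
rows = live_rows(all_files, extra_deletes)
ids = sorted(row_id for row_id, _value in rows)
```

With the extra deletes applied, each ID appears once and the row for ID 1
comes from the later committer. The job would then retry the commit with
those deletes included and validate from the refreshed snapshot.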

Ryan

On Wed, Mar 8, 2023 at 7:39 AM Tamas Sule <[email protected]>
wrote:

> Hello,
>
> We are experimenting with implementing position deletes in the Flink
> connector to provide better support between components in our stack as
> equality deletes are not widely supported yet. I identified an issue which
> (potentially) makes this impossible to do correctly in upsert mode and I'd
> like to get some feedback if my understanding is correct.
>
> So, let's imagine we create an Iceberg table using Flink with a primary
> key and we want to write to this table in upsert mode. If we replace
> equality deletes with position deletes I think there is a possibility that
> we end up with multiple records with the same primary key. Let's say we
> have a Flink job called 'A' which writes to this table in upsert mode and
> job 'B' which does a similar thing or just someone manually wants to upsert
> some data at the same time. If both jobs receive data with the same primary
> key very close to each other in time, then essentially what would happen is
> that when we scan the table for rows we want to position delete, neither of
> the jobs would see the change from the other job as the commits haven't
> happened yet. So if there is an older row, both jobs will try to delete
> that row which is fine, however they will also write the new data as well
> and now we have 2 rows with the same primary key. As far as I understand
> this would not happen with equality deletes because with that we just apply
> the equality fields as a filter to previous data files, so in that case we
> would have only 1 record with that primary key which is from the job that
> did the commit later.
>
> Please let me know what you think.
>
> Thanks,
> Tamas
>


-- 
Ryan Blue
Tabular
