Hi Péter, Gabor,

Thanks a lot for clarifying and providing the additional information. I had
a few follow-up queries:
1. We want to ingest a CDC stream into an Iceberg sink using Flink. If we
have a RETRACT-like CDC stream that unambiguously distinguishes INSERT from
UPDATE (e.g. one containing the events defined in
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/),
is there any mode in the Flink-to-Iceberg ingestion today that does not
create an equality delete for INSERT events, and only creates deletes for
UPDATE / DELETE?
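To make the question concrete, here is a minimal sketch of the kind of
changelog I mean, using Flink's core RowKind rather than the flink-cdc
event classes (the values are made up):

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RetractStyleEvents {
    public static void main(String[] args) {
        // In a RETRACT-style changelog the operation type is explicit, so an
        // INSERT for a brand-new key is distinguishable from an UPDATE and,
        // at least in principle, would not need a matching equality delete.
        Row insert = Row.ofKind(RowKind.INSERT, 7, "new value");
        Row updateBefore = Row.ofKind(RowKind.UPDATE_BEFORE, 7, "new value");
        Row updateAfter = Row.ofKind(RowKind.UPDATE_AFTER, 7, "updated value");
        Row delete = Row.ofKind(RowKind.DELETE, 7, "updated value");
        System.out.println(insert + " " + updateBefore + " " + updateAfter + " " + delete);
    }
}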

2. I was browsing through Iceberg compaction and found an interface,
ConvertEqualityDeleteFiles.java
<https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>,
for converting equality delete files to position delete files. This might
make reads more efficient and could be more cost effective than a full
compaction, but the interface appears to be implemented in neither Spark
nor Flink. Do you know if there are plans to implement it in either engine?
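Purely as a hypothetical sketch of how I would expect to drive that action
once an engine implements it (based on the methods the interface declares;
nothing in Spark or Flink produces such an action instance today):

import org.apache.iceberg.actions.ConvertEqualityDeleteFiles;
import org.apache.iceberg.expressions.Expressions;

public class ConvertEqDeletesSketch {
    // Hypothetical driver: there is currently no factory that returns a
    // ConvertEqualityDeleteFiles instance, so the 'action' argument cannot
    // be obtained from SparkActions or the Flink actions yet.
    static void convertAll(ConvertEqualityDeleteFiles action) {
        ConvertEqualityDeleteFiles.Result result =
            action.filter(Expressions.alwaysTrue()) // convert every eq-delete
                .execute();
        System.out.println(result);
    }
}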

Regards,
Aditya


On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Aditya,
>
> The definition of UPSERT is that we have 2 types of messages:
> - DELETE - we need to remove the old record with the given id.
> - UPSERT - we need to remove the old version of the record with the given
> id, and add a new version.
>
> See:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>
> This type of stream uses fewer events than a RETRACT stream when there are
> many updates. In a RETRACT stream we have 2 types of records:
> - DELETE - we need to remove the old record with the given id.
> - INSERT - insert a new record.
>
> So when we have an UPSERT stream, we can't be sure that there is no
> previous record for the given id: we either have to scan the whole table to
> find the previous version, or always create the equality delete to remove
> it.
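> For example, changing the value for id=7 from 'a' to 'b' and then deleting
> the row looks like this, schematically:
>
> UPSERT stream:  UPSERT(7,'b'), DELETE(7)
> RETRACT stream: DELETE(7,'a'), INSERT(7,'b'), DELETE(7,'b')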
>
> When the Flink sink was implemented, the decision was to avoid scanning,
> and to write out the equality delete every time. As Gabor already
> mentioned, it is recommended to compact the table to remove the equality
> deletes and enhance the read performance. Currently we have Spark
> compaction ready, and we are working on the Flink compaction too. See:
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
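>
> As a sketch, kicking that off from the Spark actions API looks roughly
> like this (assuming you already have a Table handle; tuning options are
> omitted):
>
> import org.apache.iceberg.Table;
> import org.apache.iceberg.spark.actions.SparkActions;
>
> public class CompactTable {
>     public static void compact(Table table) {
>         // Rewrite (compact) the data files; the accumulated equality
>         // deletes are applied during the rewrite, so readers of the new
>         // files no longer have to merge them.
>         SparkActions.get().rewriteDataFiles(table).execute();
>     }
> }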
>
> I hope this helps,
> Peter
>
> Gabor Kaszab <gaborkas...@apache.org> wrote (Thu, Apr 18, 2024, 9:40):
>
>> Hey,
>> I had the chance to explore this area of eq-deletes myself recently.
>> Apparently, this behavior is by design in Flink. The reason it
>> unconditionally writes an eq-delete for each insert (only in upsert mode,
>> though) is to guarantee the uniqueness of the primary key: it drops any
>> previous row with the same PK via a new eq-delete and then adds the new row
>> in a new data file. Unfortunately, this happens unconditionally; no read is
>> performed, so even if there was no previous row with the given PK, Flink
>> writes an eq-delete for it anyway. Yes, this can hurt read performance, so
>> the advised best practice is to compact your table frequently.
>>
>> Gabor
>>
>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta
>> <adigu...@linkedin.com.invalid> wrote:
>>
>>> Hi all,
>>>
>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>>> record with a new equality field id, then an equality delete file is also
>>> created with a corresponding entry. For example, I executed the following
>>> commands in Flink SQL with Apache Iceberg:
>>>
>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>     `id` INT UNIQUE COMMENT 'unique id',
>>>     `data` STRING NOT NULL,
>>>     PRIMARY KEY(`id`) NOT ENFORCED
>>> ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
>>>
>>> Then I inserted a record:
>>>
>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>
>>> It resulted in 2 files. Data file content:
>>>
>>> {"id":7,"data":"new value"}
>>>
>>> But it also created an equality delete file:
>>>
>>> {"id":7}
>>>
>>> I expected that it would create a delete file entry for UPDATE / DELETE,
>>> but not for INSERT, as that can degrade read performance for CDC tables,
>>> right?
>>>
>>> Is it expected that fresh INSERTs also get equality delete entries? If
>>> yes, what is the benefit of an equality delete entry for INSERTs?
>>>
>>> Regards,
>>>
>>> Aditya
>>
