Hi Péter, Gabor,

Thanks a lot for clarifying and providing additional information. I had a few follow-up queries:

1. We want to ingest a CDC stream using Flink into an Iceberg sink. If we have a RETRACT-like CDC stream that uniquely distinguishes between INSERT and UPDATE events (for example, one containing events as defined in https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/), is there any mode in Flink-Iceberg ingestion today that does not create an equality delete for INSERT events, and only creates deletes for UPDATE / DELETE? (See the sketch below for the kind of wiring I mean.)

2. I was browsing through Iceberg compaction and found that there is an interface defined, ConvertEqualityDeleteFiles.java <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>, which might make reads more efficient and could be more cost-effective than a full compaction. However, it looks like this interface is implemented in neither Spark nor Flink. Do you know if there are plans to implement it in either engine?
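For context on question 1, our sink wiring looks roughly like the sketch below (cdcStream, tableLoader and the wrapper class are placeholders for our actual setup, not code from this thread):

import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class CdcToIcebergSketch {
  // cdcStream is our RETRACT-like changelog: every RowData carries its
  // RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER or DELETE), so the sink
  // could in principle tell fresh INSERTs apart from UPDATEs.
  static void wireSink(DataStream<RowData> cdcStream, TableLoader tableLoader) {
    FlinkSink.forRowData(cdcStream)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("id"))
        // upsert mode today writes an equality delete for every incoming
        // row, INSERTs included
        .upsert(true)
        .append();
  }
}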
Regards,
Aditya

On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Aditya,
>
> The definition of UPSERT is that we have 2 types of messages:
> - DELETE - we need to remove the old record with the given id.
> - UPSERT - we need to remove the old version of the record based on the
> id, and should add a new version.
>
> See:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>
> This type of stream uses fewer events than a RETRACT stream when there
> are many updates. In a RETRACT stream we have 2 types of records:
> - DELETE - we need to remove the old record with the given id.
> - INSERT - insert a new record.
>
> So when we have an UPSERT stream, we can't be sure that there is no
> previous record for the given id; we either have to scan the whole table
> to find the previous version, or always create the equality delete to
> remove it.
>
> When the Flink sink was implemented, the decision was to avoid scanning
> and to write out the equality delete every time. As Gabor already
> mentioned, it is recommended to compact the table to remove the equality
> deletes and enhance read performance. Currently we have Spark compaction
> ready, and we are working on Flink compaction too. See:
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>
> I hope this helps,
> Peter
>
> On Thu, Apr 18, 2024 at 9:40 AM, Gabor Kaszab <gaborkas...@apache.org> wrote:
>
>> Hey,
>>
>> I had the chance to explore this area of eq-deletes recently myself too.
>> Apparently, this behavior is by design in Flink. The reason why it
>> unconditionally writes an eq-delete for each insert (only in upsert
>> mode, though) is to guarantee the uniqueness of the primary key: it
>> drops the previous row with the same PK with a new eq-delete and then
>> adds the new row in a new data file. This happens unconditionally; no
>> read is performed, so even if there was no row previously with the given
>> PK, Flink will write an eq-delete for it anyway. Yes, this can hurt read
>> performance, so I guess the advised best practice is to compact your
>> table frequently.
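>>
>> In other words, per incoming row the upsert path conceptually does
>> something like this (a simplified Java sketch, not the actual Iceberg
>> writer code; both helpers are made up):
>>
>> void upsertWrite(RowData row) {
>>   // drop any previous row with the same PK, without any lookup,
>>   // so the delete is written even when no previous row exists
>>   writeEqualityDelete(primaryKeyOf(row));   // made-up helper
>>   // then add the new version of the row to a data file
>>   writeDataFile(row);                       // made-up helper
>> }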
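>>
>> With Spark, the compaction can be kicked off roughly like this (a
>> sketch; the delete-file-threshold option makes any data file with at
>> least one attached delete file eligible for rewriting, and table
>> loading is elided behind a placeholder helper):
>>
>> import org.apache.iceberg.Table;
>> import org.apache.iceberg.actions.RewriteDataFiles;
>> import org.apache.iceberg.spark.actions.SparkActions;
>>
>> Table table = loadIcebergTable("testdb.upsert_test1"); // placeholder helper
>>
>> RewriteDataFiles.Result result =
>>     SparkActions.get()
>>         .rewriteDataFiles(table)
>>         .option("delete-file-threshold", "1")
>>         .execute();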
>>
>> Gabor
>>
>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta <adigu...@linkedin.com.invalid> wrote:
>>
>>> Hi all,
>>>
>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>>> record with a new equality field id, then an equality delete file is
>>> also created with the corresponding entry. For example, I executed the
>>> following commands in Flink SQL with Apache Iceberg:
>>>
>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>   `id` INT UNIQUE COMMENT 'unique id',
>>>   `data` STRING NOT NULL,
>>>   PRIMARY KEY(`id`) NOT ENFORCED
>>> ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
>>>
>>> Then I inserted a record:
>>>
>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>
>>> This resulted in 2 files. The data file content is:
>>>
>>> {"id":7,"data":"new value"}
>>>
>>> But it also created an equality delete file:
>>>
>>> {"id":7}
>>>
>>> I expected that it would create a delete file entry for UPDATE / DELETE
>>> but not for INSERT, as this might lead to read performance degradation
>>> for CDC tables, right?
>>>
>>> Is it expected that fresh INSERTs will also have equality delete
>>> entries? If yes, what is the benefit of having an equality delete entry
>>> for INSERTs?
>>>
>>> Regards,
>>> Aditya