Hi Péter,

Thanks for the detailed answers. We have some CDC streams with a very high change rate, and for such tables we were thinking of leveraging ConvertEqualityDeleteFiles <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java> to convert equality deletes to positional deletes, running it at a high frequency (say, every hour) to improve MoR read performance. Is the suggestion to instead run full compaction at a higher frequency to improve read performance? We think full compaction / rewriting may cause a lot of write amplification if run frequently on tables with a high change rate. Has any analysis been done on the tradeoffs of running the ConvertEqualityDeleteFiles procedure at a high frequency versus running compaction? Or are there any other suggestions?
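For concreteness, the full-compaction path we are comparing against is roughly the following (a sketch against the Spark actions API; the table identifier reuses the example from earlier in this thread, and the rewrite-all option value is illustrative, not a recommendation):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class CompactHighChurnTable {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Illustrative table name, reusing the one from earlier in this thread
        Table table = Spark3Util.loadIcebergTable(spark, "hadoop_catalog.testdb.upsert_test1");

        SparkActions.get(spark)
            .rewriteDataFiles(table)
            // Rewrite even well-sized files, so equality deletes get applied and dropped
            .option("rewrite-all", "true")
            .execute();
      }
    }

Run hourly on a high-churn table, this rewrites far more data than it logically changes, which is the write amplification we are worried about.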
Regards,
Aditya

On Mon, Apr 22, 2024 at 2:35 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Aditya,

> See my answers below:

> Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote (on Sat, Apr 20, 2024, 11:05):

>> Hi Péter, Gabor,

>> Thanks a lot for clarifying and providing the additional information. I have a few follow-up queries:

>> 1. We want to ingest a CDC stream into Iceberg using the Flink sink. If we have a RETRACT-like CDC stream that uniquely distinguishes INSERT from UPDATE (for example, one containing the events defined in https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/), is there any mode in the Flink-Iceberg ingestion today that does not create a delete for INSERT events, and only creates deletes for UPDATE / DELETE events?

> You can use `upsert(false)`, or just leave it out, as the default is false. This will create an equality delete for every `-D` and `-U` event, and a positional delete if there is an insert for the same key in the same checkpoint.

>> 2. I was browsing through Iceberg compaction and found that there is an interface defined, ConvertEqualityDeleteFiles.java <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>, which might make reads more efficient and could be more cost effective than a full compaction. However, it looks like the interface is implemented in neither Spark nor Flink. Do you know if there are plans to implement this interface in Spark or Flink?

> I remember the discussion around it, and this conversion is questionable. To convert the equality deletes to positional deletes, we need to scan the whole table. This is half of the IO, so maybe rewriting the whole table is better in this case. Especially since, after the conversion, we still have to apply the positional deletes during the read.

>> Regards,
>> Aditya

>> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>> Hi Aditya,

>>> The definition of UPSERT is that we have 2 types of messages:
>>> - DELETE - we need to remove the old record with the given id.
>>> - UPSERT - we need to remove the old version of the record based on the id, and add a new version.

>>> See: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion

>>> This type of stream uses fewer events than a RETRACT stream when there are many updates. In a RETRACT stream we have 2 types of records:
>>> - DELETE - we need to remove the old record with the given id.
>>> - INSERT - insert a new record.
>>> (There is a small sketch of the difference further down in this mail.)

>>> So when we have an UPSERT stream, we can't be sure that there is no previous record for the given id; we either have to scan the whole table to find the previous version, or always create the equality delete to remove it.

>>> When the Flink sink was implemented, the decision was to avoid scanning and write out the equality delete every time. As Gabor already mentioned, it is recommended to compact the table to remove the equality deletes and enhance the read performance. Currently we have Spark compaction ready, and we are working on the Flink compaction too.
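>>> To make the difference between the two stream types concrete, here is a minimal sketch of the two changelog shapes for the same logical change (it uses Flink's org.apache.flink.types.Row and RowKind; the key and values are made up):

>>>     import org.apache.flink.types.Row;
>>>     import org.apache.flink.types.RowKind;
>>>
>>>     public class ChangelogShapes {
>>>       public static void main(String[] args) {
>>>         // RETRACT stream: the old version is explicitly retracted, so the
>>>         // sink knows which rows are deletes and which are pure inserts.
>>>         Row[] retract = {
>>>             Row.ofKind(RowKind.INSERT, 1, "a"),
>>>             Row.ofKind(RowKind.UPDATE_BEFORE, 1, "a"), // retract old version
>>>             Row.ofKind(RowKind.UPDATE_AFTER, 1, "b")
>>>         };
>>>
>>>         // UPSERT stream: only the new version arrives. The sink cannot know
>>>         // whether a previous row with id=1 exists without scanning the
>>>         // table, so it writes an equality delete for the key every time.
>>>         Row[] upsert = {
>>>             Row.ofKind(RowKind.INSERT, 1, "a"),
>>>             Row.ofKind(RowKind.UPDATE_AFTER, 1, "b")   // no UPDATE_BEFORE
>>>         };
>>>
>>>         System.out.println(retract.length + " retract events, " + upsert.length + " upsert events");
>>>       }
>>>     }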
>>> On the Flink compaction work, see: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl

>>> I hope this helps,
>>> Peter

>>> Gabor Kaszab <gaborkas...@apache.org> wrote (on Thu, Apr 18, 2024, 9:40):

>>>> Hey,

>>>> I had the chance to explore this area of eq-deletes recently myself too. Apparently, this behavior is by design in Flink. The reason why it unconditionally writes an eq-delete for each insert (only in upsert mode, though) is to guarantee the uniqueness of the primary key: it drops any previous row with the same PK with a new eq-delete and then adds the new row in a new data file. Unfortunately, this happens unconditionally: no reading is performed, so even if there was no row previously with the given PK, Flink will write an eq-delete for it anyway. Yes, this can hurt read performance, so I guess the advised best practice is to compact your table frequently.

>>>> Gabor

>>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta <adigu...@linkedin.com.invalid> wrote:

>>>>> Hi all,

>>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new record with a new equality field id, an equality delete file is also created with the corresponding entry. For example, I executed the following commands in Flink SQL with Apache Iceberg:

>>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>>   `id` INT UNIQUE COMMENT 'unique id',
>>>>>   `data` STRING NOT NULL,
>>>>>   PRIMARY KEY(`id`) NOT ENFORCED
>>>>> ) WITH ('format-version'='2', 'write.upsert.enabled'='true');

>>>>> Then I inserted a record:

>>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');

>>>>> It resulted in 2 files.

>>>>> Data file content:

>>>>> {"id":7,"data":"new value"}

>>>>> But it also created an equality delete file:

>>>>> {"id":7}

>>>>> I expected that it would create a delete file entry for UPDATE / DELETE but not for INSERT, since that might degrade read performance for CDC tables, right? Is it expected that fresh INSERTs also get equality delete entries? If yes, what is the benefit of having an equality delete entry for INSERTs?

>>>>> Regards,
>>>>> Aditya
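>>>>> P.S. For completeness, this is roughly how I checked which files were written (a sketch that queries the table's `files` metadata table from Spark; I assume the Spark catalog is configured against the same warehouse; per the Iceberg spec, `content` is 0 for data files, 1 for position deletes, and 2 for equality deletes):

>>>>>     import org.apache.spark.sql.SparkSession;
>>>>>
>>>>>     public class InspectFiles {
>>>>>       public static void main(String[] args) {
>>>>>         SparkSession spark = SparkSession.builder().getOrCreate();
>>>>>         // The files metadata table lists every live file with its content type
>>>>>         spark.sql("SELECT content, file_path, record_count "
>>>>>             + "FROM hadoop_catalog.testdb.upsert_test1.files").show(false);
>>>>>       }
>>>>>     }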