Hi Péter,

Thanks a lot for explaining and sharing the reference thread, I will follow it.
Regards,
Aditya

On Mon, Apr 22, 2024 at 8:39 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> A very high change rate means that it is most probably worth rewriting
> the data files from time to time.
> The high change rate itself causes write amplification, as every new
> version of a record is essentially a new row in the table (plus a
> tombstone for the old data). ConvertEqualityDeleteFiles will not change
> this. It only changes the form of the tombstone: instead of storing
> "delete every row where id = 10", we store "delete the n-th row of
> file x". Reading positional delete files is a bit more efficient, but if
> we rewrite the table data then we get rid of both the old data and the
> tombstone. Converting the equality deletes just came up in the
> discussion on the table maintenance thread:
> https://lists.apache.org/thread/37k03vfmrr81o10d9zgdhpyc28mrrp9z
>
> Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote (on Mon,
> Apr 22, 2024, 15:28):
>
>> Hi Péter,
>>
>> Thanks for the detailed answers. We have some CDC streams with a very
>> high change rate; for such tables we were thinking of leveraging
>> ConvertEqualityDeleteFiles
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
>> to convert eq-deletes to pos-deletes to improve MoR read performance,
>> and of running it at a high frequency (say, every hour).
>> Is the suggestion to instead run full compaction at a higher frequency
>> to improve read performance?
>> We think full compaction / rewriting may cause a lot of write
>> amplification if run frequently on tables with a high change rate. We
>> wanted to check whether any analysis has been done on the tradeoffs of
>> running the ConvertEqualityDeleteFiles
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
>> procedure at a high frequency vs running compaction, or are there any
>> other suggestions?
>>
>> Regards,
>> Aditya
>>
>>
>> On Mon, Apr 22, 2024 at 2:35 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Aditya,
>>>
>>> See my answers below:
>>>
>>> Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote (on Sat,
>>> Apr 20, 2024, 11:05):
>>>
>>>> Hi Péter, Gabor,
>>>>
>>>> Thanks a lot for clarifying and providing additional information. I
>>>> had a few follow-up queries:
>>>> 1. We want to ingest a CDC stream into an Iceberg sink using Flink.
>>>> If we have a RETRACT-like CDC stream that uniquely distinguishes
>>>> between INSERT / UPDATE (e.g. contains events as defined in
>>>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/),
>>>> is there any mode in flink-iceberg ingestion today that does not
>>>> create a delete for INSERT events (and only creates deletes for
>>>> UPDATE / DELETE)?
>>>>
>>>
>>> You can use `upsert(false)`, or just leave it out, as the default is
>>> false.
>>> This will create an equality delete for every `-D` and `-U` event,
>>> and a positional delete if there is an insert for the same key in the
>>> same checkpoint.
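>>>
>>> A minimal sketch of that sink setup with the DataStream API (untested;
>>> `rowDataStream` and the table location are placeholders):
>>>
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.table.data.RowData;
>>> import org.apache.iceberg.flink.TableLoader;
>>> import org.apache.iceberg.flink.sink.FlinkSink;
>>>
>>> // rowDataStream: DataStream<RowData> carrying the CDC events
>>> TableLoader loader =
>>>     TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/testdb/upsert_test1");
>>> FlinkSink.forRowData(rowDataStream)
>>>     .tableLoader(loader)
>>>     .upsert(false) // default: eq-deletes only for -D / -U events
>>>     .append();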
>>>
>>>
>>>> 2. I was browsing through Iceberg compaction and found that there is
>>>> an interface defined, ConvertEqualityDeleteFiles.java
>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>,
>>>> which might make reads more efficient and may be more cost effective
>>>> than a full compaction. It looks like the interface is implemented in
>>>> neither Spark nor Flink; do you know if there are plans to implement
>>>> it in Spark or Flink?
>>>>
>>>
>>> I remember the discussion around it, and this conversion is
>>> questionable. To convert the equality deletes to positional deletes,
>>> we need to scan the whole table. That is already half of the IO of a
>>> full rewrite, so maybe rewriting the whole table is better in this
>>> case, especially since after the conversion we still have to apply
>>> the positional deletes during reads.
>>>
>>>
>>>> Regards,
>>>> Aditya
>>>>
>>>>
>>>> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Aditya,
>>>>>
>>>>> The definition of UPSERT is that we have 2 types of messages:
>>>>> - DELETE - we need to remove the old record with the given id.
>>>>> - UPSERT - we need to remove the old version of the record based on
>>>>> the id, and add a new version.
>>>>>
>>>>> See:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>>>>>
>>>>> When there are many updates, this type of stream uses fewer events
>>>>> than a RETRACT stream. In a RETRACT stream we have 2 types of
>>>>> records:
>>>>> - DELETE - we need to remove the old record with the given id.
>>>>> - INSERT - insert a new record.
>>>>>
>>>>> So with an UPSERT stream we can't be sure that there is no previous
>>>>> record for the given id; we either have to scan the whole table to
>>>>> find the previous version, or always create the equality delete to
>>>>> remove it.
>>>>>
>>>>> When the Flink sink was implemented, the decision was to avoid
>>>>> scanning and to write out the equality delete every time. As Gabor
>>>>> already mentioned, it is recommended to compact the table to remove
>>>>> the equality deletes and enhance read performance. Currently we
>>>>> have Spark compaction ready, and we are working on Flink compaction
>>>>> too. See:
>>>>> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>>>
>>>>> I hope this helps,
>>>>> Peter
>>>>>
>>>>> Gabor Kaszab <gaborkas...@apache.org> wrote (on Thu, Apr 18, 2024,
>>>>> 9:40):
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I had the chance to explore this area of eq-deletes recently
>>>>>> myself too. Apparently, this behavior is by design in Flink. The
>>>>>> reason it unconditionally writes an eq-delete for each insert
>>>>>> (only in upsert mode, though) is to guarantee the uniqueness of
>>>>>> the primary key: it drops any previous row with the same PK via a
>>>>>> new eq-delete and then adds the new row in a new data file.
>>>>>> Unfortunately, this happens unconditionally: no read is performed,
>>>>>> so even if there was no previous row with the given PK, Flink will
>>>>>> write an eq-delete for it anyway.
>>>>>> Yes, this can hurt read performance, so I guess the advised best
>>>>>> practice is to compact your table frequently.
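>>>>>> With the Spark actions API that compaction could look roughly
>>>>>> like this (a sketch, untested; assumes an active SparkSession
>>>>>> `spark` and a loaded `Table` handle; delete-file-threshold=1 makes
>>>>>> the rewrite pick up every data file that has any deletes
>>>>>> attached):
>>>>>>
>>>>>> import org.apache.iceberg.Table;
>>>>>> import org.apache.iceberg.spark.actions.SparkActions;
>>>>>>
>>>>>> SparkActions.get(spark)
>>>>>>     .rewriteDataFiles(table)
>>>>>>     .option("delete-file-threshold", "1") // rewrite files with >= 1 delete
>>>>>>     .execute();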
>>>>>>
>>>>>> Gabor
>>>>>>
>>>>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta
>>>>>> <adigu...@linkedin.com.invalid> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a
>>>>>>> new record with a new equality field id, then an equality delete
>>>>>>> file is also created with the corresponding entry. For example, I
>>>>>>> executed the following commands in Flink SQL with Apache Iceberg:
>>>>>>>
>>>>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>>>>   `id` INT UNIQUE COMMENT 'unique id',
>>>>>>>   `data` STRING NOT NULL,
>>>>>>>   PRIMARY KEY(`id`) NOT ENFORCED
>>>>>>> ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
>>>>>>>
>>>>>>> Then I inserted a record:
>>>>>>>
>>>>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>>>>>
>>>>>>> It resulted in 2 files.
>>>>>>>
>>>>>>> Data file content:
>>>>>>>
>>>>>>> {"id":7,"data":"new value"}
>>>>>>>
>>>>>>> But it also created an equality delete file:
>>>>>>>
>>>>>>> {"id":7}
>>>>>>>
>>>>>>> I expected that it would create a delete file entry for UPDATE /
>>>>>>> DELETE but not for INSERT, as that might lead to read performance
>>>>>>> degradation for CDC tables, right?
>>>>>>> Is it expected that fresh INSERTs will also have equality delete
>>>>>>> entries? If yes, what is the benefit of having an equality delete
>>>>>>> entry for INSERTs?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Aditya
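>>>>>>>
>>>>>>> PS: to see the two files, the Iceberg metadata tables can be
>>>>>>> queried from Spark, roughly like this (a sketch; assumes the same
>>>>>>> hadoop_catalog is configured in the Spark session):
>>>>>>>
>>>>>>> // shows the equality delete file written for the INSERT above
>>>>>>> spark.sql("SELECT content, file_path, record_count "
>>>>>>>         + "FROM hadoop_catalog.testdb.upsert_test1.delete_files")
>>>>>>>     .show(false);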