Hi Péter,

Thanks a lot for explaining and sharing the reference thread; I will follow
it.

Regards,
Aditya


On Mon, Apr 22, 2024 at 8:39 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> A very high change rate means it is most probably worth rewriting the
> data files from time to time.
> The high change rate itself causes write amplification, as every new
> version of a record is essentially a new row in the table (plus a
> tombstone for the old data). ConvertEqualityDeleteFiles will not change
> this. It only changes the form of the tombstone: instead of storing
> "delete every row where id = 10", we store "delete the n-th row of file
> x". Reading the positional delete files is a bit more efficient, but if
> we rewrite the table data, we get rid of both the old data and the
> tombstone. Converting the equality deletes just came up in the
> discussion on the table maintenance thread:
> https://lists.apache.org/thread/37k03vfmrr81o10d9zgdhpyc28mrrp9z
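>
> To make the difference between the two tombstones concrete, for the row
> with id = 10 they would look roughly like this (illustrative content
> only, the file path and position are made up):
>
>   equality delete:   {"id": 10}
>   positional delete: {"file_path": "s3://bucket/table/data-00001.parquet", "pos": 4}
>
> Until the data files are rewritten, both kinds still have to be applied
> at read time.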
>
> On Mon, Apr 22, 2024 at 15:28, Aditya Narayan Gupta
> <gupta.adityanara...@gmail.com> wrote:
>
>> Hi Péter,
>>
>> Thanks for the detailed answers. We have some CDC streams with a very
>> high change rate, and for such tables we were thinking of leveraging
>> ConvertEqualityDeleteFiles
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
>> to convert equality deletes to positional deletes to improve the MoR
>> read performance, running it at a high frequency (say, every hour).
>> Is the suggestion instead to run full compaction at a higher frequency
>> to improve read performance?
>> We think full compaction / rewriting may cause a lot of write
>> amplification if run at a higher frequency for tables with a high
>> change rate, so we wanted to check whether any analysis has been done
>> on the tradeoffs of running the ConvertEqualityDeleteFiles
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
>> procedure at a higher frequency vs. running compaction. Or are there
>> any other suggestions?
>>
>> Regards,
>> Aditya
>>
>>
>> On Mon, Apr 22, 2024 at 2:35 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Aditya,
>>>
>>> See my answers below:
>>>
>>> On Sat, Apr 20, 2024 at 11:05, Aditya Narayan Gupta
>>> <gupta.adityanara...@gmail.com> wrote:
>>>
>>>> Hi Péter, Gabor,
>>>>
>>>> Thanks a lot for clarifying and providing additional information. I had
>>>> a few follow-up queries:
>>>> 1. We want to ingest a CDC stream into Iceberg using the Flink sink. If
>>>> we have a RETRACT-like CDC stream that uniquely distinguishes between
>>>> INSERT / UPDATE (e.g. containing events as defined in
>>>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/),
>>>> is there any mode in the Flink-Iceberg ingestion today that does not
>>>> create a delete for INSERT events, and only creates deletes for UPDATE /
>>>> DELETE?
>>>>
>>>
>>> You can use `upsert(false)`, or just leave it out, as the default is
>>> false.
>>> This will create an equality delete for every `-D` and `-U` event, and a
>>> positional delete if there is an insert for the same key in the same
>>> checkpoint.
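>>>
>>> In Flink SQL the same thing is controlled by the 'write.upsert.enabled'
>>> table property. A minimal sketch, reusing the layout of the table from
>>> your original mail (the table name here is made up), would be:
>>>
>>>   CREATE TABLE `hadoop_catalog`.`testdb`.`retract_test1` (
>>>       `id`   INT,
>>>       `data` STRING NOT NULL,
>>>       PRIMARY KEY(`id`) NOT ENFORCED
>>>   ) WITH ('format-version'='2', 'write.upsert.enabled'='false');
>>>
>>> Leaving out 'write.upsert.enabled' entirely has the same effect, since
>>> it defaults to false.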
>>>
>>>
>>>> 2. I was browsing through Iceberg compaction and found that there is an
>>>> interface defined, ConvertEqualityDeleteFiles.java
>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>,
>>>> which might make reads more efficient and may be more cost-effective
>>>> than a full compaction. However, it looks like this interface is not
>>>> implemented in either Spark or Flink. Do you know if there are plans to
>>>> implement it in Spark or Flink?
>>>>
>>>
>>> I remember the discussion around it, and this conversion is
>>> questionable. To convert the equality deletes to positional deletes, we
>>> need to scan the whole table. That is already half of the IO of a full
>>> rewrite (the read half), so maybe rewriting the whole table is better in
>>> this case, especially since after the conversion we still have to apply
>>> the positional deletes during the read.
>>>
>>>
>>>> Regards,
>>>> Aditya
>>>>
>>>>
>>>> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Aditya,
>>>>>
>>>>> The definition of UPSERT is that we have 2 types of messages:
>>>>> - DELETE - we need to remove the old record with the given id.
>>>>> - UPSERT - we need to remove the old version of the record based on
>>>>> the id, and add a new version.
>>>>>
>>>>> See:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>>>>>
>>>>> This type of stream uses fewer events than the RETRACT stream when
>>>>> there are many updates. In a RETRACT stream we have 2 types of records:
>>>>> - DELETE - we need to remove the old record with the given id.
>>>>> - INSERT - insert a new record.
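>>>>>
>>>>> For example, updating the row with id = 10 from 'a' to 'b' would look
>>>>> roughly like this in the two stream types (illustrative events only):
>>>>>
>>>>>   UPSERT stream:  UPSERT(id=10, data='b')
>>>>>   RETRACT stream: DELETE(id=10, data='a'), INSERT(id=10, data='b')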
>>>>>
>>>>> So when we have an UPSERT stream, we can't be sure that there is no
>>>>> previous record for the given id; we either have to scan the whole
>>>>> table to find the previous version, or always create the equality
>>>>> delete to remove it.
>>>>>
>>>>> When the Flink sink was implemented, the decision was to avoid
>>>>> scanning, and to write out the equality delete every time. As Gabor
>>>>> already mentioned, it is recommended to compact the table to remove the
>>>>> equality deletes and enhance the read performance. Currently we have
>>>>> Spark compaction ready, and we are working on the Flink compaction too.
>>>>> See:
>>>>> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
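>>>>>
>>>>> For reference, the Spark compaction can be run as a stored procedure,
>>>>> roughly like this (a minimal sketch; the catalog and table names are
>>>>> just placeholders and the option value is only for illustration):
>>>>>
>>>>>   CALL hadoop_catalog.system.rewrite_data_files(
>>>>>     table => 'testdb.upsert_test1',
>>>>>     options => map('delete-file-threshold', '1')
>>>>>   );
>>>>>
>>>>> With 'delete-file-threshold' set to 1, any data file that has at least
>>>>> one delete attached to it becomes a candidate for rewriting, so the
>>>>> deletes get applied and no longer have to be merged at read time.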
>>>>>
>>>>> I hope this helps,
>>>>> Peter
>>>>>
>>>>> On Thu, Apr 18, 2024 at 9:40, Gabor Kaszab <gaborkas...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>> I had the chance to explore this area of eq-deletes recently myself
>>>>>> too. Apparently, this behavior is by design in Flink. The reason why it
>>>>>> unconditionally writes an eq-delete for each insert (only in upsert
>>>>>> mode, though) is to guarantee the uniqueness of the primary key. So it
>>>>>> drops the previous row with the same PK with a new eq-delete and then
>>>>>> adds the new row in a new data file. Unfortunately this happens
>>>>>> unconditionally: no reading is done, so even if there was no row
>>>>>> previously with the given PK, Flink will write an eq-delete for it
>>>>>> anyway.
>>>>>> Yes, this can hurt read performance, so I guess the advised best
>>>>>> practice is to compact your table frequently.
>>>>>>
>>>>>> Gabor
>>>>>>
>>>>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta
>>>>>> <adigu...@linkedin.com.invalid> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>>>>>>> record with a new equality field id, then an equality delete file is
>>>>>>> also created with the corresponding entry. For example, I executed the
>>>>>>> following commands in Flink SQL with Apache Iceberg:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>>>>      `id`  INT UNIQUE COMMENT 'unique id',
>>>>>>>      `data` STRING NOT NULL,
>>>>>>>      PRIMARY KEY(`id`) NOT ENFORCED
>>>>>>>  ) with ('format-version'='2', 'write.upsert.enabled'='true');
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Now I inserted a record:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> It resulted in 2 files.
>>>>>>>
>>>>>>> Data file content:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> {"id":7,"data":"new value"}
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> But it also created an equality delete file:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> {"id":7}
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I expected that it would create a delete file entry for UPDATE /
>>>>>>> DELETE but not for INSERT, as this might lead to read performance
>>>>>>> degradation for CDC tables, right?
>>>>>>>
>>>>>>> Is it expected that fresh INSERTs will also have equality delete
>>>>>>> entries? If yes, what is the benefit of having an equality delete
>>>>>>> entry for INSERTs?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Aditya
>>>>>>>
>>>>>>
