Hi Péter,

Thanks for the detailed answers. We have some CDC streams with a very high
change rate; for such tables we were thinking of leveraging
ConvertEqualityDeleteFiles
<https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
to convert equality deletes to positional deletes to improve MoR read
performance, and running it at a high frequency (say, every hour).
Is the suggestion to instead run full compaction at a higher frequency to
improve read performance?
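
By "full compaction" here we mean something like the existing Spark
rewrite_data_files procedure, for example (a rough sketch; the catalog/table
names and the threshold value are placeholders):

CALL my_catalog.system.rewrite_data_files(
  table => 'testdb.cdc_table',
  options => map('delete-file-threshold', '1')
);

('delete-file-threshold' limits the rewrite to data files that already have
at least that many associated delete files.)
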
We think full compaction / rewriting may cause a lot of write amplification
if run at a higher frequency on tables with a high change rate, so we wanted
to check whether any analysis has been done on the tradeoffs of running a
ConvertEqualityDeleteFiles
<https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
procedure at a higher frequency versus running compaction. Or are there any
other suggestions?

Regards,
Aditya


On Mon, Apr 22, 2024 at 2:35 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Aditya,
>
> See my answers below:
>
> Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote (on Sat, Apr 20,
> 2024, 11:05):
>
>> Hi Péter, Gabor,
>>
>> Thanks a lot for clarifying and providing additional information. I had a
>> few follow-up queries:
>> 1. We want to ingest a CDC stream into Iceberg using the Flink sink. If we
>> have a RETRACT-like CDC stream that uniquely distinguishes between INSERT /
>> UPDATE (e.g. contains events as defined in
>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/),
>> is there any mode in Flink-Iceberg ingestion today that does not create a
>> delete for INSERT events (and only creates deletes for UPDATE / DELETE)?
>>
>
> You can use `upsert(false)`, or just leave it out, as the default is false.
> This will create an equality delete for every `-D` and `-U` event, and a
> positional delete if there is an insert for the same key within the same
> checkpoint.
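>
> In Flink SQL terms this is simply the default, non-upsert mode. A sketch
> based on your table below (the table name is just a placeholder, and
> 'write.upsert.enabled' defaults to false, so it can also be left out
> entirely):
>
> CREATE TABLE `hadoop_catalog`.`testdb`.`retract_test1` (
>      `id`  INT COMMENT 'unique id',
>      `data` STRING NOT NULL,
>      PRIMARY KEY(`id`) NOT ENFORCED
>  ) WITH ('format-version'='2', 'write.upsert.enabled'='false');
>
> In this mode only the `-D` / `-U` rows of the changelog become equality
> deletes; plain inserts do not get a delete entry.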
>
>
>> 2. I was browsing through Iceberg compaction and found that there is an
>> interface defined, ConvertEqualityDeleteFiles.java
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>,
>> which might make reads more efficient and may be more cost-effective than
>> a full compaction. However, it looks like the interface
>> ConvertEqualityDeleteFiles.java
>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>
>> is not implemented in either Spark or Flink. Do you know if there are plans
>> to implement this interface in Spark or Flink?
>>
>
> I remember the discussion around it; the value of this conversion is
> questionable. To convert the equality deletes to positional deletes, we
> need to scan the whole table, which is already roughly half of the IO of a
> full rewrite (read plus write), so rewriting the whole table may be better
> in this case, especially since after the conversion we still have to apply
> the positional deletes during reads.
>
>
>> Regards,
>> Aditya
>>
>>
>> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Aditya,
>>>
>>> The definition of UPSERT is that we have 2 types of messages:
>>> - DELETE - we need to remove the old record with the given id.
>>> - UPSERT - we need to remove the old version of the record based on the
>>> id, and add a new version of it.
>>>
>>> See:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>>>
>>> This type of stream uses fewer events than the RETRACT stream when there
>>> are many updates. In a RETRACT stream we have 2 types of records:
>>> - DELETE - we need to remove the old record with the given id.
>>> - INSERT - insert a new record.
>>>
>>> So when we have an UPSERT stream, we can't be sure that there is no
>>> previous record for the given id; we either have to scan the whole table
>>> to find the previous version, or always create an equality delete to
>>> remove it.
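>>>
>>> For example (a sketch, values made up), in an UPSERT stream both a brand
>>> new row and an update to an existing row arrive as the same message:
>>>
>>>   UPSERT (7, 'b')
>>>
>>> while a RETRACT stream spells the update out explicitly:
>>>
>>>   DELETE (7, 'a')  followed by  INSERT (7, 'b')
>>>
>>> so with UPSERT the sink cannot tell whether a previous version of id=7
>>> exists without reading the table.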
>>>
>>> When the Flink sink was implemented, the decision was to avoid scanning
>>> and write out the equality delete every time. As Gabor already mentioned,
>>> it is recommended to compact the table to remove the equality deletes and
>>> enhance read performance. Currently we have Spark compaction ready, and
>>> we are working on the Flink compaction too. See:
>>> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>
>>> I hope this helps,
>>> Peter
>>>
>>> Gabor Kaszab <gaborkas...@apache.org> wrote (on Thu, Apr 18, 2024,
>>> 9:40):
>>>
>>>> Hey,
>>>> I had the chance to explore this area of eq-deletes recently myself,
>>>> too. Apparently, this behavior is by design in Flink. The reason it
>>>> unconditionally writes an eq-delete for each insert (only in upsert mode,
>>>> though) is to guarantee the uniqueness of the primary key. So it drops the
>>>> previous row with the same PK with a new eq-delete and then adds the new
>>>> row in a new data file. Unfortunately this happens unconditionally: no
>>>> read is performed, so even if there was no row previously with the given
>>>> PK, Flink will write an eq-delete for it anyway.
>>>> Yes, this can hurt read performance, so I guess the advised best practice
>>>> is to compact your table frequently.
>>>>
>>>> Gabor
>>>>
>>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta
>>>> <adigu...@linkedin.com.invalid> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>>
>>>>>
>>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>>>>> record with a new equality field id, then an equality delete file is also
>>>>> created with the corresponding entry. For example, I executed the
>>>>> following commands in Flink SQL with Apache Iceberg:
>>>>>
>>>>>
>>>>>
>>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>>
>>>>>      `id`  INT UNIQUE COMMENT 'unique id',
>>>>>
>>>>>      `data` STRING NOT NULL,
>>>>>
>>>>>      PRIMARY KEY(`id`) NOT ENFORCED
>>>>>
>>>>>  ) with ('format-version'='2', 'write.upsert.enabled'='true');
>>>>>
>>>>>
>>>>>
>>>>> Now I inserted a record:
>>>>>
>>>>>
>>>>>
>>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>>>
>>>>>
>>>>>
>>>>> It resulted in 2 files -
>>>>>
>>>>> data file content-
>>>>>
>>>>>
>>>>>
>>>>> {"id":7,"data":"new value"}
>>>>>
>>>>>
>>>>>
>>>>> But it also created an equality delete file -
>>>>>
>>>>>
>>>>>
>>>>> {"id":7}
>>>>>
>>>>>
>>>>>
>>>>> I expect that it will create a delete file entry for UPDATE / DELETE
>>>>> but not for INSERT, as creating one for INSERT might lead to read
>>>>> performance degradation for CDC tables, right?
>>>>>
>>>>> Is it expected that fresh INSERTs will also have equality delete
>>>>> entries? If yes, what is the benefit of having an equality delete entry
>>>>> for INSERTs?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Aditya
>>>>>
>>>>
