Hi Yufei

There was a proposed PR for this:

On Thu, Apr 21, 2022 at 5:42 AM Yufei Gu <flyrain...@gmail.com> wrote:

> Hi team,
> Do we have a PR for this type of delete compaction?
>> Merge: the changes specified in delete files are applied to data files
>> and then overwrite the original data file, e.g. merging delete files to
>> data files.
> Yufei
> On Wed, Nov 3, 2021 at 8:29 AM Puneet Zaroo <pza...@netflix.com.invalid>
> wrote:
>> Sounds great. I will look at the PRs.
>> thanks,
>> On Tue, Nov 2, 2021 at 11:35 PM Jack Ye <yezhao...@gmail.com> wrote:
>>> Yes, I am actually arriving at exactly the same conclusion as you just
>>> now. I was focusing too much on the immediate removal of delete files when
>>> writing the doc, and lost sight of the fact that we don't need to remove the
>>> deletes once we have the functionality to preserve the sequence number.
>>> I just published https://github.com/apache/iceberg/pull/3454 this afternoon
>>> to add the option for selecting based on deletes in BinPackStrategy. I
>>> will add another PR that preserves the sequence number tomorrow.
>>> -Jack
>>> On Tue, Nov 2, 2021 at 11:23 PM Puneet Zaroo <pza...@netflix.com.invalid>
>>> wrote:
>>>> Thanks for the further clarifications, and for outlining the detailed steps
>>>> for the delete or 'MERGE' compaction. It seems this compaction is explicitly
>>>> geared towards removing delete files. While that may be useful, I feel that
>>>> for CDC tables, doing the bin-pack and sort compactions and *removing
>>>> the NEED for reading delete files in downstream queries* quickly,
>>>> without conflicts with concurrent CDC updates, is more important. This
>>>> guarantees that downstream query performance is decent soon after the data
>>>> has landed in the table.
>>>> The actual delete file removal can happen in a somewhat delayed manner
>>>> as well, as that is now just a storage optimization: those delete files
>>>> are no longer accessed in the query path.
>>>> The above requirements can be achieved if the output of the current
>>>> sorting and bin-pack actions also sets the sequence number to the sequence
>>>> number of the snapshot with which the compaction was started. And of course
>>>> the file selection criteria have to be extended to also pick files which
>>>> have more than a threshold number of delete files associated with them, in
>>>> addition to the criteria already used (incorrect file size for bin-pack or
>>>> range overlap for sort).
>>>> Thanks,
>>>> - Puneet
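A minimal Python sketch of the extended selection rule described in the message above. The `DataFile` record, the `select_for_rewrite` helper and every parameter name are hypothetical and exist only for illustration; they are not Iceberg APIs. A file is picked when its size is outside the target range or when it has more than a threshold number of delete files associated with it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    """Hypothetical stand-in for a data file entry; not an Iceberg class."""
    path: str
    size_bytes: int
    delete_file_count: int  # delete files currently associated with this file


def select_for_rewrite(files, min_size, max_size, delete_threshold):
    """Return files with an out-of-range size or too many associated deletes."""
    return [
        f for f in files
        if f.size_bytes < min_size          # too small for bin-pack
        or f.size_bytes > max_size          # too large for bin-pack
        or f.delete_file_count > delete_threshold  # assumed extra criterion
    ]
```

With a size range of 100 to 1000 bytes and a threshold of 2, a well-sized file carrying 3 delete files would be selected even though bin-pack alone would skip it.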
>>>> On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>> > I think even with the custom sequence file numbers on output data
>>>>> files; the position delete files have to be deleted; *since position
>>>>> deletes also apply on data files with the same sequence number*.
>>>>> Also, unless I am missing something, I think the equality delete files
>>>>> cannot be deleted at the end of this compaction, as it is really hard to
>>>>> figure out if all the impacted data files have been rewritten:
>>>>> The plan is to always remove both position and equality deletes. Given
>>>>> a predicate (e.g. COMPACT table WHERE region='US'), the initial
>>>>> implementation will always compact full partitions by (1) looking for all
>>>>> delete files based on the predicate, (2) getting all impacted partitions,
>>>>> (3) rewriting all data files in those partitions that have deletes, and
>>>>> (4) removing those delete files. The algorithm can be improved to target a
>>>>> smaller subset of files, but currently we mostly rely on Iceberg's scan
>>>>> planning and, as you said, it's hard to figure out if a delete file
>>>>> (especially an equality delete) covers any additional data files. But we
>>>>> know each delete file only belongs
>>>>> to a single partition, which guarantees the removal is safe. (For
>>>>> partitioned tables, global deletes will be handled separately and not
>>>>> removed unless specifically requested by the user, because that requires a
>>>>> full table compaction, but CDC does not write global deletes anyway.)
>>>>> -Jack
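The four steps in the message above can be sketched roughly as follows. This is an illustrative Python model only: `DeleteFile`, `DataFile`, `plan_merge` and the partition-level predicate are hypothetical names, and the sketch assumes every delete file belongs to exactly one partition (global deletes are left out, as the message says they are handled separately).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeleteFile:
    path: str
    partition: str  # assumption: each delete file belongs to one partition


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: str
    has_deletes: bool


def plan_merge(delete_files, data_files, predicate):
    """Return (data files to rewrite, delete files to remove)."""
    # (1) look for all delete files matching the predicate
    matched = [d for d in delete_files if predicate(d.partition)]
    # (2) collect the impacted partitions
    partitions = {d.partition for d in matched}
    # (3) rewrite the data files in those partitions that have deletes
    to_rewrite = [
        f for f in data_files
        if f.partition in partitions and f.has_deletes
    ]
    # (4) the matched delete files are removed once the rewrite commits
    return to_rewrite, matched
```

For a predicate such as `lambda p: p == "US"`, only data files in the US partition that carry deletes are rewritten, and only the delete files of that partition are dropped.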
>>>>> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pza...@netflix.com.invalid>
>>>>> wrote:
>>>>>> Thanks for the clarifications, and thanks for pulling together the
>>>>>> documentation for the row-level delete functionality separately, as that
>>>>>> will be very helpful.
>>>>>> I think we are in agreement on most points. I just want to reiterate
>>>>>> my understanding of the merge compaction behavior to make sure we are on
>>>>>> the same page.
>>>>>> The output table of a Flink CDC pipeline will in a lot of cases have
>>>>>> small files with unsorted data, so doing the bin-pack and sorting
>>>>>> compactions is also important for those tables (and obviously being able to
>>>>>> do so without conflicts with incoming data is also important). If the
>>>>>> existing bin-pack and sort compaction actions are enhanced with
>>>>>> 1) writing the output data files with the sequence number of the
>>>>>> snapshot with which the compaction was started *AND*
>>>>>> 2) deleting all position delete files whose data files have been
>>>>>> rewritten;
>>>>>> then I believe we can run those (bin-pack and sort) compactions as
>>>>>> well without fear of conflicting with newer CDC updates. I think even with
>>>>>> the custom sequence file numbers on output data files; the position delete
>>>>>> files have to be deleted; *since position deletes also apply on data
>>>>>> files with the same sequence number*. Also, unless I am missing
>>>>>> something, I think the equality delete files cannot be deleted at the end
>>>>>> of this compaction, as it is really hard to figure out if all the impacted
>>>>>> data files have been rewritten.
>>>>>> If the bin-pack and sort compactions are enhanced in this manner,
>>>>>> then I foresee just running those so that the same compaction can take care
>>>>>> of all relevant optimizations for a table (including delete file removal).
>>>>>> At the end, potentially, only some unnecessary equality delete files will
>>>>>> remain, which will have to be deleted by some other action.
>>>>>> Thanks,
>>>>>> - Puneet
>>>>>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>> > why can't this strategy do bin-packing or sorting as well; if that
>>>>>>> is required; as long as the sequence number is not updated.
>>>>>>> > wouldn't subsequent reads re-apply the delete files which were
>>>>>>> used in the merge as well?
>>>>>>> I think you are right, we can use the sequence number of the
>>>>>>> snapshot at which we start compaction as the new sequence number of all
>>>>>>> rewritten files, because all the deletes of older sequence numbers related
>>>>>>> to the file have been applied. (correct me if I missed anything here)
>>>>>>> But from the correctness perspective it's the same because we remove
>>>>>>> those delete files as a part of the compaction so we will not really
>>>>>>> reapply those deletes.
>>>>>>> With this improvement, we can now also do bin-packing easily. (If we
>>>>>>> preserve all sequence numbers, we can still do bin-packing for files
>>>>>>> sharing the same sequence number, what I described was just the easiest way
>>>>>>> to ensure sequence number preservation)
>>>>>>> > for the tables only being written into by a Flink CDC pipeline,
>>>>>>> this should not happen as position deletes are only created for in-progress
>>>>>>> (uncommitted) data files, correct?
>>>>>>> Correct
>>>>>>> > I believe for the CDC use case it is hard to guarantee that
>>>>>>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>>>>>>> is a factor of mutation rate in the source DB
>>>>>>> I agree. When I say hot/cold I am mostly referring to those
>>>>>>> time-partitioned use cases with a clear hot-cold division of data. But in the
>>>>>>> end the principle is that the compaction strategy should be determined by
>>>>>>> the characteristics of the partition. If all the partitions always have
>>>>>>> high traffic, then I think merge with preserved sequence number seems like
>>>>>>> the way to go for the entire table.
>>>>>>> I have recently summarized all the related concepts in
>>>>>>> https://github.com/apache/iceberg/pull/3432. It would be great if
>>>>>>> you could take a look and point out anything else I missed, thanks!
>>>>>>> Best,
>>>>>>> Jack Ye
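To make the sequence-number reasoning in the message above concrete, here is a small Python sketch of the applicability rule as discussed in this thread: position deletes also apply to data files with the same sequence number, while equality deletes are assumed to apply only to data files with a strictly lower sequence number. The function name and the string tags are hypothetical, not part of any Iceberg API.

```python
def delete_applies(delete_kind: str, delete_seq: int, data_seq: int) -> bool:
    """Whether a delete file must be applied to a data file when reading."""
    if delete_kind == "position":
        # Position deletes also apply to data files with the same sequence number.
        return data_seq <= delete_seq
    if delete_kind == "equality":
        # Assumption: equality deletes apply only to strictly older data files.
        return data_seq < delete_seq
    raise ValueError(f"unknown delete kind: {delete_kind}")
```

Suppose a compaction starts at a snapshot with sequence number 5 and writes its output with that same number. An equality delete at sequence number 5 or lower is no longer re-applied to the rewritten file, a concurrent CDC equality delete at sequence number 6 still is, and an old position delete at sequence number 5 would still match, which is why the thread notes that position deletes for rewritten files have to be removed.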
>>>>>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo
>>>>>>> <pza...@netflix.com.invalid> wrote:
>>>>>>>> Another follow-up regarding this: *"Merge strategy that does not
>>>>>>>> do any bin-packing, and only merges the delete files for each data file and
>>>>>>>> writes it back. The new data file will have the same sequence number as the
>>>>>>>> old file before Merge"*; shouldn't the sequence number be set to
>>>>>>>> the highest sequence number of any delete file applied to the data file? If
>>>>>>>> the sequence number of the data file is not changed at all, wouldn't
>>>>>>>> subsequent reads re-apply the delete files which were used in the merge as
>>>>>>>> well?
>>>>>>>> thanks
>>>>>>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pza...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>> I had a few follow-up points.
>>>>>>>>> 1. *"(1) for hot partitions, users can try to only perform Convert
>>>>>>>>> and Rewrite to keep delete file sizes and count manageable, until the
>>>>>>>>> partition becomes cold and a Merge can be performed safely."*:
>>>>>>>>> I believe for the CDC use case it is hard to guarantee that
>>>>>>>>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>>>>>>>>> is a factor of mutation rate in the source DB; and perhaps some partitions
>>>>>>>>> are always "hot"; so in essence the following: *"Merge strategy
>>>>>>>>> that does not do any bin-packing, and only merges the delete files for each
>>>>>>>>> data file and writes it back. The new data file will have the same sequence
>>>>>>>>> number as the old file before Merge"* seems important. Though as
>>>>>>>>> a follow-up I am wondering why can't this strategy do bin-packing or
>>>>>>>>> sorting as well; if that is required; as long as the sequence number is not
>>>>>>>>> updated.
>>>>>>>>> 2. *"During the commit validation phase of a Merge operation, we
>>>>>>>>> need to verify that for each data file that would be removed, there are no
>>>>>>>>> new position deletes with higher sequence number added."*: Just
>>>>>>>>> to be clear; for the tables only being written into by a Flink CDC
>>>>>>>>> pipeline, this should not happen as position deletes are only created for
>>>>>>>>> in-progress (uncommitted) data files, correct?
>>>>>>>>> Thanks and regards,
>>>>>>>>> - Puneet
>>>>>>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>> Had some offline discussions on Slack and WeChat.
>>>>>>>>>> For Russell's point, we are reconfirming with related people on
>>>>>>>>>> Slack, and will post updates once we have an agreement.
>>>>>>>>>> Regarding point 6, for Flink CDC the data file flushed to disk
>>>>>>>>>> might be associated with position deletes, but after the flush all deletes
>>>>>>>>>> will be equality deletes, so 6-2 still works. After all, as long as data
>>>>>>>>>> files for position deletes are not removed, the process should be able to
>>>>>>>>>> succeed with optimistic retry. Currently we are missing the following, which
>>>>>>>>>> need to be worked on to resolve the CDC performance issue:
>>>>>>>>>> 1. We need to support setting the sequence number for individual
>>>>>>>>>> content files.
>>>>>>>>>> 2. During the commit validation phase of a Merge operation, we
>>>>>>>>>> need to verify that for each data file that would be removed, there are no
>>>>>>>>>> new position deletes with higher sequence number added. If detected, merge
>>>>>>>>>> of that file has to be completely retried (we can support incremental
>>>>>>>>>> progress for this).
>>>>>>>>>> -Jack
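The commit validation in item 2 above could look roughly like the following Python sketch. It is a simplification under stated assumptions: `PositionDelete` here models one (data file, sequence number) pair rather than a real position delete file, and `files_needing_retry` and `start_seq` are hypothetical names, not Iceberg APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PositionDelete:
    """Simplified model: one position delete entry targeting one data file."""
    data_file_path: str
    sequence_number: int


def files_needing_retry(rewritten_paths, start_seq, current_position_deletes):
    """Return rewritten data files that received newer position deletes.

    A data file must be merged again if, since the compaction started at
    `start_seq`, a position delete with a higher sequence number was added
    against it.
    """
    rewritten = set(rewritten_paths)
    conflicted = {
        d.data_file_path
        for d in current_position_deletes
        if d.data_file_path in rewritten and d.sequence_number > start_seq
    }
    return sorted(conflicted)
```

Returning only the conflicted files, rather than failing the whole commit, is one way to read the "incremental progress" remark: the remaining files could be committed while the conflicted ones are retried.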
>>>>>>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>>>>>>> russell.spit...@gmail.com> wrote:
>>>>>>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>>>>>>> differently.
>>>>>>>>>>> BinPackStrategy and SortStrategy each get a new flag which lets
>>>>>>>>>>> you pick files based on their number of delete files. So basically you can
>>>>>>>>>>> set a variety of parameters: small files, large files, files with deletes,
>>>>>>>>>>> etc.
>>>>>>>>>>> A new strategy is added which determines which file to rewrite
>>>>>>>>>>> by looking for all files currently touched by delete files. Instead of
>>>>>>>>>>> looking through files with X deletes, we look up all files affected by
>>>>>>>>>>> deletes and rewrite them. Although now as I write this, it's basically
>>>>>>>>>>> running the above strategies with number of delete files >= 1 and files per
>>>>>>>>>>> group at 1. So maybe it doesn't need another strategy?
>>>>>>>>>>> But maybe I got that wrong ...
>>>>>>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>>>>>> Here is the full meeting recording I made:
>>>>>>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>>>>>> Here are some key takeaways:
>>>>>>>>>>>> 1. We generally agreed upon the division of compactions into
>>>>>>>>>>>> Rewrite, Convert and Merge.
>>>>>>>>>>>> 2. Merge will be implemented through RewriteDataFiles as
>>>>>>>>>>>> proposed in https://github.com/apache/iceberg/pull/3207, but
>>>>>>>>>>>> instead as a new strategy by extending the existing BinPackStrategy. For
>>>>>>>>>>>> users who would also like to run sort during Merge, we will have another
>>>>>>>>>>>> delete strategy that extends the SortStrategy.
>>>>>>>>>>>> 3. Merge can have an option that allows users to set the
>>>>>>>>>>>> minimum number of delete files to trigger a compaction. However, that
>>>>>>>>>>>> would result in very frequent compaction of full partitions if people add
>>>>>>>>>>>> many global delete files. A Convert of global equality deletes to partition
>>>>>>>>>>>> position deletes while maintaining the same sequence number can be used to
>>>>>>>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>>>>>>>> sequence number. This functionality needs to be added.
>>>>>>>>>>>> 4. We generally agreed upon the APIs for Rewrite and Convert at
>>>>>>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>>>>>> 5. We had some discussion around the separation of row and
>>>>>>>>>>>> partition level filters. The general direction in the meeting is to just
>>>>>>>>>>>> have a single filter method. We will sync offline to reach an agreement.
>>>>>>>>>>>> 6. People raised the issue that if new delete files are added
>>>>>>>>>>>> to a data file while a Merge is going on, then the Merge would fail. That
>>>>>>>>>>>> causes huge performance issues for CDC streaming use cases, and it is
>>>>>>>>>>>> very hard for a Merge to succeed. There are 2 proposed solutions:
>>>>>>>>>>>>   (1) for hot partitions, users can try to only perform Convert
>>>>>>>>>>>> and Rewrite to keep delete file sizes and count manageable, until the
>>>>>>>>>>>> partition becomes cold and a Merge can be performed safely.
>>>>>>>>>>>>   (2) it looks like we need a Merge strategy that does not do
>>>>>>>>>>>> any bin-packing, and only merges the delete files for each data file and
>>>>>>>>>>>> writes it back. The new data file will have the same sequence number as the
>>>>>>>>>>>> old file before Merge. By doing so, new delete files can still be applied
>>>>>>>>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>>>>>>>>> caveat is that this does not work for position deletes because the row
>>>>>>>>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>>>>>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>>>>>>>>> feasible approach.
>>>>>>>>>>>> 7. People raised the concern about the memory consumption issue
>>>>>>>>>>>> for the is_deleted metadata column. We ran out of time and will continue
>>>>>>>>>>>> the discussion offline on Slack.
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>> We are planning to have a meeting to discuss the design of
>>>>>>>>>>>>> Iceberg delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>>>>>> We have also created the channel #compaction on Slack, please
>>>>>>>>>>>>> join the channel for daily discussions if you are interested in the
>>>>>>>>>>>>> progress.
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>> As there are more and more people adopting the v2 spec, we
>>>>>>>>>>>>>> are seeing an increasing number of requests for delete compaction support.
>>>>>>>>>>>>>> Here is a document discussing the use cases and basic
>>>>>>>>>>>>>> interface design for it to get the community aligned around what
>>>>>>>>>>>>>> compactions we would offer and how the interfaces would be divided:
>>>>>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Jack Ye
