Sorry to chime in a bit late to the conversation.

I am currently working in implementing Flink in-job maintenance.

The main target audience:
- Users who can't or don't want to use Spark
- Users who need frequent checkpointing (low latency in the Iceberg table)
and have many small files
- CDC users with frequent updates

The planned architecture:
- Migrate FlinkSink to SinkV2, so the new Flink PostCommitTopology could be
used - Flink changes will be released in the upcoming Flink 1.19.0.
Preliminary PR for the migration is here:
https://github.com/apache/iceberg/pull/8653
- Create a scheduler for starting maintenance tasks. This scheduler could
be placed in the PostCommitTopology and the tasks could be started based on
the recent commits in the table (useful if there is no other writers), or a
separate task could monitor the new snapshot for a table, and feed the
scheduler with the info (useful if more than one job writes to the table,
or resource usage for compaction and the job should be separated)
- Specific maintenance tasks (Data file rewrite, Manifest rewrite, Snapshot
expiration, Orphan file removal, etc). In the beginning we do not aim for
the same feature-rich maintenance tasks that we have in Spark, but we would
like to cover the basics, and reuse as much as possible. One feature I
think is further down the road: is the global ordering, and splitting of
the data files which is done in SparkShufflingDataRewriter - which is also
mentioned by Ryan above. We could do this using the same shuffling driven
by Steven for the writes, but this needs time.

Pros:
- No Spark is needed if you only need Flink
- No removed temporary files on Flink failure. (Spark orphan file removal
needs to be configured to prevent removal of Flink temporary files which
are needed on recovery)
- The work which is deferred on write is finally done
- No extra table is needed

Cons:
- Still missing the global reordering of the records

I hope this helps.
Peter

Renjie Liu <liurenjie2...@gmail.com> ezt írta (időpont: 2024. febr. 27., K,
2:18):

>  The multi-table solution also fits better with materialized views,
>> eventually.
>
>
> Good point!
>
> On Tue, Feb 27, 2024 at 6:39 AM Ryan Blue <b...@tabular.io> wrote:
>
>> I can give an update on using branches vs using tables. I originally
>> wanted to use branches, but I now prefer the approach using separate tables.
>>
>> The main reason to use a separate table is to avoid schema differences
>> between branches. We recently looked into whether to support different
>> schemas across table branches and decided not to pursue it. The problem is
>> how to reconcile schema changes between branches. If you run ADD COLUMN on
>> two different branches with the same column name, should Iceberg use the
>> same field ID for both? There are cases where it should (two audited
>> commits) and cases where it should not (testing code revisions). I don't
>> think we can tell when adding the column which decision is correct, and
>> getting it wrong is a correctness problem.
>>
>> Since it is a problem to have different schemas across branches, it's
>> also a problem to keep CDC metadata and older records in a branch of the
>> merged mirror table. If you wanted to do this, you can use an optional
>> struct of CDC metadata that you can set to null in the main branch, but
>> you'd still have to decide how to handle dropped columns. I think most
>> people don't want to have dropped columns in the mirror table, but if
>> you're looking at the changelog table you do want them there since they're
>> part of the historical record.
>>
>> It would be nice to have a single table, but that doesn't seem worth the
>> extra headache of handling extra schema issues and committing more often
>> (increasing the risk of conflicts). The multi-table solution also fits
>> better with materialized views, eventually.
>>
>> Ryan
>>
>> On Sun, Feb 25, 2024 at 5:43 PM Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>>> Thanks Ryan for the updates! I've already read your blog series and
>>> learned a lot.
>>> Our customer is using a similar Flink Append + Spark MergeInto approach.
>>> I'm wondering whether there's a plan to implement this pattern with two
>>> branches as proposed by Jack in Improvement 3.
>>>
>>> Manu
>>>
>>>
>>>
>>> On Mon, Feb 26, 2024 at 5:13 AM Ryan Blue <b...@tabular.io> wrote:
>>>
>>>> Manu,
>>>>
>>>> I haven't seen much improvement to the Flink/UPSERT approach to CDC.
>>>> It's still half-finished. There are some efforts to add table maintenance
>>>> to Flink, but the main issues -- work is being deferred and never done --
>>>> haven't been addressed. I don't recommend this approach.
>>>>
>>>> The other approach has seen progress. I wrote a blog series
>>>> <https://tabular.io/blog/cdc-zen-art-of-cdc-performance/> about CDC
>>>> patterns that builds to the pattern of using separate changelog and mirror
>>>> tables to help people understand how to do it and what the benefits are. We
>>>> are also getting closer to a release with views that will allow us to use
>>>> the latest from the changelog table at read time. That's going to be the
>>>> most efficient implementation overall.
>>>>
>>>> Ryan
>>>>
>>>> On Tue, Feb 20, 2024 at 1:41 AM Manu Zhang <owenzhang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Bump up this thread again. Are we actively working on any proposed
>>>>> approaches?
>>>>>
>>>>> Manu
>>>>>
>>>>> On Fri, May 5, 2023 at 9:14 AM Ryan Blue <b...@tabular.io> wrote:
>>>>>
>>>>>> Thanks for taking the time to write this up, Jack! It definitely
>>>>>> overlaps my own thinking, which is a good confirmation that we're on the
>>>>>> right track. There are a couple of things I want to add to the 
>>>>>> discussion.
>>>>>>
>>>>>> First, I think the doc relies fairly heavily on the Iceberg/Flink
>>>>>> approach being _the_ approach to CDC with Iceberg. That's not how I've 
>>>>>> seen
>>>>>> the pattern implemented at scale, and I generally think of the
>>>>>> Iceberg/Flink implementation as unfinished -- it is missing critical
>>>>>> components.
>>>>>>
>>>>>> As a format, Iceberg tries to be flexible and allow you to make
>>>>>> trade-offs. The main trade-off you get is to defer work until later.
>>>>>> Sorting is a good example, where Flink can't sort easily so it leaves
>>>>>> sorting for downstream systems. Another way we defer work is using delete
>>>>>> files to keep write amplification down. And yet another way is using
>>>>>> equality delete files to avoid needing to locate records. The issue with
>>>>>> the Iceberg/Flink approach is that it uses all 3 of these and defers a 
>>>>>> ton
>>>>>> of work that never gets completed. I think the Iceberg/Flink UPSERT
>>>>>> feature is incomplete and would not recommend it without something 
>>>>>> cleaning
>>>>>> up the tables.
>>>>>>
>>>>>> It seems to me that Approaches 1 and 2 are trying to fix this, but
>>>>>> not really in straightforward ways. I like that Approach 1 also preserves
>>>>>> history, but I think there's a better and more direction option in 
>>>>>> Approach
>>>>>> 3.
>>>>>>
>>>>>> Second, I think we also need to consider transactional consistency.
>>>>>> This isn't hard to achieve and is how people consuming the table think
>>>>>> about the data. I think we should always try to mirror consistency in the
>>>>>> source table downstream in the mirror table.
>>>>>>
>>>>>> In the end, I think we probably agree on the overall approach (3).
>>>>>> Keep a changelog branch or table and a mirror table, then keep them in
>>>>>> sync. I'd also add views to the mix to get the latest up-to-date
>>>>>> information. We can also make this better by adapting Approach 2 for
>>>>>> streaming writes. I think Anton has been working on an index approach 
>>>>>> that
>>>>>> would work.
>>>>>>
>>>>>> If we're aligned, I think it should be easy to start building this
>>>>>> pattern and adding support in Iceberg for things like updating the 
>>>>>> schemas
>>>>>> at the same time.
>>>>>>
>>>>>> Ryan
>>>>>>
>>>>>> On Thu, May 4, 2023 at 3:00 PM Steven Wu <stevenz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Jack for the great write-up. Good summary of the current
>>>>>>> landscape of CDC too. Left a few comments to discuss.
>>>>>>>
>>>>>>> On Wed, Apr 26, 2023 at 11:38 AM Anton Okolnychyi
>>>>>>> <aokolnyc...@apple.com.invalid> wrote:
>>>>>>>
>>>>>>>> Thanks for starting a thread, Jack! I am yet to go through the
>>>>>>>> proposal.
>>>>>>>>
>>>>>>>> I recently came across a similar idea in BigQuery, which relies on
>>>>>>>> a staleness threshold:
>>>>>>>>
>>>>>>>> https://cloud.google.com/blog/products/data-analytics/bigquery-gains-change-data-capture-functionality/
>>>>>>>>
>>>>>>>> It would also be nice to check if there are any applicable ideas in
>>>>>>>> Paimon:
>>>>>>>> https://github.com/apache/incubator-paimon/
>>>>>>>>
>>>>>>>> - Anton
>>>>>>>>
>>>>>>>> On Apr 26, 2023, at 11:32 AM, Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> As we discussed in the community sync, it looks like we have some
>>>>>>>> general interest in improving the CDC streaming process. Dan mentioned 
>>>>>>>> that
>>>>>>>> Ryan has a proposal about an alternative CDC approach that has an
>>>>>>>> accumulated changelog that is periodically synced to a target table.
>>>>>>>>
>>>>>>>> I have a very similar design doc I have been working on for quite
>>>>>>>> some time to describe a set of improvements we could do to the Iceberg 
>>>>>>>> CDC
>>>>>>>> use case, and it contains a very similar improvement (see improvement 
>>>>>>>> 3).
>>>>>>>>
>>>>>>>> I would appreciate feedback from the community about this doc, and
>>>>>>>> I can organize some meetings to discuss our thoughts about this topic
>>>>>>>> afterwards.
>>>>>>>>
>>>>>>>> Doc link:
>>>>>>>> https://docs.google.com/document/d/1kyyJp4masbd1FrIKUHF1ED_z1hTARL8bNoKCgb7fhSQ/edit#
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jack Ye
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Tabular
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>

Reply via email to