> No removed temporary files on Flink failure. (Spark orphan file removal
> needs to be configured to prevent removal of Flink temporary files, which
> are needed on recovery)
This sounds like a larger problem. Shouldn't Flink store its state in a different prefix that won't be cleaned up by orphan file removal?

On Wed, Feb 28, 2024 at 3:04 AM Péter Váry <[email protected]> wrote:

Sorry to chime in a bit late to the conversation.

I am currently working on implementing Flink in-job maintenance.

The main target audience:
- Users who can't or don't want to use Spark
- Users who need frequent checkpointing (low latency in the Iceberg table) and have many small files
- CDC users with frequent updates

The planned architecture:
- Migrate FlinkSink to SinkV2 so that the new Flink PostCommitTopology can be used. The required Flink changes will be released in the upcoming Flink 1.19.0. A preliminary PR for the migration is here: https://github.com/apache/iceberg/pull/8653
- Create a scheduler for starting maintenance tasks. The scheduler could be placed in the PostCommitTopology, with tasks started based on recent commits to the table (useful if there are no other writers), or a separate task could monitor the table for new snapshots and feed the scheduler with that information (useful if more than one job writes to the table, or if the resource usage of compaction should be kept separate from the writer job)
- Specific maintenance tasks (data file rewrite, manifest rewrite, snapshot expiration, orphan file removal, etc.). In the beginning we do not aim for the same feature-rich maintenance tasks that we have in Spark; we would like to cover the basics and reuse as much as possible. One feature I think is further down the road is the global ordering and splitting of data files done in SparkShufflingDataRewriter, which Ryan also mentioned above. We could do this using the same shuffling Steven drove for the writes, but this needs time.

Pros:
- No Spark is needed if you only need Flink
- No removed temporary files on Flink failure. (Spark orphan file removal needs to be configured to prevent removal of Flink temporary files, which are needed on recovery)
- The work that is deferred on write finally gets done
- No extra table is needed

Cons:
- Still missing the global reordering of the records

I hope this helps.
Peter

Renjie Liu <[email protected]> wrote (Tue, Feb 27, 2024 at 2:18):

> The multi-table solution also fits better with materialized views, eventually.

Good point!

On Tue, Feb 27, 2024 at 6:39 AM Ryan Blue <[email protected]> wrote:

I can give an update on using branches vs. using tables. I originally wanted to use branches, but I now prefer the approach using separate tables.

The main reason to use a separate table is to avoid schema differences between branches. We recently looked into whether to support different schemas across table branches and decided not to pursue it. The problem is how to reconcile schema changes between branches. If you run ADD COLUMN on two different branches with the same column name, should Iceberg use the same field ID for both? There are cases where it should (two audited commits) and cases where it should not (testing code revisions). I don't think we can tell, when adding the column, which decision is correct, and getting it wrong is a correctness problem.

Since it is a problem to have different schemas across branches, it's also a problem to keep CDC metadata and older records in a branch of the merged mirror table. If you wanted to do this, you could use an optional struct of CDC metadata that you set to null in the main branch, but you'd still have to decide how to handle dropped columns. I think most people don't want dropped columns in the mirror table, but if you're looking at the changelog table you do want them there, since they're part of the historical record.
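The field-ID hazard described above can be made concrete with a toy simulation. This is a hypothetical model, not Iceberg code: the schemas, rows, and the `read_column` helper are invented for illustration. The point is that readers resolve columns by field ID, not by name, so reusing an ID across branches silently changes the meaning of data, while assigning fresh IDs makes same-named columns invisible to each other.

```python
# Hypothetical model of field-ID-based column resolution (not real Iceberg APIs).
# Schemas map column names to field IDs; data rows are keyed by field ID.

def read_column(rows, schema, column_name):
    """Resolve a column name to its field ID via the schema, then read by ID."""
    field_id = schema[column_name]
    return [row.get(field_id) for row in rows]

# Branch A and branch B each run ADD COLUMN with the same name, 'region'.
# Case 1: both branches assign the same field ID (2).
schema_a = {"id": 1, "region": 2}   # on branch A, 'region' means a sales region
schema_b = {"id": 1, "region": 2}   # on branch B, 'region' means a cloud region

# A data file written on branch B stores values keyed by field ID 2.
branch_b_rows = [{1: 100, 2: "us-east-1"}]

# Reading branch B's file through branch A's schema succeeds -- but the values
# mean something different. Nothing fails loudly; it is a silent correctness
# problem, which is why reconciling IDs across branches is hard.
print(read_column(branch_b_rows, schema_a, "region"))  # ['us-east-1']

# Case 2: the branches assign different field IDs (2 vs. 3). Now branch A's
# schema cannot see branch B's column at all, even though the names match.
branch_b2_rows = [{1: 100, 3: "us-east-1"}]
print(read_column(branch_b2_rows, schema_a, "region"))  # [None]
```

Either ID assignment is wrong for one of the two use cases Ryan lists, and the writer cannot know which at ADD COLUMN time.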
It would be nice to have a single table, but that doesn't seem worth the extra headache of handling extra schema issues and committing more often (increasing the risk of conflicts). The multi-table solution also fits better with materialized views, eventually.

Ryan

On Sun, Feb 25, 2024 at 5:43 PM Manu Zhang <[email protected]> wrote:

Thanks, Ryan, for the updates! I've already read your blog series and learned a lot.
Our customer is using a similar Flink append + Spark MERGE INTO approach. I'm wondering whether there's a plan to implement this pattern with two branches, as proposed by Jack in Improvement 3.

Manu

On Mon, Feb 26, 2024 at 5:13 AM Ryan Blue <[email protected]> wrote:

Manu,

I haven't seen much improvement to the Flink/UPSERT approach to CDC. It's still half-finished. There are some efforts to add table maintenance to Flink, but the main issue -- work is being deferred and never done -- hasn't been addressed. I don't recommend this approach.

The other approach has seen progress. I wrote a blog series <https://tabular.io/blog/cdc-zen-art-of-cdc-performance/> about CDC patterns that builds up to the pattern of using separate changelog and mirror tables, to help people understand how to do it and what the benefits are. We are also getting closer to a release with views, which will allow us to use the latest from the changelog table at read time. That's going to be the most efficient implementation overall.

Ryan

On Tue, Feb 20, 2024 at 1:41 AM Manu Zhang <[email protected]> wrote:

Bumping this thread again. Are we actively working on any of the proposed approaches?

Manu

On Fri, May 5, 2023 at 9:14 AM Ryan Blue <[email protected]> wrote:

Thanks for taking the time to write this up, Jack! It definitely overlaps my own thinking, which is a good confirmation that we're on the right track. There are a couple of things I want to add to the discussion.

First, I think the doc relies fairly heavily on the Iceberg/Flink approach being _the_ approach to CDC with Iceberg. That's not how I've seen the pattern implemented at scale, and I generally think of the Iceberg/Flink implementation as unfinished -- it is missing critical components.

As a format, Iceberg tries to be flexible and let you make trade-offs. The main trade-off you get is to defer work until later. Sorting is a good example: Flink can't sort easily, so it leaves sorting for downstream systems. Another way we defer work is using delete files to keep write amplification down. And yet another is using equality delete files to avoid needing to locate records. The issue with the Iceberg/Flink approach is that it uses all three of these and defers a ton of work that never gets completed. I think the Iceberg/Flink UPSERT feature is incomplete, and I would not recommend it without something cleaning up the tables.

It seems to me that Approaches 1 and 2 are trying to fix this, but not really in straightforward ways. I like that Approach 1 also preserves history, but I think there's a better and more direct option in Approach 3.

Second, I think we also need to consider transactional consistency. This isn't hard to achieve, and it is how people consuming the table think about the data. I think we should always try to mirror the consistency of the source table downstream in the mirror table.

In the end, I think we probably agree on the overall approach (3): keep a changelog branch or table and a mirror table, then keep them in sync. I'd also add views to the mix to get the latest up-to-date information. We can also make this better by adapting Approach 2 for streaming writes. I think Anton has been working on an index approach that would work.

If we're aligned, I think it should be easy to start building this pattern and adding support in Iceberg for things like updating the schemas at the same time.

Ryan

On Thu, May 4, 2023 at 3:00 PM Steven Wu <[email protected]> wrote:

Thanks, Jack, for the great write-up. It's a good summary of the current landscape of CDC, too. I left a few comments to discuss.

On Wed, Apr 26, 2023 at 11:38 AM Anton Okolnychyi <[email protected]> wrote:

Thanks for starting a thread, Jack! I have yet to go through the proposal.

I recently came across a similar idea in BigQuery, which relies on a staleness threshold:
https://cloud.google.com/blog/products/data-analytics/bigquery-gains-change-data-capture-functionality/

It would also be nice to check whether there are any applicable ideas in Paimon:
https://github.com/apache/incubator-paimon/

- Anton

On Apr 26, 2023, at 11:32 AM, Jack Ye <[email protected]> wrote:

Hi everyone,

As we discussed in the community sync, it looks like we have some general interest in improving the CDC streaming process. Dan mentioned that Ryan has a proposal for an alternative CDC approach that keeps an accumulated changelog that is periodically synced to a target table.

I have a very similar design doc that I have been working on for quite some time, describing a set of improvements we could make to the Iceberg CDC use case, and it contains a very similar improvement (see Improvement 3).

I would appreciate feedback from the community on this doc, and I can organize some meetings to discuss our thoughts on this topic afterwards.

Doc link:
https://docs.google.com/document/d/1kyyJp4masbd1FrIKUHF1ED_z1hTARL8bNoKCgb7fhSQ/edit#

Best,
Jack Ye

--
Ryan Blue
Tabular
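The changelog-plus-mirror pattern the thread converges on can be sketched as a toy simulation. All names here (`sync_mirror`, the event shape, the op codes) are hypothetical; a real deployment would use an Iceberg changelog table and a periodic Spark or Flink MERGE INTO rather than Python dicts. The sketch shows why the pattern keeps write amplification low: however many changes accumulate in the changelog, only the latest state per key is written to the mirror on each sync.

```python
# Toy sketch of the changelog + mirror CDC pattern (illustrative names only).

def sync_mirror(mirror, changelog):
    """Apply accumulated CDC events to the mirror; the newest event per key wins."""
    latest = {}
    for event in changelog:            # events arrive in commit order
        latest[event["key"]] = event   # later events overwrite earlier ones
    for event in latest.values():
        if event["op"] == "D":         # delete
            mirror.pop(event["key"], None)
        else:                          # "I" (insert) or "U" (update)
            mirror[event["key"]] = event["value"]
    return mirror

mirror = {1: "a", 2: "b"}
changelog = [
    {"op": "U", "key": 1, "value": "a2"},   # update row 1
    {"op": "D", "key": 2},                  # delete row 2
    {"op": "I", "key": 3, "value": "c"},    # insert row 3
    {"op": "U", "key": 3, "value": "c2"},   # update row 3 again
]

# Four changelog events collapse to two mirror writes and one delete.
print(sync_mirror(mirror, changelog))  # {1: 'a2', 3: 'c2'}
```

Readers who need the very latest data can union the mirror with the not-yet-synced tail of the changelog at read time, which is the role the views mentioned above would play.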
