> No removed temporary files on Flink failure. (Spark orphan file removal
needs to be configured to prevent removal of Flink temporary files which
are needed on recovery)

This sounds like it's a larger problem. Shouldn't Flink store its state
under a different prefix that won't be cleaned up by orphan file removal?
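
To make the question concrete, here is a rough sketch of the idea in plain
Python. It is not an Iceberg, Spark, or Flink API: the function name, the
`flink-state/` prefix, and the bucket paths are all hypothetical. It only
shows that a cleanup pass can skip a dedicated prefix and also leave recent
files alone.

```python
from datetime import datetime, timedelta, timezone


def orphan_candidates(files, referenced, protected_prefixes, older_than):
    """Return the paths a cleanup pass could treat as orphans.

    files: iterable of (path, last_modified) tuples found under the table location
    referenced: set of paths reachable from table metadata
    protected_prefixes: tuple of path prefixes that cleanup must never touch
    older_than: cutoff datetime; newer files may belong to in-flight commits
    """
    return [
        path
        for path, modified in files
        if path not in referenced
        and not path.startswith(protected_prefixes)  # str.startswith accepts a tuple
        and modified < older_than
    ]


# Hypothetical listing of a table location.
now = datetime(2024, 2, 28, tzinfo=timezone.utc)
files = [
    ("s3://bucket/tbl/data/a.parquet", now - timedelta(days=10)),     # referenced
    ("s3://bucket/tbl/data/b.parquet", now - timedelta(days=10)),     # unreferenced, old
    ("s3://bucket/tbl/data/c.parquet", now - timedelta(hours=1)),     # unreferenced, recent
    ("s3://bucket/tbl/flink-state/x.tmp", now - timedelta(days=10)),  # under protected prefix
]

result = orphan_candidates(
    files,
    referenced={"s3://bucket/tbl/data/a.parquet"},
    protected_prefixes=("s3://bucket/tbl/flink-state/",),
    older_than=now - timedelta(days=3),
)
print(result)
```

With a layout like this, only `b.parquet` is a candidate: the file under the
protected prefix survives regardless of its age, and the recent file is kept
by the age cutoff.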

On Wed, Feb 28, 2024 at 3:04 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Sorry to chime in a bit late to the conversation.
>
> I am currently working on implementing Flink in-job maintenance.
>
> The main target audience:
> - Users who can't or don't want to use Spark
> - Users who need frequent checkpointing (low latency in the Iceberg table)
> and have many small files
> - CDC users with frequent updates
>
> The planned architecture:
> - Migrate FlinkSink to SinkV2, so the new Flink PostCommitTopology could
> be used - Flink changes will be released in the upcoming Flink 1.19.0.
> Preliminary PR for the migration is here:
> https://github.com/apache/iceberg/pull/8653
> - Create a scheduler for starting maintenance tasks. This scheduler could
> be placed in the PostCommitTopology and the tasks could be started based on
> the recent commits in the table (useful if there are no other writers), or a
> separate task could monitor the new snapshot for a table, and feed the
> scheduler with the info (useful if more than one job writes to the table,
> or if resource usage for compaction and the job should be separated)
> - Specific maintenance tasks (Data file rewrite, Manifest rewrite,
> Snapshot expiration, Orphan file removal, etc). In the beginning we do not
> aim for the same feature-rich maintenance tasks that we have in Spark, but
> we would like to cover the basics, and reuse as much as possible. One
> feature I think is further down the road is the global ordering and
> splitting of the data files, which is done in SparkShufflingDataRewriter
> and which is also mentioned by Ryan above. We could do this using the same
> shuffling driven by Steven for the writes, but this needs time.
>
> Pros:
> - No Spark is needed if you only need Flink
> - No removed temporary files on Flink failure. (Spark orphan file removal
> needs to be configured to prevent removal of Flink temporary files which
> are needed on recovery)
> - The work which is deferred on write is finally done
> - No extra table is needed
>
> Cons:
> - Still missing the global reordering of the records
>
> I hope this helps.
> Peter
>
> Renjie Liu <liurenjie2...@gmail.com> wrote (on Tue, Feb 27, 2024,
> 2:18):
>
>>> The multi-table solution also fits better with materialized views,
>>> eventually.
>>
>>
>> Good point!
>>
>> On Tue, Feb 27, 2024 at 6:39 AM Ryan Blue <b...@tabular.io> wrote:
>>
>>> I can give an update on using branches vs using tables. I originally
>>> wanted to use branches, but I now prefer the approach using separate tables.
>>>
>>> The main reason to use a separate table is to avoid schema differences
>>> between branches. We recently looked into whether to support different
>>> schemas across table branches and decided not to pursue it. The problem is
>>> how to reconcile schema changes between branches. If you run ADD COLUMN on
>>> two different branches with the same column name, should Iceberg use the
>>> same field ID for both? There are cases where it should (two audited
>>> commits) and cases where it should not (testing code revisions). I don't
>>> think we can tell when adding the column which decision is correct, and
>>> getting it wrong is a correctness problem.
>>>
>>> Since it is a problem to have different schemas across branches, it's
>>> also a problem to keep CDC metadata and older records in a branch of the
>>> merged mirror table. If you wanted to do this, you can use an optional
>>> struct of CDC metadata that you can set to null in the main branch, but
>>> you'd still have to decide how to handle dropped columns. I think most
>>> people don't want to have dropped columns in the mirror table, but if
>>> you're looking at the changelog table you do want them there since they're
>>> part of the historical record.
>>>
>>> It would be nice to have a single table, but that doesn't seem worth the
>>> extra headache of handling extra schema issues and committing more often
>>> (increasing the risk of conflicts). The multi-table solution also fits
>>> better with materialized views, eventually.
>>>
>>> Ryan
>>>
>>> On Sun, Feb 25, 2024 at 5:43 PM Manu Zhang <owenzhang1...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ryan for the updates! I've already read your blog series and
>>>> learned a lot.
>>>> Our customer is using a similar Flink Append + Spark MergeInto
>>>> approach. I'm wondering whether there's a plan to implement this pattern
>>>> with two branches as proposed by Jack in Improvement 3.
>>>>
>>>> Manu
>>>>
>>>>
>>>>
>>>> On Mon, Feb 26, 2024 at 5:13 AM Ryan Blue <b...@tabular.io> wrote:
>>>>
>>>>> Manu,
>>>>>
>>>>> I haven't seen much improvement to the Flink/UPSERT approach to CDC.
>>>>> It's still half-finished. There are some efforts to add table maintenance
>>>>> to Flink, but the main issues -- work is being deferred and never done --
>>>>> haven't been addressed. I don't recommend this approach.
>>>>>
>>>>> The other approach has seen progress. I wrote a blog series
>>>>> <https://tabular.io/blog/cdc-zen-art-of-cdc-performance/> about CDC
>>>>> patterns that builds to the pattern of using separate changelog and mirror
>>>>> tables to help people understand how to do it and what the benefits are.
>>>>> We are also getting closer to a release with views that will allow us to
>>>>> use the latest from the changelog table at read time. That's going to be
>>>>> the most efficient implementation overall.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Tue, Feb 20, 2024 at 1:41 AM Manu Zhang <owenzhang1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Bump up this thread again. Are we actively working on any proposed
>>>>>> approaches?
>>>>>>
>>>>>> Manu
>>>>>>
>>>>>> On Fri, May 5, 2023 at 9:14 AM Ryan Blue <b...@tabular.io> wrote:
>>>>>>
>>>>>>> Thanks for taking the time to write this up, Jack! It definitely
>>>>>>> overlaps my own thinking, which is a good confirmation that we're on the
>>>>>>> right track. There are a couple of things I want to add to the
>>>>>>> discussion.
>>>>>>>
>>>>>>> First, I think the doc relies fairly heavily on the Iceberg/Flink
>>>>>>> approach being _the_ approach to CDC with Iceberg. That's not how I've
>>>>>>> seen the pattern implemented at scale, and I generally think of the
>>>>>>> Iceberg/Flink implementation as unfinished -- it is missing critical
>>>>>>> components.
>>>>>>>
>>>>>>> As a format, Iceberg tries to be flexible and allow you to make
>>>>>>> trade-offs. The main trade-off you get is to defer work until later.
>>>>>>> Sorting is a good example, where Flink can't sort easily so it leaves
>>>>>>> sorting for downstream systems. Another way we defer work is using
>>>>>>> delete files to keep write amplification down. And yet another way is
>>>>>>> using equality delete files to avoid needing to locate records. The
>>>>>>> issue with the Iceberg/Flink approach is that it uses all 3 of these and
>>>>>>> defers a ton of work that never gets completed. I think the
>>>>>>> Iceberg/Flink UPSERT feature is incomplete and would not recommend it
>>>>>>> without something cleaning up the tables.
>>>>>>>
>>>>>>> It seems to me that Approaches 1 and 2 are trying to fix this, but
>>>>>>> not really in straightforward ways. I like that Approach 1 also
>>>>>>> preserves history, but I think there's a better and more direct option
>>>>>>> in Approach 3.
>>>>>>>
>>>>>>> Second, I think we also need to consider transactional consistency.
>>>>>>> This isn't hard to achieve and is how people consuming the table think
>>>>>>> about the data. I think we should always try to mirror consistency in
>>>>>>> the source table downstream in the mirror table.
>>>>>>>
>>>>>>> In the end, I think we probably agree on the overall approach (3).
>>>>>>> Keep a changelog branch or table and a mirror table, then keep them in
>>>>>>> sync. I'd also add views to the mix to get the latest up-to-date
>>>>>>> information. We can also make this better by adapting Approach 2 for
>>>>>>> streaming writes. I think Anton has been working on an index approach
>>>>>>> that would work.
>>>>>>>
>>>>>>> If we're aligned, I think it should be easy to start building this
>>>>>>> pattern and adding support in Iceberg for things like updating the
>>>>>>> schemas at the same time.
>>>>>>>
>>>>>>> Ryan
>>>>>>>
>>>>>>> On Thu, May 4, 2023 at 3:00 PM Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Jack for the great write-up. Good summary of the current
>>>>>>>> landscape of CDC too. Left a few comments to discuss.
>>>>>>>>
>>>>>>>> On Wed, Apr 26, 2023 at 11:38 AM Anton Okolnychyi
>>>>>>>> <aokolnyc...@apple.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Thanks for starting a thread, Jack! I am yet to go through the
>>>>>>>>> proposal.
>>>>>>>>>
>>>>>>>>> I recently came across a similar idea in BigQuery, which relies on
>>>>>>>>> a staleness threshold:
>>>>>>>>>
>>>>>>>>> https://cloud.google.com/blog/products/data-analytics/bigquery-gains-change-data-capture-functionality/
>>>>>>>>>
>>>>>>>>> It would also be nice to check if there are any applicable ideas
>>>>>>>>> in Paimon:
>>>>>>>>> https://github.com/apache/incubator-paimon/
>>>>>>>>>
>>>>>>>>> - Anton
>>>>>>>>>
>>>>>>>>> On Apr 26, 2023, at 11:32 AM, Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> As we discussed in the community sync, it looks like we have some
>>>>>>>>> general interest in improving the CDC streaming process. Dan
>>>>>>>>> mentioned that Ryan has a proposal about an alternative CDC approach
>>>>>>>>> that has an accumulated changelog that is periodically synced to a
>>>>>>>>> target table.
>>>>>>>>>
>>>>>>>>> I have a very similar design doc I have been working on for quite
>>>>>>>>> some time to describe a set of improvements we could make to the
>>>>>>>>> Iceberg CDC use case, and it contains a very similar improvement (see
>>>>>>>>> improvement 3).
>>>>>>>>>
>>>>>>>>> I would appreciate feedback from the community about this doc, and
>>>>>>>>> I can organize some meetings to discuss our thoughts about this topic
>>>>>>>>> afterwards.
>>>>>>>>>
>>>>>>>>> Doc link:
>>>>>>>>> https://docs.google.com/document/d/1kyyJp4masbd1FrIKUHF1ED_z1hTARL8bNoKCgb7fhSQ/edit#
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jack Ye
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Tabular
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Tabular
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Tabular
>>>
>>

-- 
Ryan Blue
Tabular
