I have been thinking about this quite a bit.

Moving the temporary manifest files could work, but the prepared, not yet
committed data files are already written to their final location. These
data files are not part of the table yet either, so the orphan file removal
process could delete them. Moving them elsewhere would add an extra move
(HDFS: cheap) or copy (S3: costly) step to every commit, which I'd like to
avoid.

BTW, this generally applies to any insert running concurrently with the
orphan file removal. There we need to "guess" how long it will take to
commit the new data files. That is easier for a process which is running,
but Flink could recover from old state even after several days, so we face
the issue more often.
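
To make the "guess" concrete, here is a minimal sketch. It is not the
actual Iceberg orphan file removal code; the function name, the paths and
the 3-day threshold are all made up for illustration. A file is treated as
an orphan only if the table metadata does not reference it and it is older
than a chosen minimum age:

```python
from datetime import datetime, timedelta


def orphan_candidates(files_on_storage, referenced_by_table, now, min_age):
    """Return paths that are unreferenced and older than min_age.

    files_on_storage: dict mapping path -> last-modified datetime
    referenced_by_table: set of paths reachable from the table metadata
    (Hypothetical helper, only meant to illustrate the age threshold.)
    """
    cutoff = now - min_age
    return sorted(
        path
        for path, modified in files_on_storage.items()
        if path not in referenced_by_table and modified < cutoff
    )


now = datetime(2024, 2, 29, 12, 0)
files = {
    # Already committed, so it is referenced by the table.
    "data/committed.parquet": now - timedelta(days=10),
    # Written by a running job, commit expected shortly.
    "data/inflight-recent.parquet": now - timedelta(hours=1),
    # Written before a failure; the job recovers from old state days later.
    "data/inflight-old-checkpoint.parquet": now - timedelta(days=5),
}
referenced = {"data/committed.parquet"}

# The threshold is the "guess": 3 days protects the recent file, but not
# the file that a late-recovering job still needs.
to_delete = orphan_candidates(files, referenced, now, timedelta(days=3))
```

With these made-up values the recent uncommitted file survives, while the
5-day-old file that a recovering job would still need is flagged. A longer
threshold protects it, at the price of keeping real orphans around longer.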

Thanks, Peter

On Wed, Feb 28, 2024, 17:40 Ryan Blue <b...@tabular.io> wrote:

> > No removed temporary files on Flink failure. (Spark orphan file removal
> needs to be configured to prevent removal of Flink temporary files which
> are needed on recovery)
>
> This sounds like it's a larger problem. Shouldn't Flink store its state in
> a different prefix that won't be cleaned up by orphan files?
>
> On Wed, Feb 28, 2024 at 3:04 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Sorry to chime in a bit late to the conversation.
>>
>> I am currently working on implementing Flink in-job maintenance.
>>
>> The main target audience:
>> - Users who can't or don't want to use Spark
>> - Users who need frequent checkpointing (low latency in the Iceberg
>> table) and have many small files
>> - CDC users with frequent updates
>>
>> The planned architecture:
>> - Migrate FlinkSink to SinkV2, so the new Flink PostCommitTopology could
>> be used - Flink changes will be released in the upcoming Flink 1.19.0.
>> Preliminary PR for the migration is here:
>> https://github.com/apache/iceberg/pull/8653
>> - Create a scheduler for starting maintenance tasks. This scheduler could
>> be placed in the PostCommitTopology and the tasks could be started based on
>> the recent commits in the table (useful if there are no other writers), or a
>> separate task could monitor the new snapshot for a table, and feed the
>> scheduler with the info (useful if more than one job writes to the table,
>> or resource usage for compaction and the job should be separated)
>> - Specific maintenance tasks (Data file rewrite, Manifest rewrite,
>> Snapshot expiration, Orphan file removal, etc). In the beginning we do not
>> aim for the same feature-rich maintenance tasks that we have in Spark, but
>> we would like to cover the basics, and reuse as much as possible. One
>> feature that I think is further down the road is the global ordering and
>> splitting of the data files, which is done in SparkShufflingDataRewriter
>> and which Ryan also mentioned above. We could do this using the same
>> shuffling driven by Steven for the writes, but this needs time.
>>
>> Pros:
>> - No Spark is needed if you only need Flink
>> - No removed temporary files on Flink failure. (Spark orphan file removal
>> needs to be configured to prevent removal of Flink temporary files which
>> are needed on recovery)
>> - The work which is deferred on write is finally done
>> - No extra table is needed
>>
>> Cons:
>> - Still missing the global reordering of the records
>>
>> I hope this helps.
>> Peter
>>
>> Renjie Liu <liurenjie2...@gmail.com> wrote (on Tue, Feb 27, 2024,
>> 2:18):
>>
>>>> The multi-table solution also fits better with materialized views,
>>>> eventually.
>>>
>>>
>>> Good point!
>>>
>>> On Tue, Feb 27, 2024 at 6:39 AM Ryan Blue <b...@tabular.io> wrote:
>>>
>>>> I can give an update on using branches vs using tables. I originally
>>>> wanted to use branches, but I now prefer the approach using separate 
>>>> tables.
>>>>
>>>> The main reason to use a separate table is to avoid schema differences
>>>> between branches. We recently looked into whether to support different
>>>> schemas across table branches and decided not to pursue it. The problem is
>>>> how to reconcile schema changes between branches. If you run ADD COLUMN on
>>>> two different branches with the same column name, should Iceberg use the
>>>> same field ID for both? There are cases where it should (two audited
>>>> commits) and cases where it should not (testing code revisions). I don't
>>>> think we can tell when adding the column which decision is correct, and
>>>> getting it wrong is a correctness problem.
>>>>
>>>> Since it is a problem to have different schemas across branches, it's
>>>> also a problem to keep CDC metadata and older records in a branch of the
>>>> merged mirror table. If you wanted to do this, you could use an optional
>>>> struct of CDC metadata that you can set to null in the main branch, but
>>>> you'd still have to decide how to handle dropped columns. I think most
>>>> people don't want to have dropped columns in the mirror table, but if
>>>> you're looking at the changelog table you do want them there since they're
>>>> part of the historical record.
>>>>
>>>> It would be nice to have a single table, but that doesn't seem worth
>>>> the extra headache of handling extra schema issues and committing more
>>>> often (increasing the risk of conflicts). The multi-table solution also
>>>> fits better with materialized views, eventually.
>>>>
>>>> Ryan
>>>>
>>>> On Sun, Feb 25, 2024 at 5:43 PM Manu Zhang <owenzhang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ryan for the updates! I've already read your blog series and
>>>>> learned a lot.
>>>>> Our customer is using a similar Flink Append + Spark MergeInto
>>>>> approach. I'm wondering whether there's a plan to implement this pattern
>>>>> with two branches as proposed by Jack in Improvement 3.
>>>>>
>>>>> Manu
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 26, 2024 at 5:13 AM Ryan Blue <b...@tabular.io> wrote:
>>>>>
>>>>>> Manu,
>>>>>>
>>>>>> I haven't seen much improvement to the Flink/UPSERT approach to CDC.
>>>>>> It's still half-finished. There are some efforts to add table maintenance
>>>>>> to Flink, but the main issues -- work is being deferred and never done --
>>>>>> haven't been addressed. I don't recommend this approach.
>>>>>>
>>>>>> The other approach has seen progress. I wrote a blog series
>>>>>> <https://tabular.io/blog/cdc-zen-art-of-cdc-performance/> about CDC
>>>>>> patterns that builds to the pattern of using separate changelog and
>>>>>> mirror tables to help people understand how to do it and what the
>>>>>> benefits are. We are also getting closer to a release with views that
>>>>>> will allow us to use the latest from the changelog table at read time.
>>>>>> That's going to be the most efficient implementation overall.
>>>>>>
>>>>>> Ryan
>>>>>>
>>>>>> On Tue, Feb 20, 2024 at 1:41 AM Manu Zhang <owenzhang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Bump up this thread again. Are we actively working on any proposed
>>>>>>> approaches?
>>>>>>>
>>>>>>> Manu
>>>>>>>
>>>>>>> On Fri, May 5, 2023 at 9:14 AM Ryan Blue <b...@tabular.io> wrote:
>>>>>>>
>>>>>>>> Thanks for taking the time to write this up, Jack! It definitely
>>>>>>>> overlaps my own thinking, which is a good confirmation that we're on
>>>>>>>> the right track. There are a couple of things I want to add to the
>>>>>>>> discussion.
>>>>>>>>
>>>>>>>> First, I think the doc relies fairly heavily on the Iceberg/Flink
>>>>>>>> approach being _the_ approach to CDC with Iceberg. That's not how
>>>>>>>> I've seen the pattern implemented at scale, and I generally think of
>>>>>>>> the Iceberg/Flink implementation as unfinished -- it is missing
>>>>>>>> critical components.
>>>>>>>>
>>>>>>>> As a format, Iceberg tries to be flexible and allow you to make
>>>>>>>> trade-offs. The main trade-off you get is to defer work until later.
>>>>>>>> Sorting is a good example, where Flink can't sort easily so it leaves
>>>>>>>> sorting for downstream systems. Another way we defer work is using
>>>>>>>> delete files to keep write amplification down. And yet another way is
>>>>>>>> using equality delete files to avoid needing to locate records. The
>>>>>>>> issue with the Iceberg/Flink approach is that it uses all 3 of these
>>>>>>>> and defers a ton of work that never gets completed. I think the
>>>>>>>> Iceberg/Flink UPSERT feature is incomplete and would not recommend it
>>>>>>>> without something cleaning up the tables.
>>>>>>>>
>>>>>>>> It seems to me that Approaches 1 and 2 are trying to fix this, but
>>>>>>>> not really in straightforward ways. I like that Approach 1 also
>>>>>>>> preserves history, but I think there's a better and more direct
>>>>>>>> option in Approach 3.
>>>>>>>>
>>>>>>>> Second, I think we also need to consider transactional consistency.
>>>>>>>> This isn't hard to achieve and is how people consuming the table think
>>>>>>>> about the data. I think we should always try to mirror consistency in
>>>>>>>> the source table downstream in the mirror table.
>>>>>>>>
>>>>>>>> In the end, I think we probably agree on the overall approach (3).
>>>>>>>> Keep a changelog branch or table and a mirror table, then keep them in
>>>>>>>> sync. I'd also add views to the mix to get the latest up-to-date
>>>>>>>> information. We can also make this better by adapting Approach 2 for
>>>>>>>> streaming writes. I think Anton has been working on an index approach
>>>>>>>> that would work.
>>>>>>>>
>>>>>>>> If we're aligned, I think it should be easy to start building this
>>>>>>>> pattern and adding support in Iceberg for things like updating the
>>>>>>>> schemas at the same time.
>>>>>>>>
>>>>>>>> Ryan
>>>>>>>>
>>>>>>>> On Thu, May 4, 2023 at 3:00 PM Steven Wu <stevenz...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Jack for the great write-up. Good summary of the current
>>>>>>>>> landscape of CDC too. Left a few comments to discuss.
>>>>>>>>>
>>>>>>>>> On Wed, Apr 26, 2023 at 11:38 AM Anton Okolnychyi
>>>>>>>>> <aokolnyc...@apple.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for starting a thread, Jack! I have yet to go through the
>>>>>>>>>> proposal.
>>>>>>>>>>
>>>>>>>>>> I recently came across a similar idea in BigQuery, which relies
>>>>>>>>>> on a staleness threshold:
>>>>>>>>>>
>>>>>>>>>> https://cloud.google.com/blog/products/data-analytics/bigquery-gains-change-data-capture-functionality/
>>>>>>>>>>
>>>>>>>>>> It would also be nice to check if there are any applicable ideas
>>>>>>>>>> in Paimon:
>>>>>>>>>> https://github.com/apache/incubator-paimon/
>>>>>>>>>>
>>>>>>>>>> - Anton
>>>>>>>>>>
>>>>>>>>>> On Apr 26, 2023, at 11:32 AM, Jack Ye <yezhao...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> As we discussed in the community sync, it looks like we have some
>>>>>>>>>> general interest in improving the CDC streaming process. Dan
>>>>>>>>>> mentioned that Ryan has a proposal about an alternative CDC approach
>>>>>>>>>> that has an accumulated changelog that is periodically synced to a
>>>>>>>>>> target table.
>>>>>>>>>>
>>>>>>>>>> I have a very similar design doc I have been working on for quite
>>>>>>>>>> some time to describe a set of improvements we could make to the
>>>>>>>>>> Iceberg CDC use case, and it contains a very similar improvement
>>>>>>>>>> (see improvement 3).
>>>>>>>>>>
>>>>>>>>>> I would appreciate feedback from the community about this doc,
>>>>>>>>>> and I can organize some meetings to discuss our thoughts about this
>>>>>>>>>> topic afterwards.
>>>>>>>>>>
>>>>>>>>>> Doc link:
>>>>>>>>>> https://docs.google.com/document/d/1kyyJp4masbd1FrIKUHF1ED_z1hTARL8bNoKCgb7fhSQ/edit#
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jack Ye
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Tabular
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Tabular
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>>
>>>
>
> --
> Ryan Blue
> Tabular
>
