If they'd like to use the --no-claim mode that would be the way to go, yes.
Two points to be on the same page here: * all Flink native state backends (RocksDB, HashMap, changelog) would already support --no-claim * if in the end we add the --legacy mode, users can also use that mode instead of --claim. Best, Dawid On 26/11/2021 15:57, Till Rohrmann wrote: > Thanks for writing this FLIP Dawid. Just to clarify one thing for the > support of forced full snapshots. If a state backend does not support this > feature, then the user either has to copy the snapshot manually or resume > using --claim mode, create a savepoint in canonical format and then > change the state backend if he wants to use --no-claim, right? > > Cheers, > Till > > On Fri, Nov 26, 2021 at 11:49 AM Dawid Wysakowicz <dwysakow...@apache.org> > wrote: > >> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint? >> >> I don't think this is a good idea. The modes apply to both savepoints and >> checkpoints, plus it's much longer to type in. (minor) >> >> - add an explicit option to preserve the current behavior (no claim >> and no duplicate)? >> >> We had an offline discussion about it and so far we were leaning towards >> keeping the set of supported options minimal. However, if we really think >> the old behaviour is useful we can add a --legacy restore mode. cc >> @Konstantin @Piotr >> >> There seems to be a consensus in the discussion, however, I couldn't >> find stop-with-savepoint in the document. >> >> Sorry, I forgot about this one. I added a note that savepoints generated >> from stop-with-savepoint should commit side effects. >> >> And I still think it would be nice to list object stores which support >> the duplicate operation. >> >> I listed a couple of file systems that do have some sort of a COPY API. >> >> Best, >> >> Dawid >> On 26/11/2021 11:03, Roman Khachatryan wrote: >> >> Hi, >> >> Thanks for updating the FLIP, Dawid >> >> There seems to be a consensus in the discussion, however, I couldn't >> find stop-with-savepoint in the document. >> >> A few minor things: >> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint? >> - add an explicit option to preserve the current behavior (no claim >> and no duplicate)? >> And I still think it would be nice to list object stores which support >> the duplicate operation. >> >> Regards, >> Roman >> >> >> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf <kna...@apache.org> >> wrote: >> >> Hi Dawid, >> >> sounds good, specifically 2., too. >> >> Best, >> >> Konstantin >> >> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> >> Hi all, >> >> I updated the FLIP with a few clarifications: >> >> 1. I added a description of how we would trigger a "full snapshot" in the >> changelog state backend >> - (We would go for this option in the 1st version). Trigger a >> snapshot of the base state backend in the 1st checkpoint, which induces >> materializing the changelog. In this approach we could duplicate SST >> files, >> but we would not duplicate the diff files. >> - Add a hook for the logic computing which task should duplicate the >> diff files. We would have to do a pass over all states after the state >> assignment in StateAssignmentOperation. >> 2. I clarified that the "no-claim" mode requires a >> completed/successful checkpoint before we can remove the one we are >> restoring from. 
Also added a note that we can assume a checkpoint is >> completed if it is confirmed by Flink's API for checkpointing stats or by >> checking an entry in HA services. A checkpoint cannot be assumed >> completed >> by just looking at the checkpoint files. >> >> I suggest going on with the proposal for "no-claim" as suggested so far, >> as it is easier for users to understand. They can reliably tell when they >> can expect the checkpoint to be deletable. If we see that the time to take >> the 1st checkpoint becomes a problem we can extend the set of restore >> methods and e.g. add a "claim-temporarily" method. >> >> I hope we can reach a consensus and start a vote some time early next >> week. >> >> Best, >> >> Dawid >> >> On 23/11/2021 22:39, Roman Khachatryan wrote: >> >> I also referred to the "no-claim" mode and I still think neither of them >> works in that mode, as you'd have to keep lineage of checkpoints externally >> to be able to delete any checkpoint. >> >> I think the lineage is needed in all approaches with arbitrary >> histories; the difference is whether a running Flink is required or >> not. Is that what you mean? >> (If not, could you please explain how the scenario you mentioned above >> with multiple jobs branching from the same checkpoint is handled?) >> >> >> BTW, the state key for RocksDB is actually: backend UID + key group range + >> SST file name, so the key would be different (the key group range is >> different for two tasks) and we would have two separate counters for the same >> file. >> >> You're right. But there is also a collision between old and new entries. >> >> >> To be on the same page here. It is not a problem so far in RocksDB, because >> we do not reuse any shared files in case of rescaling. >> >> As I mentioned above, collision happens not only because of rescaling; >> and AFAIK, there are some ideas to reuse files on rescaling (probably >> Yuan could clarify). Anyways, I think it makes sense to not bake in >> this assumption unless it's hard to implement (or at least state it >> explicitly in the FLIP). >> >> >> It is not suggested as an optimization. It is suggested as a must for state >> backends that need it. I did not elaborate on it, because it could affect >> only the changelog state backend at the moment, into which I don't have much >> insight. I agree it might make sense to look a bit into how we could force full >> snapshots in the changelog state backend. I will spend some extra time on >> that. >> >> I see. For the Changelog state backend, the easiest way would be to >> obtain a full snapshot from the underlying backend in snapshot(), >> ignoring all non-materialized changes. This will effectively >> materialize all the changes, so only new non-materialized state will >> be used in subsequent checkpoints. >> >> >> Only the task that gets assigned [1,16] would be responsible for duplicating >> files of the old range [1, 64]. >> >> Wouldn't it be likely that the same TM will be responsible for [1, 64] >> "windowState", [1, 64] "timerState", and so on, for all operators in >> the chain, and probably other chains? (that is what I mean by skew) >> If we want to address this while preserving handle immutability, then we'll >> probably have to rebuild the whole task state snapshot >> (depending on how we approach RocksDB re-uploading, it might not be >> relevant though). 
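
(Coming back to the note near the top about confirming checkpoint completion through Flink's checkpointing statistics rather than by looking at checkpoint files: a minimal sketch, assuming the standard GET /jobs/<jobId>/checkpoints monitoring REST endpoint. The naive string parsing and the "counts"/"completed" field names are assumptions to keep the example short; a real tool should use a JSON parser and verify the field names against the REST API docs.)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CheckpointCompletionCheck {

    // Returns true once the new job reports at least one completed checkpoint,
    // i.e. the point after which the restored-from checkpoint may be deleted in
    // no-claim mode. Sketch only: naive string parsing instead of a JSON parser.
    static boolean firstCheckpointCompleted(String restEndpoint, String jobId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(restEndpoint + "/jobs/" + jobId + "/checkpoints")).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

        int counts = body.indexOf("\"counts\"");
        int completed = counts < 0 ? -1 : body.indexOf("\"completed\":", counts);
        if (completed < 0) {
            return false; // stats not available (yet)
        }
        int start = completed + "\"completed\":".length();
        while (start < body.length() && body.charAt(start) == ' ') {
            start++;
        }
        int end = start;
        while (end < body.length() && Character.isDigit(body.charAt(end))) {
            end++;
        }
        return end > start && Long.parseLong(body.substring(start, end)) > 0;
    }
}
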
>> >> Regards, >> Roman >> >> >> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> I think I know where the confusion comes from regarding arbitrary >> recovery histories: Both my counter-proposals were for "no-claim" >> mode; I didn't mean to replace "claim" mode with them. >> However, as Yun pointed out, it's impossible to guarantee that all the >> files will be compacted in a finite number of checkpoints; so let's >> withdraw those proposals. >> >> I also referred to the "no-claim" mode and I still think neither of them >> works in that mode, as you'd have to keep lineage of checkpoints externally >> to be able to delete any checkpoint. >> >> Let's consider a job running with DoP=1; it created checkpoint C1 with >> a single file F1 and then stopped. >> We start a new job from C1 in no-claim mode with DoP=2; so two tasks >> will receive the same file F1. >> >> To be on the same page here. It is not a problem so far in RocksDB, because >> we do not reuse any shared files in case of rescaling. If we want to change >> how rescaling in RocksDB works then yes, we would have to consider how we >> want to make sure we copy/duplicate just once. However we would have to >> first change a crucial thing about regular incremental checkpoints and >> reorganize the SharedStateRegistry along the way. >> >> BTW, the state key for RocksDB is actually: backend UID + key group range + >> SST file name, so the key would be different (the key group range is >> different for two tasks) and we would have two separate counters for the same >> file. >> >> Of course, correct me if I am wrong in the two paragraphs above. >> >> Re-upload from one task (proposed in FLIP as optimization) >> >> It is not suggested as an optimization. It is suggested as a must for state >> backends that need it. I did not elaborate on it, because it could affect >> only the changelog state backend at the moment, into which I don't have much >> insight. I agree it might make sense to look a bit into how we could force full >> snapshots in the changelog state backend. I will spend some extra time on >> that. >> >> Lastly I might be wrong, but I think the KeyedStateHandle#getIntersection is >> a good candidate to distribute the task of duplicating shared files pretty >> evenly. The idea was that we could specially mark the handles that are >> assigned the "start of the old key group range". Therefore if a file >> belonged to a handle responsible for a key group range [1, 64], which is >> later on split into [1, 16], [17, 32], [33, 48], [49, 64], only the task >> that gets assigned [1, 16] would be responsible for duplicating files of the >> old range [1, 64]. >> >> Best, >> >> Dawid >> >> On 23/11/2021 14:27, Khachatryan Roman wrote: >> >> Thanks Dawid, Yun and Piotr, >> >> I think I know where the confusion comes from regarding arbitrary >> recovery histories: Both my counter-proposals were for "no-claim" >> mode; I didn't mean to replace "claim" mode with them. >> However, as Yun pointed out, it's impossible to guarantee that all the >> files will be compacted in a finite number of checkpoints; so let's >> withdraw those proposals. >> >> And as there are no other alternatives left, the changes to >> SharedStateRegistry or State Backends are not a decisive factor >> anymore. 
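
(To illustrate the key-group-based selection Dawid describes above, where only the subtask that receives the start of the old key group range duplicates the shared files: a minimal sketch using Flink's existing KeyGroupRange class. Whether the decision ends up being expressed this way is an open implementation detail of the FLIP, not something it prescribes.)

import org.apache.flink.runtime.state.KeyGroupRange;

public final class DuplicationResponsibility {

    // A new (post-rescaling) subtask is chosen as the "duplicator" for a restored
    // handle iff its key-group range contains the *start* key group of the handle's
    // old range. With an old range [1, 64] split into [1, 16], [17, 32], [33, 48],
    // [49, 64], only the subtask owning [1, 16] duplicates the shared files.
    static boolean shouldDuplicate(KeyGroupRange newSubtaskRange, KeyGroupRange oldHandleRange) {
        return newSubtaskRange.contains(oldHandleRange.getStartKeyGroup());
    }

    public static void main(String[] args) {
        KeyGroupRange oldRange = KeyGroupRange.of(1, 64);
        System.out.println(shouldDuplicate(KeyGroupRange.of(1, 16), oldRange));  // true
        System.out.println(shouldDuplicate(KeyGroupRange.of(17, 32), oldRange)); // false
    }
}
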
>> >> However, it probably still makes sense to clarify the details of how >> re-upload will work in case of rescaling. >> >> Let's consider a job running with DoP=1; it created checkpoint C1 with >> a single file F1 and then stopped. >> We start a new job from C1 in no-claim mode with DoP=2; so two tasks >> will receive the same file F1. >> >> Let's say both tasks will re-use F1, so it needs to be re-uploaded. >> Now, we have a choice: >> 1. Re-upload from both tasks >> For RocksDB, the state key is: backend UID + SST file name. Both are >> the same for two tasks, so the key will be the same. >> Currently, SharedStateRegistry will reject both as duplicates. >> >> We can't just replace (to not lose one of the files), so we have to >> use random keys. >> However, when we further downscale: >> - we'll have a conflict on recovery (multiple SST files with the same name) >> - we'll re-upload the same file multiple times unnecessarily >> So we have to de-duplicate state on recovery - ideally before sending >> state snapshots to tasks. >> >> 2. Re-upload from one task (proposed in FLIP as optimization) >> Both tasks must learn the new key. Otherwise, the snapshot of the >> not-reuploading task will refer to a non-existing entry. >> We can either re-use the old key (and allow replacement in >> SharedStateRegistry); or generate the key on JM before sending task >> state snapshots. >> >> >> P.S.: >> >> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode? >> This is effectively what we have right now, but with an extra (Async?) >> >> Right now, there is absolutely no way to find out when the shared >> state can be deleted; it can't be inferred from which checkpoints are >> subsumed, and which are not, as future checkpoints might still be >> using that state. >> >> Regards, >> Roman >> >> >> On Tue, Nov 23, 2021 at 1:37 PM Piotr Nowojski <pnowoj...@apache.org> >> wrote: >> >> Hi, >> >> I'm not entirely sure if I fully understand the raised concerns here. So >> let me maybe step back in the discussion a bit and address the original >> points from Roman. >> >> 2. Instead of forcing re-upload, can we "inverse control" in no-claim >> >> mode? >> >> I second the concerns from Dawid. This is effectively what we have right >> now, but with an extra (Async?) API call. It's not conceptually simple, >> it's hard to explain to the users, and it might actually take forever to >> release the artefacts. Furthermore I don't think the implementation would >> be trivial. >> >> On the other hand the current proposal of having (a) `--claim` and (b) >> `--no-claim` modes is conceptually very simple, (a) being perfectly >> efficient, without any overheads. If you have concerns that (b) will cause >> some overheads, slower first checkpoint etc., keep in mind that the user can >> always pick option (a). Starting a new job from an existing >> savepoint/externalised checkpoint in general shouldn't be time critical, so >> users can always even manually copy the files and still use option (a), or >> just be fine accepting the price of a slower first checkpoint. For other >> use cases - restarting the same job after a downtime - (b) sounds to me to >> be an acceptable option. >> >> I would also like to point out that the "force full snapshot"/"do not use >> previous artefacts" option we will need either way for the incremental >> intermediate savepoints (subject of a next FLIP). 
From this perspective, we >> are getting the "--no-claim" option basically for free. >> >> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but after >> >> a configured number of checkpoints? >> >> I don't see a reason why we couldn't provide an option like that at some >> point in the future. However, as it's more complicated to reason about, more >> complicated to implement, and I'm not entirely sure how much it is actually needed >> given the (a) `--claim` mode, I think we can wait for feedback from the >> users before actually implementing it. >> >> 6. Enforcing re-upload by a single task and skew >> If we use some greedy logic like subtask 0 always re-uploads then it >> might be overloaded. >> So we'll have to obtain a full list of subtasks first (then probably >> choose randomly or round-robin). >> However, that requires rebuilding the Task snapshot, which is doable but >> not trivial (which I think supports the "reverse API option"). >> >> What do you mean by "rebuilding Task snapshot"? >> >> During some early discussions about this point, I'd hoped that a state >> backend like changelog could embed into the state handle information about which >> operator should actually be responsible for duplicating such shared states. >> However now that I'm thinking about it, indeed there might be an issue if >> we combine the fact that state handles can be shared across multiple >> different operators with a job modification, like dropping an operator. >> In that case it looks like we would need some extra logic during recovery >> that would have an overview of the whole job to make a decision about which >> particular parallel instance of an operator should be responsible for >> duplicating the underlying file? >> >> Best, >> Piotrek >> >> On Tue, 23 Nov 2021 at 12:28, Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> Some users have a use case of restoring different jobs from the same externalized >> checkpoint. I think this usage would break after introducing this FLIP, and >> we must tell users explicitly if we choose to make Flink manage the >> checkpoints by default. >> >> Could you elaborate on what you mean? The proposal is to use the >> "no-claim" mode by default, which should let users start as many jobs as >> they wish from the same externalized checkpoints and it should not cause >> them any harm. Each job will effectively create its own >> private "copy" of the initial checkpoint along with the 1st taken >> checkpoint. >> >> If the 1st full checkpoint did not complete in the end, the next >> checkpoints have to try to reupload all artifacts again. I think this >> problem could be mitigated if the task knows some files have been uploaded >> before. >> >> I don't know how we could achieve that easily. Besides, we have the same >> situation for all checkpoints, even regular ones, don't we? Do we check if e.g. >> diff files have been successfully uploaded in a previous aborted checkpoint? >> I am not saying it's a wrong suggestion, just that I feel it is orthogonal >> and I can't see a straightforward way to implement it. >> >> Best, >> >> Dawid >> On 23/11/2021 07:52, Yun Tang wrote: >> >> Hi, >> >> Regarding the likelihood of never deleting some SST files by RocksDB: 
>> Unfortunately, it could happen, as the current level compaction strategy in >> RocksDB is triggered by the upper input level size reaching the threshold, and >> the compaction priority cannot guarantee all files would be chosen during >> several rounds of compaction. >> >> Actually, I am a bit in favor of this FLIP to manage checkpoints within >> Flink, as we have heard from many users that they cannot delete older >> checkpoints after several rounds of re-launching Flink jobs. Current Flink >> would not delete older checkpoints automatically when restoring from an older >> retained checkpoint, which makes the base checkpoint directory become >> larger and larger. However, if they decide to delete the older checkpoint >> directory of other job-ids, they might not be able to recover from the last >> completed checkpoint as it might depend on some artifacts in an older >> checkpoint directory. >> >> And I think reuploading would indeed increase the 1st checkpoint duration >> after restoring. For Aliyun OSS, the developer said that copying files >> (larger than 32MB) from one location to another within the same bucket on DFS >> could take hundreds of milliseconds. However, from my experience, copying on >> HDFS might not be so quick. Maybe some numbers here could be better. >> >> I have two questions here: >> 1. If the 1st full checkpoint did not complete in the end, the next >> checkpoints have to try to reupload all artifacts again. I think this >> problem could be mitigated if the task knows some files have been uploaded >> before. >> 2. Some users have a use case of restoring different jobs from the same >> externalized checkpoint. I think this usage would break after introducing >> this FLIP, and we must tell users explicitly if we choose to make Flink manage >> the checkpoints by default. >> >> Best >> Yun Tang >> >> >> On 2021/11/22 19:49:11 Dawid Wysakowicz wrote: >> >> There is one more fundamental issue with either of your two >> proposals that has just come to my mind. >> What happens if you have externalized checkpoints and the job fails >> before the initial checkpoint can be safely removed? >> >> You start the job from the latest created checkpoint and wait for it >> to be allowed for deletion. Then you can delete it, and all previous >> checkpoints (or am I missing something?) >> >> >> Let me clarify it with an example. You start with chk-42, Flink takes >> e.g. three checkpoints chk-43, chk-44, chk-45, which all still reference chk-42 >> files. After that it fails. We have externalized checkpoints enabled, >> therefore we have retained all checkpoints. The user starts a new program >> from, let's say, chk-45. At this point your proposal does not give the >> user any help in regards to when chk-42 can be safely removed. (This is >> also how Flink works right now). >> >> To make it even harder you can arbitrarily complicate it: 1) start a job >> from chk-44, 2) start a job from chk-47 which depends on chk-45, 3) >> never start a job from chk-44, so it is not claimed by any job, thus it is >> never deleted, and users must remember by themselves that chk-44 originated >> from chk-42, etc. Users would be forced to build a lineage system for >> checkpoints to track which checkpoints depend on each other. >> >> I mean that the design described by the FLIP implies the following (PCIIW): >> 1. treat SST files from the initial checkpoint specially: re-upload or >> send placeholder - depending on those attributes in state handle >> 2. 
>> (SST files from newer checkpoints are re-uploaded depending on >> confirmation currently; so yes there is tracking, but it's different) >> 3. SharedStateRegistry must allow replacing state under the existing >> key; otherwise, if a new key is used then other parallel subtasks >> should somehow learn this key and use it; However, allowing >> replacement must be limited to this scenario, otherwise it can lead to >> previous checkpoint corruption in normal cases >> >> I might not understand your points, but I don't think the FLIP implies any >> of this. The FLIP suggests sending along with the CheckpointBarrier a >> flag "force full checkpoint". Then the state backend should respect it >> and should not use any of the previous shared handles. Now let me >> explain how that would work for RocksDB incremental checkpoints. >> >> 1. Simplest approach: upload all local RocksDB files. This works >> exactly the same as the first incremental checkpoint for a fresh start. >> 2. Improvement on 1): we already know which files were uploaded for >> the initial checkpoint. Therefore, instead of uploading the local >> files that are the same as the files uploaded for the initial checkpoint, >> we call duplicate for those files and upload just the diff. >> >> It does not require any changes to the SharedStateRegistry nor to state >> handles, at least for RocksDB. >> >> Best, >> >> Dawid >> >> >> On 22/11/2021 19:33, Roman Khachatryan wrote: >> >> If you assume the 1st checkpoint needs to be "full" you know you are not >> allowed to use any shared files. >> It's true you should know about the shared files of the previous checkpoint, >> but e.g. RocksDB already tracks that. >> >> I mean that the design described by the FLIP implies the following (PCIIW): >> 1. treat SST files from the initial checkpoint specially: re-upload or >> send placeholder - depending on those attributes in state handle >> 2. (SST files from newer checkpoints are re-uploaded depending on >> confirmation currently; so yes there is tracking, but it's different) >> 3. SharedStateRegistry must allow replacing state under the existing >> key; otherwise, if a new key is used then other parallel subtasks >> should somehow learn this key and use it; However, allowing >> replacement must be limited to this scenario, otherwise it can lead to >> previous checkpoint corruption in normal cases >> >> Forcing a full checkpoint after completing N checkpoints instead of >> immediately would only require enabling (1) after N checkpoints. >> And with the "poll API until checkpoint released" approach, those >> changes aren't necessary. >> >> >> There is one more fundamental issue with either of your two proposals >> that has just come to my mind. >> What happens if you have externalized checkpoints and the job fails before >> the initial checkpoint can be safely removed? >> >> You start the job from the latest created checkpoint and wait for it >> to be allowed for deletion. Then you can delete it, and all previous >> checkpoints (or am I missing something?) >> >> >> With tracking the shared files on the JM you cannot say if you can clear the >> files after a couple of checkpoints, or 10s, 100s or 1000s, >> which translates into minutes/hours/days/weeks of processing. >> >> This doesn't necessarily translate into higher cost (because of saved >> RPC etc., as I mentioned above). >> However, I do agree that an infinite or arbitrarily high delay is unacceptable. 
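
(To illustrate the duplicate-instead-of-re-upload idea, i.e. option 2 in Dawid's reply above, for the forced "full" first checkpoint in RocksDB: a purely hypothetical sketch. The duplicate() and upload() calls are placeholders for whatever cheap server-side copy and regular upload primitives the checkpoint storage ends up exposing; none of these names are the actual Flink API.)

import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;

interface StateHandle {}

// Placeholder abstraction, not the real Flink checkpoint storage API.
interface SketchCheckpointStorage {
    StateHandle duplicate(StateHandle previouslyUploaded); // cheap copy, e.g. object-store COPY
    StateHandle upload(Path localFile);                    // regular upload
}

final class ForcedFullSnapshotSketch {

    // For the forced full first checkpoint: files already uploaded for the initial
    // (restored-from) checkpoint are duplicated instead of referenced, so the new
    // checkpoint does not share any files with the restored one; only genuinely new
    // local files are uploaded.
    static void snapshot(
            List<Path> localSstFiles,
            Set<String> namesFromInitialCheckpoint,
            Map<String, StateHandle> initialHandlesByName,
            SketchCheckpointStorage storage,
            List<StateHandle> out) {
        for (Path file : localSstFiles) {
            String name = file.getFileName().toString();
            if (namesFromInitialCheckpoint.contains(name)) {
                // do not reuse the old shared file; create an independent copy instead
                out.add(storage.duplicate(initialHandlesByName.get(name)));
            } else {
                out.add(storage.upload(file));
            }
        }
    }
}
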
>> >> The added complexity above doesn't seem negligible to me (especially >> in SharedStateHandle); and should therefore be weighed against those >> operational disadvantages (given that the number of checkpoints to >> wait is bounded in practice). >> >> Regards, >> Roman >> >> >> >> >> On Mon, Nov 22, 2021 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> There is one more fundamental issue with either of your two proposals >> that has just come to my mind. What happens if you have externalized >> checkpoints and the job fails before the initial checkpoint can be >> safely removed? You have a situation where you have a retained >> checkpoint that was built on top of the original one. Basically, we end up >> in the situation we have right now, where you never know when it is safe to >> delete a retained checkpoint. >> >> BTW, the intention for the "claim" mode was to support cases when users >> are concerned with the performance of the first checkpoint. In those >> cases they can claim the checkpoint and not pay the additional cost of >> the first checkpoint. >> >> Best, >> >> Dawid >> >> On 22/11/2021 14:09, Roman Khachatryan wrote: >> >> Thanks Dawid, >> >> Regarding clarity, >> I think that all proposals require waiting for some event: re-upload / >> checkpoint completion / api response. >> But with the current one, there is an assumption: "initial checkpoint >> can be deleted once a new one completes" (instead of just "initial >> checkpoint can be deleted once the API says it can be deleted"). >> So I think it's actually clearer to offer this explicit API and rely on >> it. >> >> Regarding delaying the deletion, >> I agree that it can delay deletion, but how important is it? >> Checkpoints are usually stored on relatively cheap storage like S3, so >> some delay shouldn't be an issue (especially taking rounding into >> account); it can even be cheaper or comparable to paying for >> re-upload/duplicate calls. >> >> Infinite delay can be an issue though, I agree. >> Maybe @Yun can clarify the likelihood of never deleting some SST files >> by RocksDB? >> For the changelog backend, old files won't be used once >> materialization succeeds. >> >> Yes, my concern is checkpointing time, but also added complexity: >> >> It would be a bit invasive though, as we would have to somehow keep track of >> which files should not be reused on TMs. >> >> I think we need this anyway if we choose to re-upload files once the >> job is running. >> The new checkpoint must be formed by re-uploaded old artifacts AND >> uploaded new artifacts. >> >> >> Regards, >> Roman >> >> >> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> @Yun >> >> I think it is a good comment with which I agree in principle. However, we use >> --fromSavepoint (cli), savepointPath (REST API), and SavepointRestoreSettings >> for both restoring from a savepoint and an externalized checkpoint already. >> I wanted to voice that concern. Nevertheless, I am fine with changing it to >> execution.restore-mode; if there are no other comments on that matter, I >> will change it. >> >> @Roman: >> >> Re 1. 
Correct, stop-with-savepoint should commit side-effects. Will add that >> to the doc. >> >> Re. 2 What I don't like about this counter-proposal is that it still has no >> clearly defined point in time when it is safe to delete the original >> checkpoint. Users would have a hard time reasoning about it and debugging. >> Even worse, I think in the worst case it might never happen that all the original >> files are no longer in use (I am not too familiar with RocksDB compaction, >> but what happens if there are key ranges that are never accessed again?). I >> agree it is unlikely, but possible, isn't it? Definitely it can take a >> significant time and many checkpoints to do so. >> >> Re. 3 I believe where you are coming from is that you'd like to keep the >> checkpointing time minimal and reuploading files may increase it. The >> proposal so far builds on the assumption that we could in most cases use a cheap >> duplicate API instead of re-uploading. I could see this as a follow-up if it >> becomes a bottleneck. It would be a bit invasive though, as we would have to >> somehow keep track of which files should not be reused on TMs. >> >> Re. 2 & 3 Neither of the counter-proposals works well for taking incremental >> savepoints. We were thinking of building incremental savepoints on the same >> concept. I think delaying the completion of an independent savepoint to some >> undefined point in the future is not a nice property of savepoints. >> >> Re. 4 Good point. We should make sure the first completed checkpoint has the >> independent/full checkpoint property rather than just the first triggered one. >> >> Re. 5 & 6 I need a bit more time to look into it. >> >> Best, >> >> Dawid >> >> On 22/11/2021 11:40, Roman Khachatryan wrote: >> >> Hi, >> >> Thanks for the proposal Dawid, I have some questions and remarks: >> >> 1. How will stop-with-savepoint be handled? >> Shouldn't side effects be enforced in this case? (i.e. send >> notifyCheckpointComplete) >> >> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode? >> Anyways, any external tool will have to poll the Flink API waiting for the >> next (full) checkpoint, before deleting the retained checkpoint, >> right? >> Instead, we can provide an API which tells whether the 1st checkpoint >> is still in use (and not force re-uploading it). >> >> Under the hood, it can work like this: >> - for the checkpoint Flink recovers from, remember all shared state >> handles it is adding >> - when unregistering shared state handles, remove them from the set above >> - when the set becomes empty the 1st checkpoint can be deleted externally >> >> Besides not requiring re-upload, it seems much simpler and less invasive. >> On the downside, state deletion can be delayed; but I think this is a >> reasonable trade-off. >> >> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but >> after a configured number of checkpoints? >> There is a high chance that after some more checkpoints, initial state >> will not be used (because of compaction), >> so backends won't have to re-upload anything (or only a small part). >> >> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion >> This should be addressed in >> https://issues.apache.org/jira/browse/FLINK-24611. >> If not, I think the FLIP should consider this case. >> >> 5. Enforcing re-upload by a single task and Changelog state backend >> With the Changelog state backend, a file can be shared by multiple operators. >> Therefore, getIntersection() is irrelevant here, because operators >> might not be sharing any key groups. 
>> (so we'll have to analyze "raw" file usage, I think). >> >> 6. Enforcing re-upload by a single task and skew >> If we use some greedy logic like "subtask 0 always re-uploads", then it >> might be overloaded. >> So we'll have to obtain a full list of subtasks first (then probably >> choose randomly or round-robin). >> However, that requires rebuilding the Task snapshot, which is doable but >> not trivial (which I think supports the "reverse API option"). >> >> 7. I think it would be helpful to list file systems / object stores >> that support "fast" copy (ideally with latency numbers). >> >> Regards, >> Roman >> >> On Mon, Nov 22, 2021 at 9:24 AM Yun Gao <yungao...@aliyun.com.invalid> >> wrote: >> >> Hi, >> >> Many thanks Dawid for proposing the FLIP to clarify the ownership of the >> states. +1 for the overall changes since they make the behavior clear and >> provide users a deterministic method to finally clean up savepoints / retained >> checkpoints. >> >> Regarding the changes to the public interface, it seems currently the >> changes are all bound >> to the savepoint, but from the FLIP it seems perhaps we might also need to >> support the claim declaration >> for retained checkpoints, like on the CLI side [1]? If so, then might it be >> better to change the option name >> from `execution.savepoint.restore-mode` to something like >> `execution.restore-mode`? >> >> Best, >> Yun >> >> >> [1] >> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint >> >> >> ------------------------------------------------------------------ >> From: Konstantin Knauf <kna...@apache.org> >> Send Time: 2021 Nov. 19 (Fri.) 16:00 >> To: dev <dev@flink.apache.org> >> Subject: Re: [DISCUSS] FLIP-193: Snapshots ownership >> >> Hi Dawid, >> >> Thanks for working on this FLIP. Clarifying the differences and >> guarantees around savepoints and checkpoints will make it easier and safer >> for users and downstream projects and platforms to work with them. >> >> +1 to changing the current (undefined) behavior when recovering from >> retained checkpoints. Users can now choose between claiming and not >> claiming, which I think will make the current mixed behavior obsolete. >> >> Cheers, >> >> Konstantin >> >> On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >> Hi devs, >> >> I'd like to bring up for discussion a proposal to clean up the ownership >> of snapshots, both checkpoints and savepoints. >> >> The goal here is to make it clear who is responsible for deleting >> checkpoint/savepoint files and when that can be done in a safe manner. >> >> Looking forward to your feedback! 
>> >> Best, >> >> Dawid >> >> [1] https://cwiki.apache.org/confluence/x/bIyqCw >> >> >> >> -- >> >> Konstantin Knauf >> https://twitter.com/snntrable >> https://github.com/knaufk >> >>
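
(For reference, a rough sketch of how the restore modes discussed in this thread could surface in user configuration. The savepoint path key is an existing Flink option; the restore-mode key and the CLAIM / NO_CLAIM / LEGACY value names are only the names under discussion here and in the FLIP, not a final API.)

import org.apache.flink.configuration.Configuration;

public class RestoreModeSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Existing option for pointing a job at a savepoint or retained checkpoint.
        conf.setString("execution.savepoint.path", "s3://bucket/checkpoints/chk-42");
        // Proposed option (key name still under discussion: execution.savepoint.restore-mode
        // vs. execution.restore-mode). NO_CLAIM would be the default; CLAIM hands ownership
        // of the snapshot to the new job; LEGACY would preserve today's behavior if added.
        conf.setString("execution.savepoint.restore-mode", "NO_CLAIM");
    }
}
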