If they'd like to use the --no-claim mode that would be the way to go, yes.

Two points to be on the same page here:

  * all Flink native state backends (RocksDB, HashMap, changelog) would
    already support --no-claim
  * if in the end we add the --legacy mode, users can also use that mode
    instead of --claim.
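
To keep the three modes apart, here is a minimal Python sketch of what each one means for ownership. The names `RestoreMode`, `flink_owns_restored_snapshot` and `first_checkpoint_must_be_full` are hypothetical and chosen only for illustration; they are not Flink's API, and --legacy is so far only a possibility, not a decision.

```python
from enum import Enum


class RestoreMode(Enum):
    """Hypothetical names mirroring the modes discussed in this thread."""
    CLAIM = "claim"        # Flink takes ownership of the restored snapshot
    NO_CLAIM = "no-claim"  # the user keeps ownership of the restored snapshot
    LEGACY = "legacy"      # proposed only: old behaviour, no claim and no duplicate


def flink_owns_restored_snapshot(mode: RestoreMode) -> bool:
    """Only in claim mode is Flink the owner of the restored snapshot."""
    return mode is RestoreMode.CLAIM


def first_checkpoint_must_be_full(mode: RestoreMode) -> bool:
    """Only no-claim forces a first checkpoint that reuses no restored files."""
    return mode is RestoreMode.NO_CLAIM
```

A state backend that cannot produce such a forced full snapshot would be limited to the other modes, which is the case Till asks about below.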

Best,

Dawid

On 26/11/2021 15:57, Till Rohrmann wrote:
> Thanks for writing this FLIP Dawid. Just to clarify one thing for the
> support of forced full snapshots. If a state backend does not support this
> feature, then the user either has to copy the snapshot manually or resume
> using --claim mode, create a savepoint in canonical format and then
> change the state backend if they want to use --no-claim, right?
>
> Cheers,
> Till
>
> On Fri, Nov 26, 2021 at 11:49 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>>
>> I don't think this is a good idea. The modes apply to both savepoints and
>> checkpoints, plus it's much longer to type in. (minor)
>>
>> - add an explicit option to preserve the current behavior (no claim
>> and no duplicate)?
>>
>> We had an offline discussion about it and so far we were leaning towards
>> keeping the set of supported options minimal. However, if we really think
>> the old behaviour is useful we can add a --legacy restore mode. cc
>> @Konstantin @Piotr
>>
>> There seems to be a consensus in the discussion, however, I couldn't
>> find stop-with-savepoint in the document.
>>
>> Sorry, I forgot about this one. I added a note that savepoints generated
>> from stop-with-savepoint should commit side effects.
>>
>> And I still think it would be nice to list object stores which support
>> duplicate operation.
>>
>> I listed a couple of file systems that do have some sort of a COPY API.
>>
>> Best,
>>
>> Dawid
>> On 26/11/2021 11:03, Roman Khachatryan wrote:
>>
>> Hi,
>>
>> Thanks for updating the FLIP, Dawid.
>>
>> There seems to be a consensus in the discussion, however, I couldn't
>> find stop-with-savepoint in the document.
>>
>> A few minor things:
>> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>> - add an explicit option to preserve the current behavior (no claim
>> and no duplicate)?
>> And I still think it would be nice to list object stores which support
>> duplicate operation.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf <kna...@apache.org> wrote:
>>
>> Hi Dawid,
>>
>> sounds good, specifically 2., too.
>>
>> Best,
>>
>> Konstantin
>>
>> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>
>> Hi all,
>>
>> I updated the FLIP with a few clarifications:
>>
>>    1. I added a description of how we would trigger a "full snapshot" in
>>    the changelog state backend:
>>       - (We would go for this option in the 1st version.) Trigger a
>>       snapshot of the base state backend in the 1st checkpoint, which induces
>>       materializing the changelog. In this approach we could duplicate SST
>>       files, but we would not duplicate the diff files.
>>       - Add a hook for logic for computing which task should duplicate the
>>       diff files. We would have to do a pass over all states after the state
>>       assignment in StateAssignmentOperation.
>>    2. I clarified that the "no-claim" mode requires a
>>    completed/successful checkpoint before we can remove the one we are
>>    restoring from. Also added a note that we can assume a checkpoint is
>>    completed if it is confirmed by Flink's API for checkpointing stats or by
>>    checking an entry in HA services. A checkpoint cannot be assumed
>>    completed by just looking at the checkpoint files.
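
The completion rule in point 2 can be illustrated with a minimal Python sketch. `CheckpointEvidence` and both functions are hypothetical names introduced for illustration, not Flink classes; the sketch only encodes the rule stated above, namely that the stats API or an HA services entry counts as confirmation, while files on storage do not.

```python
from dataclasses import dataclass


@dataclass
class CheckpointEvidence:
    """Hypothetical record of what an external tool knows about a checkpoint."""
    reported_completed_by_stats_api: bool = False
    has_entry_in_ha_services: bool = False
    files_present_on_storage: bool = False


def is_confirmed_completed(evidence: CheckpointEvidence) -> bool:
    # Files on storage are deliberately ignored: they may belong to a
    # checkpoint that never completed.
    return (evidence.reported_completed_by_stats_api
            or evidence.has_entry_in_ha_services)


def restored_checkpoint_deletable(first_new_checkpoint: CheckpointEvidence) -> bool:
    """In no-claim mode the restored checkpoint may be removed only once the
    first checkpoint taken after the restore is confirmed completed."""
    return is_confirmed_completed(first_new_checkpoint)
```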
>>
>> I suggest going on with the proposal for "no-claim" as suggested so far,
>> as it is easier to understand by users. They can reliably tell when they
>> can expect the checkpoint to be deletable. If we see that the time to take
>> the 1st checkpoint becomes a problem we can extend the set of restore
>> methods and e.g. add a "claim-temporarily" method.
>>
>> I hope we can reach a consensus and start a vote, some time early next
>> week.
>>
>> Best,
>>
>> Dawid
>>
>> On 23/11/2021 22:39, Roman Khachatryan wrote:
>>
>> I also referred to the "no-claim" mode and I still think neither of them
>> works in that mode, as you'd have to keep lineage of checkpoints externally
>> to be able to delete any checkpoint.
>>
>> I think the lineage is needed in all approaches with arbitrary
>> histories; the difference is whether a running Flink is required or
>> not. Is that what you mean?
>> (If not, could you please explain how the scenario you mentioned above
>> with multiple jobs branching from the same checkpoint is handled?)
>>
>>
>> BTW, the state key for RocksDB is actually: backend UID + key group range +
>> SST file name, so the key would be different (the key group range is
>> different for two tasks) and we would have two separate counters for the same
>> file.
>>
>> You're right. But there is also a collision between old and new entries.
>>
>>
>> To be on the same page here. It is not a problem so far in RocksDB, because 
>> we do not reuse any shared files in case of rescaling.
>>
>> As I mentioned above, collision happens not only because of rescaling;
>> and AFAIK, there are some ideas to reuse files on rescaling (probably
>> Yuan could clarify). Anyways, I think it makes sense to not bake in
>> this assumption unless it's hard to implement (or at least state it
>> explicitly in FLIP).
>>
>>
>> It is not suggested as an optimization. It is suggested as a must for state
>> backends that need it. I did not elaborate on it, because it could affect
>> only the changelog state backend at the moment, into which I don't have much
>> insight. I agree it might make sense to look a bit into how we could force
>> full snapshots in the changelog state backend. I will spend some extra time
>> on that.
>>
>> I see. For the Changelog state backend, the easiest way would be to
>> obtain a full snapshot from the underlying backend in snapshot(),
>> ignoring all non-materialized changes. This will effectively
>> materialize all the changes, so only new non-materialized state will
>> be used in subsequent checkpoints.
>>
>>
>> Only the task that gets assigned [1,16] would be responsible for duplicating 
>> files of the old range [1, 64].
>>
>> Wouldn't it be likely that the same TM will be responsible for [1, 64]
>> "windowState", [1, 64] "timerState", and so on, for all operators in
>> the chain, and probably other chains? (that's what I mean by skew)
>> If we want to address this while preserving handle immutability, then we'll
>> probably have to rebuild the whole task state snapshot.
>> (depending on how we approach RocksDB re-uploading, it might not be
>> relevant though)
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> I think I know where the confusion comes from regarding arbitrary
>> recovery histories: Both my counter-proposals were for "no-claim"
>> mode; I didn't mean to replace "claim" mode with them.
>> However, as Yun pointed out, it's impossible to guarantee that all the
>> files will be compacted in a finite number of checkpoints; so let's
>> withdraw those proposals.
>>
>> I also referred to the "no-claim" mode and I still think neither of them
>> works in that mode, as you'd have to keep lineage of checkpoints externally
>> to be able to delete any checkpoint.
>>
>> Let's consider a job running with DoP=1; it created checkpoint C1 with
>> a single file F1 and then stopped.
>> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
>> will receive the same file F1.
>>
>> To be on the same page here. It is not a problem so far in RocksDB, because 
>> we do not reuse any shared files in case of rescaling. If we want to change 
>> how rescaling in RocksDB works then yes, we would have to consider how we 
>> want to make sure we copy/duplicate just once. However we would have to 
>> first change a crucial thing about regular incremental checkpoints and 
>> reorganize the SharedStateRegistry along the way.
>>
>> BTW, the state key for RocksDB is actually: backend UID + key group range +
>> SST file name, so the key would be different (the key group range is
>> different for two tasks) and we would have two separate counters for the same
>> file.
>>
>> Of course, correct me if I am wrong in the two paragraphs above.
>>
>> Re-upload from one task (proposed in FLIP as optimization)
>>
>> It is not suggested as an optimization. It is suggested as a must for state
>> backends that need it. I did not elaborate on it, because it could affect
>> only the changelog state backend at the moment, into which I don't have much
>> insight. I agree it might make sense to look a bit into how we could force
>> full snapshots in the changelog state backend. I will spend some extra time
>> on that.
>>
>> Lastly, I might be wrong, but I think KeyedStateHandle#getIntersection is
>> a good candidate to distribute the task of duplicating shared files pretty
>> evenly. The idea was that we could specially mark the handles that are
>> assigned the "start of the old key group range". So if a file belonged to
>> a handle responsible for the key group range [1, 64], which is later split
>> into [1, 16], [17, 32], [33, 48], [49, 64], only the task that gets assigned
>> [1, 16] would be responsible for duplicating files of the old range [1, 64].
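
As a rough illustration of the "start of the old key group range" idea, here is a minimal Python sketch. `split_evenly` and `is_responsible_for_duplication` are hypothetical helpers, not Flink APIs, and the even split is an assumption made only to reproduce the [1, 64] example; Flink's real key group assignment may differ.

```python
def split_evenly(old_range, parts):
    """Split an inclusive key group range into `parts` equal, contiguous ranges.

    Assumes the range size is divisible by `parts` (true for the example).
    """
    start, end = old_range
    size = end - start + 1
    if size % parts != 0:
        raise ValueError("this sketch only handles evenly divisible ranges")
    step = size // parts
    return [(start + i * step, start + (i + 1) * step - 1) for i in range(parts)]


def is_responsible_for_duplication(new_range, old_range):
    """A task duplicates the old files only if its new range contains the
    first key group of the old range."""
    return new_range[0] <= old_range[0] <= new_range[1]


old = (1, 64)
new_ranges = split_evenly(old, 4)
responsible = [r for r in new_ranges if is_responsible_for_duplication(r, old)]
```

With this rule exactly one of the new ranges is picked for each old range, so each shared file is duplicated once.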
>>
>> Best,
>>
>> Dawid
>>
>> On 23/11/2021 14:27, Khachatryan Roman wrote:
>>
>> Thanks Dawid, Yun and Piotr,
>>
>> I think I know where the confusion comes from regarding arbitrary
>> recovery histories: Both my counter-proposals were for "no-claim"
>> mode; I didn't mean to replace "claim" mode with them.
>> However, as Yun pointed out, it's impossible to guarantee that all the
>> files will be compacted in a finite number of checkpoints; so let's
>> withdraw those proposals.
>>
>> And as there are no other alternatives left, the changes to
>> SharedStateRegistry or State Backends are not a decisive factor
>> anymore.
>>
>> However, it probably still makes sense to clarify the details of how
>> re-upload will work in case of rescaling.
>>
>> Let's consider a job running with DoP=1; it created checkpoint C1 with
>> a single file F1 and then stopped.
>> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
>> will receive the same file F1.
>>
>> Let's say both tasks will re-use F1, so it needs to be re-uploaded.
>> Now, we have a choice:
>> 1. Re-upload from both tasks
>> For RocksDB, the state key is: backend UID + SST file name. Both are
>> the same for two tasks, so the key will be the same.
>> Currently, SharedStateRegistry will reject both as duplicates.
>>
>> We can't just replace (to not lose one of the files), so we have to
>> use random keys.
>> However, when we further downscale:
>> - we'll have a conflict on recovery (multiple SST files with the same name)
>> - we'll re-upload the same file multiple times unnecessarily
>> So we have to de-duplicate state on recovery - ideally before sending
>> state snapshots to tasks.
>>
>> 2. Re-upload from one task (proposed in FLIP as optimization)
>> Both tasks must learn the new key. Otherwise, the snapshot of the
>> not-reuploading task will refer to a non-existing entry.
>> We can either re-use the old key (and allow replacement in
>> SharedStateRegistry); or generate the key on JM before sending task
>> state snapshots.
>>
>>
>> P.S.:
>>
>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
>> This is effectively what we have right now, but with an extra (Async?)
>>
>> Right now, there is absolutely no way to find out when the shared
>> state can be deleted; it can't be inferred from which checkpoints are
>> subsumed, and which are not, as future checkpoints might still be
>> using that state.
>>
>> Regards,
>> Roman
>>
>>
>>
>> On Tue, Nov 23, 2021 at 1:37 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> Hi,
>>
>> I'm not entirely sure if I fully understand the raised concerns here. So
>> let me maybe step back in the discussion a bit and address the original
>> points from Roman.
>>
>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim
>> mode?
>>
>> I second the concerns from Dawid. This is effectively what we have right
>> now, but with an extra (Async?) API call. It's not conceptually simple,
>> it's hard to explain to the users, it might take actually forever to
>> release the artefacts. Furthermore I don't think the implementation would
>> be trivial.
>>
>> On the other hand, the current proposal of having (a) `--claim` and (b)
>> `--no-claim` modes is conceptually very simple, (a) being perfectly
>> efficient, without any overhead. If you have concerns that (b) will cause
>> some overheads, slower first checkpoint etc, keep in mind that the user can
>> always pick option (a). Starting a new job from an existing
>> savepoint/externalised checkpoint in general shouldn't be time critical, so
>> users can always even manually copy the files and still use option (a), or
>> just be fine accepting the price of a slower first checkpoint. For other
>> use cases - restarting the same job after a downtime - (b) sounds to me to
>> be an acceptable option.
>>
>> I would also like to point out that the "force full snapshot"/"do not use
>> previous artefacts" option we will need either way for the incremental
>> intermediate savepoints (subject of a next FLIP). From this perspective, we
>> are getting the "--no-claim" option basically for free.
>>
>> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but after
>> a configured number of checkpoints?
>>
>> I don't see a reason why we couldn't provide an option like that at some
>> point in the future. However, as it's more complicated to reason about, more
>> complicated to implement, and I'm not entirely sure how much it is actually
>> needed given the (a) `--claim` mode, I think we can wait for feedback from
>> the users before actually implementing it.
>>
>> 6. Enforcing re-upload by a single task and skew
>> If we use some greedy logic like subtask 0 always re-uploads then it
>> might be overloaded.
>> So we'll have to obtain a full list of subtasks first (then probably
>> choose randomly or round-robin).
>> However, that requires rebuilding Task snapshot, which is doable but
>> not trivial (which I think supports "reverse API option").
>>
>> What do you mean by "rebuilding Task snapshot"?
>>
>> During some early discussions about this point, I've hoped that a state
>> backend like changelog could embed into the state handle information which
>> operator should actually be responsible for duplicating such shared states.
>> However now that I'm thinking about it, indeed there might be an issue if
>> we combine the fact that state handles can be shared across multiple
>> different operators and with a job modification, like dropping an operator.
>> In that case it looks like we would need some extra logic during recovery,
>> that would have an overview of the whole job to make a decision which
>> particular parallel instance of an operator should be responsible for
>> duplicating the underlying file?
>>
>> Best,
>> Piotrek
>>
>> On Tue, Nov 23, 2021 at 12:28 Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> Some users restore different jobs from the same externalized
>> checkpoint. I think this usage would break after introducing this FLIP, and
>> we must tell users explicitly if we choose to make Flink manage the
>> checkpoints by default.
>>
>> Could you elaborate on what you mean? The proposal is to use the
>> "no-claim" mode by default, which should let users start as many jobs as
>> they wish from the same externalized checkpoints, and it should not cause
>> them any harm. Each job will effectively create its own
>> private "copy" of the initial checkpoint along with the 1st taken
>> checkpoint.
>>
>> If the 1st full checkpoint did not complete in the end, the next
>> checkpoints have to try to re-upload all artifacts again. I think this
>> problem could be mitigated if the task knows some files have been uploaded
>> before.
>>
>> I don't know how we could achieve that easily. Besides, we have the same
>> situation for all checkpoints, even regular ones, don't we? Do we check
>> whether e.g. diff files have been successfully uploaded in a previous
>> aborted checkpoint?
>> I am not saying it's a wrong suggestion, just that I feel it is orthogonal
>> and I can't see a straightforward way to implement it.
>>
>> Best,
>>
>> Dawid
>> On 23/11/2021 07:52, Yun Tang wrote:
>>
>> Hi,
>>
>> Regarding the likelihood of RocksDB never deleting some SST files:
>> unfortunately, it can happen. The current level compaction strategy in
>> RocksDB is triggered when the size of the upper input level reaches a
>> threshold, and the compaction priority cannot guarantee that all files will
>> be chosen within several rounds of compaction.
>>
>> Actually, I am a bit in favor of this FLIP to manage checkpoints within
>> Flink, as we have heard from many users that they cannot delete older
>> checkpoints after several rounds of re-launching Flink jobs. Currently, Flink
>> does not delete older checkpoints automatically when restoring from an older
>> retained checkpoint, which makes the base checkpoint directory grow
>> larger and larger. However, if they decide to delete the older checkpoint
>> directories of other job IDs, they might not be able to recover from the last
>> completed checkpoint, as it might depend on some artifacts in an older
>> checkpoint directory.
>>
>> And I think re-uploading would indeed increase the duration of the 1st
>> checkpoint after restoring. For Aliyun OSS, the developer said that copying
>> files (larger than 32MB) from one location to another within the same bucket
>> on DFS could take hundreds of milliseconds. However, from my experience,
>> copying on HDFS might not be so quick. Some numbers here would help.
>>
>> I have two questions here:
>> 1. If the 1st full checkpoint did not complete in the end, the next
>> checkpoints have to try to re-upload all artifacts again. I think this
>> problem could be mitigated if the task knows some files have been uploaded
>> before.
>> 2. Some users restore different jobs from the same externalized
>> checkpoint. I think this usage would break after introducing this FLIP, and
>> we must tell users explicitly if we choose to make Flink manage the
>> checkpoints by default.
>>
>> Best
>> Yun Tang
>>
>>
>> On 2021/11/22 19:49:11 Dawid Wysakowicz wrote:
>>
>>     There is one more fundamental issue with either of your two
>>     proposals that has just come to my mind.
>>     What happens if you have externalized checkpoints and the job fails
>>     before the initial checkpoint can be safely removed?
>>
>>     You start the job from the latest created checkpoint and wait for it
>>     to be allowed for deletion. Then you can delete it, and all previous
>>     checkpoints (or am I missing something?)
>>
>>
>> Let me clarify it with an example. You start with chk-42; Flink takes
>> e.g. three checkpoints chk-43, chk-44, chk-45, all of which still reference
>> chk-42 files. After that it fails. We have externalized checkpoints enabled,
>> therefore we have retained all checkpoints. A user starts a new program
>> from, let's say, chk-45. At this point your proposal does not give the
>> user any help regarding when chk-42 can be safely removed. (This is
>> also how Flink works right now.)
>>
>> To make it even harder, you can arbitrarily complicate it: 1) start a job
>> from chk-44, 2) start a job from a chk-47 which depends on chk-45, 3)
>> never start a job from chk-44, so it is not claimed by any job and thus
>> never deleted; users must remember themselves that chk-44 originated
>> from chk-42, etc. Users would be forced to build a lineage system for
>> checkpoints to track which checkpoints depend on each other.
>>
>>     I mean that the design described by FLIP implies the following (PCIIW):
>>     1. treat SST files from the initial checkpoint specially: re-upload or
>>     send placeholder - depending on those attributes in state handle
>>     2. (SST files from newer checkpoints are re-uploaded depending on
>>     confirmation currently; so yes there is tracking, but it's different)
>>     3. SharedStateRegistry must allow replacing state under the existing
>>     key; otherwise, if a new key is used then other parallel subtasks
>>     should learn somehow this key and use it; However, allowing
>>     replacement must be limited to this scenario, otherwise it can lead to
>>     previous checkpoint corruption in normal cases
>>
>> I might not understand your points, but I don't think FLIP implies any
>> of this. The FLIP suggests to send along with the CheckpointBarrier a
>> flag "force full checkpoint". Then the state backend should respect it
>> and should not use any of the previous shared handles. Now let me
>> explain how that would work for RocksDB incremental checkpoints.
>>
>>  1. Simplest approach: upload all local RocksDB files. This works
>>     exactly the same as the first incremental checkpoint for a fresh start.
>>  2. Improvement on 1): we already know which files were uploaded for
>>     the initial checkpoint. Therefore, instead of uploading the local
>>     files that are identical to files uploaded for the initial checkpoint,
>>     we call duplicate for those files and upload just the diff.
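
The second approach amounts to partitioning the local files into two sets. The following Python sketch shows that partition under the assumption that files are identified by name; `plan_full_checkpoint` is a hypothetical helper, not part of Flink or RocksDB.

```python
def plan_full_checkpoint(local_files, initial_checkpoint_files):
    """Decide, for a forced full checkpoint, which local SST files can be
    duplicated from the initial checkpoint and which must be uploaded.

    Assumption for this sketch: a local file with the same name as a file in
    the initial checkpoint has identical content.
    """
    local = set(local_files)
    initial = set(initial_checkpoint_files)
    to_duplicate = sorted(local & initial)  # already on remote storage
    to_upload = sorted(local - initial)     # the diff, only available locally
    return to_duplicate, to_upload


to_duplicate, to_upload = plan_full_checkpoint(
    ["001.sst", "002.sst", "003.sst"],
    ["000.sst", "001.sst", "002.sst"],
)
```

Files of the initial checkpoint that no longer exist locally (here `000.sst`, presumably compacted away) appear in neither list, so the new checkpoint does not reference them.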
>>
>> It does not require any changes to the SharedStateRegistry nor to state
>> handles, at least for RocksDB.
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 22/11/2021 19:33, Roman Khachatryan wrote:
>>
>> If you assume the 1st checkpoint needs to be "full" you know you are not 
>> allowed to use any shared files.
>> It's true you should know about the shared files of the previous checkpoint, 
>> but e.g. RocksDB already tracks that.
>>
>> I mean that the design described by FLIP implies the following (PCIIW):
>> 1. treat SST files from the initial checkpoint specially: re-upload or
>> send placeholder - depending on those attributes in state handle
>> 2. (SST files from newer checkpoints are re-uploaded depending on
>> confirmation currently; so yes there is tracking, but it's different)
>> 3. SharedStateRegistry must allow replacing state under the existing
>> key; otherwise, if a new key is used then other parallel subtasks
>> should learn somehow this key and use it; However, allowing
>> replacement must be limited to this scenario, otherwise it can lead to
>> previous checkpoint corruption in normal cases
>>
>> Forcing a full checkpoint after completing N checkpoints instead of
>> immediately would only require enabling (1) after N checkpoints.
>> And with the "poll API until checkpoint released" approach, those
>> changes aren't necessary.
>>
>>
>> There is one more fundamental issue with either of your two proposals
>> that has just come to my mind.
>> What happens if you have externalized checkpoints and the job fails before 
>> the initial checkpoint can be safely removed?
>>
>> You start the job from the latest created checkpoint and wait for it
>> to be allowed for deletion. Then you can delete it, and all previous
>> checkpoints (or am I missing something?)
>>
>>
>> With tracking the shared files on JM you cannot say if you can clear the
>> files after a couple of checkpoints or 10s, 100s or 1000s,
>> which translates into minutes/hours/days/weeks of processing.
>>
>> This doesn't necessarily translate into higher cost (because of saved
>> RPC etc., as I mentioned above).
>> However, I do agree that an infinite or arbitrarily high delay is unacceptable.
>>
>> The added complexity above doesn't seem negligible to me (especially
>> in SharedStateHandle); and should therefore be weighed against those
>> operational disadvantages (given that the number of checkpoints to
>> wait is bounded in practice).
>>
>> Regards,
>> Roman
>>
>>
>>
>>
>> On Mon, Nov 22, 2021 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> There is one more fundamental issue with either of your two proposals
>> that has just come to my mind. What happens if you have externalized
>> checkpoints and the job fails before the initial checkpoint can be
>> safely removed? You have a situation where you have a retained
>> checkpoint that was built on top of the original one. Basically ending
>> in a situation we have right now that you never know when it is safe to
>> delete a retained checkpoint.
>>
>> BTW, the intention for the "claim" mode was to support cases when users
>> are concerned with the performance of the first checkpoint. In those
>> cases they can claim the checkpoint and not pay the additional cost of
>> the first checkpoint.
>>
>> Best,
>>
>> Dawid
>>
>> On 22/11/2021 14:09, Roman Khachatryan wrote:
>>
>> Thanks Dawid,
>>
>> Regarding clarity,
>> I think that all proposals require waiting for some event: re-upload /
>> checkpoint completion / api response.
>> But with the current one, there is an assumption: "initial checkpoint
>> can be deleted once a new one completes" (instead of just "initial
>> checkpoint can be deleted once the API says it can be deleted").
>> So I think it's actually clearer to offer this explicit API and rely on it.
>>
>> Regarding delaying the deletion,
>> I agree that it can delay deletion, but how important is it?
>> Checkpoints are usually stored on relatively cheap storage like S3, so
>> some delay shouldn't be an issue (especially taking rounding into
>> account); it can even be cheaper or comparable to paying for
>> re-upload/duplicate calls.
>>
>> Infinite delay can be an issue though, I agree.
>> Maybe @Yun can clarify the likelihood of never deleting some SST files
>> by RocksDB?
>> For the changelog backend, old files won't be used once
>> materialization succeeds.
>>
>> Yes, my concern is checkpointing time, but also added complexity:
>>
>> It would be a bit invasive though, as we would have to somehow keep track
>> of which files should not be reused on TMs.
>>
>> I think we need this anyway if we choose to re-upload files once the
>> job is running.
>> The new checkpoint must be formed by re-uploaded old artifacts AND
>> uploaded new artifacts.
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> @Yun
>>
>> I think it is a good comment, with which I agree in principle. However, we
>> already use --fromSavepoint (cli), savepointPath (REST API), and
>> SavepointRestoreSettings for restoring from both a savepoint and an
>> externalized checkpoint. I wanted to voice that concern. Nevertheless, I am
>> fine with changing it to execution.restore-mode; if there are no other
>> comments on that matter, I will change it.
>>
>> @Roman:
>>
>> Re 1. Correct, stop-with-savepoint should commit side-effects. Will add that 
>> to the doc.
>>
>> Re 2. What I don't like about this counter-proposal is that it still has no
>> clearly defined point in time when it is safe to delete the original
>> checkpoint. Users would have a hard time reasoning about it and debugging.
>> Even worse, I think in the worst case it might never happen that all the
>> original files are no longer in use (I am not too familiar with RocksDB
>> compaction, but what happens if there are key ranges that are never accessed
>> again?). I agree it is unlikely, but possible, isn't it? It can definitely
>> take a significant time and many checkpoints to do so.
>>
>> Re 3. I believe where you are coming from is that you'd like to keep the
>> checkpointing time minimal, and re-uploading files may increase it. The
>> proposal so far builds on the assumption that we could in most cases use a
>> cheap duplicate API instead of re-upload. I could see this as a follow-up if
>> it becomes a bottleneck. It would be a bit invasive though, as we would have
>> to somehow keep track of which files should not be reused on TMs.
>>
>> Re 2 & 3. Neither of the counter-proposals works well for taking incremental
>> savepoints. We were thinking of building incremental savepoints on the same
>> concept. I think delaying the completion of an independent savepoint to some
>> undefined point in the future is not a nice property of savepoints.
>>
>> Re 4. Good point. We should make sure the first completed checkpoint has the 
>> independent/full checkpoint property rather than just the first triggered.
>>
>> Re. 5 & 6 I need a bit more time to look into it.
>>
>> Best,
>>
>> Dawid
>>
>> On 22/11/2021 11:40, Roman Khachatryan wrote:
>>
>> Hi,
>>
>> Thanks for the proposal Dawid, I have some questions and remarks:
>>
>> 1. How will stop-with-savepoint be handled?
>> Shouldn't side effects be enforced in this case? (i.e. send
>> notifyCheckpointComplete)
>>
>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
>> Anyways, any external tool will have to poll Flink API waiting for the
>> next (full) checkpoint, before deleting the retained checkpoint,
>> right?
>> Instead, we can provide an API which tells whether the 1st checkpoint
>> is still in use (and not force re-upload it).
>>
>> Under the hood, it can work like this:
>> - for the checkpoint Flink recovers from, remember all shared state
>> handles it is adding
>> - when unregistering shared state handles, remove them from the set above
>> - when the set becomes empty the 1st checkpoint can be deleted externally
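
The bookkeeping in the three steps above can be sketched in a few lines of Python. `RestoredCheckpointTracker` and its methods are hypothetical names used for illustration only; this is a sketch of the idea as described in this message, not an existing or planned Flink class.

```python
class RestoredCheckpointTracker:
    """Tracks which shared state handles of the restored checkpoint are
    still referenced by the running job."""

    def __init__(self, restored_shared_handles):
        # Step 1: remember all shared state handles added by the checkpoint
        # Flink recovers from.
        self._still_in_use = set(restored_shared_handles)

    def on_unregister(self, handle):
        # Step 2: a handle that is unregistered is no longer needed.
        # Handles that were never part of the restored checkpoint are ignored.
        self._still_in_use.discard(handle)

    def can_delete_restored_checkpoint(self):
        # Step 3: once the set is empty, the restored checkpoint can be
        # deleted externally.
        return not self._still_in_use


tracker = RestoredCheckpointTracker({"F1", "F2"})
tracker.on_unregister("F1")
deletable_after_one = tracker.can_delete_restored_checkpoint()
tracker.on_unregister("F2")
deletable_after_two = tracker.can_delete_restored_checkpoint()
```

Nothing in this scheme bounds how long the set stays non-empty, which is the deletion delay mentioned as the downside.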
>>
>> Besides not requiring re-upload, it seems much simpler and less invasive.
>> On the downside, state deletion can be delayed; but I think this is a
>> reasonable trade-off.
>>
>> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but
>> after a configured number of checkpoints?
>> There is a high chance that after some more checkpoints, initial state
>> will not be used (because of compaction),
>> so backends won't have to re-upload anything (or small part).
>>
>> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion
>> This should be addressed in 
>> https://issues.apache.org/jira/browse/FLINK-24611.
>> If not, I think the FLIP should consider this case.
>>
>> 5. Enforcing re-upload by a single task and Changelog state backend
>> With Changelog state backend, a file can be shared by multiple operators.
>> Therefore, getIntersection() is irrelevant here, because operators
>> might not be sharing any key groups.
>> (so we'll have to analyze "raw" file usage I think).
>>
>> 6. Enforcing re-upload by a single task and skew
>> If we use some greedy logic like subtask 0 always re-uploads then it
>> might be overloaded.
>> So we'll have to obtain a full list of subtasks first (then probably
>> choose randomly or round-robin).
>> However, that requires rebuilding Task snapshot, which is doable but
>> not trivial (which I think supports "reverse API option").
>>
>> 7. I think it would be helpful to list file systems / object stores
>> that support "fast" copy (ideally with latency numbers).
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 22, 2021 at 9:24 AM Yun Gao <yungao...@aliyun.com.invalid>
>> wrote:
>>
>> Hi,
>>
>> Many thanks to Dawid for proposing the FLIP to clarify the ownership of the
>> states. +1 for the overall changes, since they make the behavior clear and
>> provide users with a well-defined way to finally clean up savepoints /
>> retained checkpoints.
>>
>> Regarding the changes to the public interface, it seems that currently the
>> changes are all bound to the savepoint, but from the FLIP it seems we might
>> also need to support the claim declaration for retained checkpoints, as on
>> the CLI side [1]? If so, might it be better to change the option name from
>> `execution.savepoint.restore-mode` to something like
>> `execution.restore-mode`?
>>
>> Best,
>> Yun
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
>>
>>
>> ------------------------------------------------------------------
>> From: Konstantin Knauf <kna...@apache.org>
>> Send Time: 2021 Nov. 19 (Fri.) 16:00
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-193: Snapshots ownership
>>
>> Hi Dawid,
>>
>> Thanks for working on this FLIP. Clarifying the differences and
>> guarantees around savepoints and checkpoints will make it easier and safer
>> for users and downstream projects and platforms to work with them.
>>
>> +1 to changing the current (undefined) behavior when recovering from
>> retained checkpoints. Users can now choose between claiming and not
>> claiming, which I think will make the current mixed behavior obsolete.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> Hi devs,
>>
>> I'd like to bring up for a discussion a proposal to clean up ownership
>> of snapshots, both checkpoints and savepoints.
>>
>> The goal here is to make it clear who is responsible for deleting
>> checkpoint/savepoint files and when that can be done in a safe manner.
>>
>> Looking forward to your feedback!
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://cwiki.apache.org/confluence/x/bIyqCw
>>
>>
>>
>>  --
>>
>> Konstantin Knauf
>> https://twitter.com/snntrable
>> https://github.com/knaufk
>>
>>
