Hey!

Regarding the question of initialSavepointPath versus the new
flinkStateSnapshotReference object, I think we could simply add an extra
field called path to the flinkStateSnapshotReference object.

Then the fields could be:
namespace, name, path

If path is defined, we would use it directly (so the simple path-based
restore keeps working); otherwise we would use the referenced resource. I
would still deprecate the initialSavepointPath field in the jobSpec.

Regarding separate Savepoint/Checkpoint resources vs. a single FlinkStateSnapshot:
What we need:
 1. Easy way to list all state snapshots (to select latest)
 2. Easy way to reference a savepoint/checkpoint from a jobspec
 3. Differentiate state snapshot types (in some cases users may prefer a
checkpoint or a savepoint for certain upgrades) -> we should add a label or
something similar for easy selection
 4. Be able to delete savepoints (and checkpoints maybe)

I am personally slightly more in favor of having a single resource, as that
ticks all the boxes, while having 2 separate resources would make both
listing and referencing harder. We would have to introduce the state type in
the reference (name + namespace would not be enough to uniquely identify a
state snapshot).
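
To illustrate points 1 and 3 with a single resource type, here is a small
Python sketch of "list everything, optionally filter by type, pick the
latest". Snapshot, snapshot_type, completed_at and latest_snapshot are
hypothetical names, and using a type field for the filter is an assumption
standing in for the label mentioned above:

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical view of one state snapshot resource."""
    namespace: str
    name: str
    snapshot_type: str   # "SAVEPOINT" or "CHECKPOINT" (assumed label values)
    completed_at: float  # completion time, seconds since the epoch


def latest_snapshot(
    snapshots: Iterable[Snapshot],
    snapshot_type: Optional[str] = None,
) -> Optional[Snapshot]:
    """Pick the most recently completed snapshot, optionally of one type."""
    candidates = [
        s for s in snapshots
        if snapshot_type is None or s.snapshot_type == snapshot_type
    ]
    # default=None covers the case where nothing matches the filter.
    return max(candidates, key=lambda s: s.completed_at, default=None)


# One list holds both kinds, so namespace + name stays a unique key.
all_snapshots = [
    Snapshot("flink", "sp-1", "SAVEPOINT", 100.0),
    Snapshot("flink", "cp-7", "CHECKPOINT", 250.0),
    Snapshot("flink", "sp-2", "SAVEPOINT", 200.0),
]

newest = latest_snapshot(all_snapshots)
newest_savepoint = latest_snapshot(all_snapshots, "SAVEPOINT")
```

With two separate resources the same selection would need two list calls
plus a merge, and a reference would have to carry the kind as well.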

I wonder if I am missing any good argument against the single
FlinkStateSnapshot here.

Cheers,
Gyula


On Fri, Apr 19, 2024 at 9:09 PM Mate Czagany <czmat...@gmail.com> wrote:

> Hi Robert and Thomas,
>
> Thank you for sharing your thoughts, I will try to address your questions
> and suggestions:
>
> 1. I would really love to hear others' inputs as well about separating the
> snapshot CRD into two different CRDs instead for savepoints and
> checkpoints. I think the main upside is that we would not need the
> mandatory savepoint or checkpoint field inside the spec. The two CRs could
> share the same status fields, and their specs would be different.
> I personally like both solutions, and would love to hear others' thoughts
> as well.
>
> 2. I agree with you that "completed" is not very clear, but I would
> suggest the name "alreadyExists". WDYT?
>
> 3. I think having a retry loop inside the operator does not add too much
> complexity to the FLIP. On failure, we check if we have reached the max
> retries. If we did, the state will be set to "FAILED", else it will be set
> to "TRIGGER_PENDING", causing the operator to retry the task. The "error"
> field will always be populated with the latest error. Kubernetes Jobs
> already has a similar field called "backoffLimit", maybe we could use the
> same name, with the same logic applied, WDYT?
> About error events, I think we should keep the "error" field, and upon
> successful snapshot, we clear it. I will add to the FLIP that there will be
> an event generated for each unsuccessful snapshot.
>
> 4. I really like the idea of having something like Pod Conditions, but I
> think it wouldn't add too much value here, because the only 2 stages
> important to the user are "Triggered" and "Completed", and those timestamps
> will already be included in the status field. I think it would make more
> sense to implement this if there were more lifecycle stages.
>
> 5. There will be a new field in JobSpec called
> "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore
> from.
>
> > How do you see potential effects on API server performance wrt. number of
> > objects vs mutations? Is the proposal more or less neutral in that regard?
>
> While I am not an expert in Kubernetes internals, my understanding is that
> for the api-server, editing an existing resource or creating a new one is
> not different performance-wise, because the whole resource will always be
> written to etcd anyways.
> Retrieving the savepoints from etcd will be different though for some
> use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment
> would require the api-server to first retrieve every snapshot in the
> namespace from etcd, then filter them for that specific FlinkDeployment. I
> think this is a worst-case scenario, and it will be up to the user to
> optimize their queries via e.g. watch queries [1] or resourceVersions [2].
>
> > Does that mean one would have to create a FlinkStateSnapshot CR when
> > starting a new deployment from savepoint? If so, that's rather complicated.
> > I would prefer something more simple/concise and would rather
> > keep initialSavepointPath
>
> Starting a job from a savepoint path will indeed be deprecated with this
> FLIP. I agree that it will be more complicated to restore from a savepoint
> in those cases, but if the user decides to move away from the deprecated
> savepoint mechanisms, every savepoint will result in a new
> FlinkStateSnapshot CR. So the only situation I expect this to be an
> inconvenience is when the user onboards a new Flink job to the operator.
> But I may not be thinking this through, so please let me know if you
> disagree.
>
> Thank you very much for your questions and suggestions!
>
> [1]
> https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
> [2]
> https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
>
> Regards,
> Mate
>
> On Fri, Apr 19, 2024 at 11:31, Thomas Weise <t...@apache.org> wrote:
>
>> Thanks for the proposal.
>>
>> How do you see potential effects on API server performance wrt. number of
>> objects vs mutations? Is the proposal more or less neutral in that regard?
>>
>> Thanks for the thorough feedback Robert.
>>
>> Couple more questions below.
>>
>> -->
>>
>> On Fri, Apr 19, 2024 at 5:07 AM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>> > Hi Mate,
>> > thanks for proposing this, I'm really excited about your FLIP. I hope my
>> > questions make sense to you:
>> >
>> > 1. I would like to discuss the "FlinkStateSnapshot" name and the fact
>> that
>> > users have to use either the savepoint or checkpoint spec inside the
>> > FlinkStateSnapshot.
>> > Wouldn't it be more intuitive to introduce two CRs:
>> > FlinkSavepoint and FlinkCheckpoint
>> > Ideally they can internally share a lot of code paths, but from a users
>> > perspective, the abstraction is much clearer.
>> >
>>
>> There are probably pros and cons either way. For example it is desirable
>> to
>> have a single list of state snapshots when looking for the initial
>> savepoint for a new deployment etc.
>>
>>
>> >
>> > 2. I also would like to discuss SavepointSpec.completed, as this name is
>> > not intuitive to me. How about "ignoreExisting"?
>> >
>> > 3. The FLIP proposal seems to leave error handling to the user, e.g.
>> when
>> > you create a FlinkStateSnapshot, it will just move to status FAILED.
>> > Typically in K8s with the control loop stuff, resources are tried to get
>> > created until success. I think it would be really nice if the
>> > FlinkStateSnapshot or FlinkSavepoint resource would retry based on a
>> > property in the resource. A "FlinkStateSnapshot.retries" number would
>> > indicate how often the user wants the operator to retry creating a
>> > savepoint, "retries = -1" means retry forever. In addition, we could
>> > consider a timeout as well, however, I haven't seen such a concept in
>> K8s
>> > CRs yet.
>> > The benefit of this is that other tools relying on the K8s operator
>> > wouldn't have to implement this retry loop (which is quite natural for
>> > K8s), they would just have to wait for the CR they've created to
>> transition
>> > into COMPLETED:
>> >
>> > 3. FlinkStateSnapshotStatus.error will only show the last error. What
>> > about using Events, so that we can show multiple errors and use the
>> > FlinkStateSnapshotState to report errors?
>> >
>> > 4. I wonder if it makes sense to use something like Pod Conditions (e.g.
>> > Savepoint Conditions):
>> >
>> https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions
>> > to track the completion status. We could have the following conditions:
>> > - Triggered
>> > - Completed
>> > - Failed
>> > The only benefit of this proposal that I see is that it would tell the
>> > user how long it took to create the savepoint.
>> >
>> > 5. You mention that "JobSpec.initialSavepointPath" will be deprecated. I
>> > assume we will introduce a new field for referencing a
>> FlinkStateSnapshot
>> > CR? I think it would be good to cover this in the FLIP.
>> >
>> Does that mean one would have to create a FlinkStateSnapshot CR when
>> starting a new deployment from savepoint? If so, that's rather
>> complicated.
>> I would prefer something more simple/concise and would rather
>> keep initialSavepointPath
>>
>>
>> >
>> > One minor comment:
>> >
>> > "/** Dispose the savepoints upon CRD deletion. */"
>> >
>> > I think this should be "upon CR deletion", not "CRD deletion".
>> >
>> > Thanks again for this great FLIP!
>> >
>> > Best,
>> > Robert
>> >
>> >
>> > On Fri, Apr 19, 2024 at 9:01 AM Gyula Fóra <gyula.f...@gmail.com>
>> wrote:
>> >
>> >> Cc'ing some folks who gave positive feedback on this idea in the past.
>> >>
>> >> I would love to hear your thoughts on the proposed design
>> >>
>> >> Gyula
>> >>
>> >> On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás <matyas.orh...@gmail.com
>> >
>> >> wrote:
>> >>
>> >>> +1 Looking forward to it
>> >>>
>> >>> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany <czmat...@gmail.com>
>> wrote:
>> >>>
>> >>> > Thank you Gyula!
>> >>> >
>> >>> > I think that is a great idea. I have updated the Google doc to only
>> >>> have 1
>> >>> > new configuration option of boolean type, which can be used to
>> signal
>> >>> the
>> >>> > Operator to use the old mode.
>> >>> >
>> >>> > Also described in the configuration description, the Operator will
>> >>> fallback
>> >>> > to the old mode if the FlinkStateSnapshot CRD cannot be found on the
>> >>> > Kubernetes cluster.
>> >>> >
>> >>> > Regards,
>> >>> > Mate
>> >>> >
>> >>> > On Tue, Apr 16, 2024 at 17:01, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>> >
>> >>> > > Thanks Mate, this is great stuff.
>> >>> > >
>> >>> > > Mate, I think the new configs should probably default to the new
>> >>> mode and
>> >>> > > they should only be useful for users to fall back to the old
>> >>> behaviour.
>> >>> > > We could by default use the new Snapshot CRD if the CRD is
>> installed,
>> >>> > > otherwise use the old mode by default and log a warning on
>> startup.
>> >>> > >
>> >>> > > So I am suggesting a "dynamic" default behaviour based on whether
>> >>> the new
>> >>> > > CRD was installed or not because we don't want to break operator
>> >>> startup.
>> >>> > >
>> >>> > > Gyula
>> >>> > >
>> >>> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany <czmat...@gmail.com>
>> >>> wrote:
>> >>> > >
>> >>> > > > Hi Ferenc,
>> >>> > > >
>> >>> > > > Thank you for your comments, I have updated the Google docs
>> with a
>> >>> new
>> >>> > > > section for the new configs.
>> >>> > > > All of the newly added config keys will have defaults set, and
>> by
>> >>> > default
>> >>> > > > all the savepoint/checkpoint operations will use the old system:
>> >>> write
>> >>> > > > their results to the FlinkDeployment/FlinkSessionJob status
>> field.
>> >>> > > >
>> >>> > > > I have also added a default for the checkpoint type to be FULL
>> >>> (which
>> >>> > is
>> >>> > > > also the default currently). That was an oversight on my part to
>> >>> miss
>> >>> > > that.
>> >>> > > >
>> >>> > > > Regards,
>> >>> > > > Mate
>> >>> > > >
>> >>> > > > On Tue, Apr 16, 2024 at 16:10, Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
>> >>> > > >
>> >>> > > > > Thank you Mate for initiating this discussion. +1 for this
>> idea.
>> >>> > > > > Some Qs:
>> >>> > > > >
>> >>> > > > > Can you specify the newly introduced configurations in more
>> >>> > > > > details? Currently, it is not fully clear to me what are the
>> >>> > > > > possible values of
>> `kubernetes.operator.periodic.savepoint.mode`,
>> >>> > > > > is it optional, has a default value?
>> >>> > > > >
>> >>> > > > > I see that in `SavepointSpec.formatType` has a default,
>> although
>> >>> > > > > `CheckpointSpec.checkpointType` not. Are we inferring that
>> from
>> >>> > > > > the config? My point is, in general I think it would be good
>> to
>> >>> > > > > handle the two snapshot types in a similar way when it makes
>> >>> sense
>> >>> > > > > to minimize any kind of confusion.
>> >>> > > > >
>> >>> > > > > Best,
>> >>> > > > > Ferenc
>> >>> > > > >
>> >>> > > > >
>> >>> > > > >
>> >>> > > > > On Tuesday, April 16th, 2024 at 11:34, Mate Czagany <
>> >>> > > czmat...@gmail.com>
>> >>> > > > > wrote:
>> >>> > > > >
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > Hi Everyone,
>> >>> > > > > >
>> >>> > > > > > I would like to start a discussion on FLIP-446: Kubernetes
>> >>> Operator
>> >>> > > > State
>> >>> > > > > > Snapshot CRD.
>> >>> > > > > >
>> >>> > > > > > This FLIP adds a new custom resource for Operator users to
>> >>> create
>> >>> > and
>> >>> > > > > > manage their savepoints and checkpoints. I have also
>> developed
>> >>> an
>> >>> > > > initial
>> >>> > > > > > POC to prove that this approach is feasible, you can find
>> the
>> >>> link
>> >>> > > for
>> >>> > > > > that
>> >>> > > > > > in the FLIP.
>> >>> > > > > >
>> >>> > > > > > There is a Confluence page [1] and a Google Docs page [2]
>> as I
>> >>> do
>> >>> > not
>> >>> > > > > have
>> >>> > > > > > a Confluence account yet.
>> >>> > > > > >
>> >>> > > > > > [1]
>> >>> > > > > >
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD
>> >>> > > > > > [2]
>> >>> > > > > >
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> https://docs.google.com/document/d/1VdfLFaE4i6ESbCQ38CH7TKOiPQVvXeOxNV2FeSMnOTg
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > Regards,
>> >>> > > > > > Mate
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>>
>
