Hi David,

Thanks for creating this flip. I think this work it is very useful,
especially in autoscaling scenario.  I would like to share some questions
from my view.

1, Is it possible for this REST API to declare TM resources in the future?
I'm asking because we are building the autoscaling feature for Flink OLAP
Session Cluster in ByteDance. We need to rescale the cluster's resource on
TM level instead of Job level. It would be very helpful if we have a REST
API for out external Autoscaling service to use.

2, And for streaming jobs, I'm wondering if there is any situation we need
to rescale the TM resources of a flink cluster at first and then the
adaptive scheduler will rescale the per-vertex ResourceProfiles
accordingly.

best.
Xiangyu

Shammon FY <zjur...@gmail.com> 于2023年2月9日周四 11:31写道:

> Hi David
>
> Thanks for your answer.
>
> > Can you elaborate more about how you'd intend to use the endpoint? I
> think we can ultimately introduce a way of re-declaring "per-vertex
> defaults," but I'd like to understand the use case bit more first.
>
> For this issue, I mainly consider the consistency of user configuration and
> job runtime. For sql jobs, users usually set specific parallelism for
> source and sink, and set a global parallelism for other operators. These
> config items are stored in a config file. For some high-priority jobs,
> users may want to manage them manually.
> 1. When users need to scale the parallelism, they should update the config
> file and restart flink job, which may take a long time.
> 2. After providing the REST API, users can just send a request to the job
> via REST API quickly after updating the config file.
> The configuration in the running job and config file should be the same.
> What do you think of this?
>
> best.
> Shammon
>
>
>
> On Tue, Feb 7, 2023 at 4:51 PM David Morávek <david.mora...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > Let's try to answer the questions one by one.
> >
> > *@ConradJam*
> >
> > when the number of "slots" is insufficient, can we can stop users
> rescaling
> > > or throw something to tell user "less avaliable slots to upgrade,
> please
> > > checkout your alivalbe slots" ?
> > >
> >
> > The main property of AdaptiveScheduler is that it can adapt to "available
> > resources," which means you're still able to make progress even though
> you
> > didn't get all the slots you've asked for. Let's break down the pros and
> > cons of this property.
> >
> > - (plus) If you lose a TM for some reason, you can still recover even if
> it
> > doesn't come back. We still need to give it some time to eliminate
> > unnecessary rescaling, which can be controlled by setting
> > "resource-stabilization-timeout."
> > - (plus) The resources can arrive with a significant delay. For example,
> > you're unable to spawn enough TMs on time because you've run out of
> > resources in your k8s cluster, and you need to wait for the cluster auto
> > scaler to kick in and add new nodes to the cluster. In this scenario,
> > you'll be able to start making progress faster, at the cost of multiple
> > rescalings (once the remaining resources arrive).
> > - (plus) This plays well with the declarative manner of today's
> > infrastructure. For example, you tell k8s that you need 10 TMs, and
> you'll
> > eventually get them.
> > - (minus) In the case of large state jobs, the cost of multiple
> rescalings
> > might outweigh the above.
> >
> > We've already touched on the solution to this problem on the FLIP. Please
> > notice the parallelism knob being a range with a lower and upper bound.
> > Setting both the lower and upper bound to the same value could give the
> > behavior you're describing at the cost of giving up some properties that
> AS
> > gives you (you'd be falling back to the DefaultScheduler's behavior).
> >
> > when user upgrade job-vertx-parallelism . I want to have an interface to
> > > query the current update parallel execution status, so that the user or
> > > program can understand the current status
> > >
> >
> > This is a misunderstanding. We're not introducing the RESCALE endpoint.
> > This endpoint allows you to re-declare the resources needed to run the
> job.
> > Once you reach the desired resources (you get more resources than the
> lower
> > bound defines), your job will run.
> >
> > We can expose a similar endpoint to "resource requirements" to give you
> an
> > overview of the resources the vertices already have. You can already get
> > this from the REST API, so exposing this in yet another way should be
> > considered carefully.
> >
> > *@Matthias*
> >
> > I'm wondering whether it makes sense to add some kind of resource ID to
> the
> > > REST API.
> >
> >
> > That's a good question. I want to think about that and get back to the
> > question later. My main struggle when thinking about this is, "if this
> > would be an idempotent POST endpoint," would it be any different?
> >
> > How often do we allow resource requirements to be changed?
> >
> >
> > There shall be no rate limiting on the FLINK side. If this is something
> > your environment needs, you can achieve it on a different layer ("we
> can't
> > have FLINK to do everything").
> >
> > Versioning the JobGraph in the JobGraphStore rather than overwriting it
> > > might be an idea.
> > >
> >
> > This sounds interesting since it would be closer to the JobGraph being
> > immutable. The main problem I see here is that this would introduce a
> > BW-incompatible change so it might be a topic for follow-up FLIP.
> >
> > I'm just wondering whether we bundle two things together that are
> actually
> > > separate
> > >
> >
> > Yup, this is how we think about it as well. The main question is, "who
> > should be responsible for bookkeeping 1) the JobGraph and 2) the
> > JobResourceRequirements". The JobMaster would be the right place for
> both,
> > but it's currently not the case, and we're tightly coupling the
> dispatcher
> > with the JobMaster.
> >
> > Initially, we tried to introduce a separate HA component in JobMaster for
> > bookkeeping the JobResourceRequirements, but that proved to be a more
> > significant effort adding additional mess to the already messy HA
> > ecosystem. Another approach we've discussed was mutating the JobGraph and
> > setting JRR into the JobGraph structure itself.
> >
> > The middle ground for keeping this effort reasonably sized and not
> > violating "we want to keep JG immutable" too much is keeping the
> > JobResourceRequirements separate as an internal config option in
> JobGraph's
> > configuration.
> >
> > We ultimately need to rethink the tight coupling of Dispatcher and
> > JobMaster, but it needs to be a separate effort.
> >
> > ...also considering the amount of data that can be stored in a
> > > ConfigMap/ZooKeeper node if versioning the resource requirement change
> as
> > > proposed in my previous item is an option for us.
> > >
> >
> > AFAIK we're only storing pointers to the S3 objects in HA metadata, so we
> > should be okay with having larger structures for now.
> >
> > Updating the JobGraphStore means adding more requests to the HA backend
> > API.
> > >
> >
> > It's fine unless you intend to override the resource requirements a few
> > times per second.
> >
> > *@Shammon*
> >
> > How about adding some more information such as vertex type
> > >
> >
> > Since it was intended as a "debug" endpoint, it makes complete sense!
> >
> >  For sql jobs, we always use a unified parallelism for most vertices. Can
> > > we provide them with a more convenient setting method instead of each
> > one?
> >
> >
> > I completely feel with this. The main thoughts when designing the API
> were:
> > - We want to keep it clean and easy to understand.
> > - Global parallelism can be modeled using per-vertex parallelism but not
> > the other way around.
> > - The API will be used by external tooling (operator, auto scaler).
> >
> > Can you elaborate more about how you'd intend to use the endpoint? I
> think
> > we can ultimately introduce a way of re-declaring "per-vertex defaults,"
> > but I'd like to understand the use case bit more first.
> >
> > *@Weijie*
> >
> > What is the default value here (based on what configuration), or just
> > > infinite?
> > >
> >
> > Currently, for the lower bound, it's always one, and for the upper bound,
> > it's either parallelism (if defined) or the maxParallelism of the vertex
> in
> > JobGraph. This question might be another signal for making the defaults
> > explicit (see the answer to Shammon's question above).
> >
> >
> > Thanks, everyone, for your initial thoughts!
> >
> > Best,
> > D.
> >
> > On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com>
> > wrote:
> >
> > > Thanks David for driving this. This is a very valuable work, especially
> > for
> > > cloud native environment.
> > >
> > > >> How about adding some more information such as vertex type
> > > (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> > > resource-requirements`? For users, only vertex-id may be difficult to
> > > understand.
> > >
> > > +1 for this suggestion, including jobvertex's name in the response body
> > is
> > > more
> > > user-friendly.
> > >
> > >
> > > I saw this sentence in FLIP: "Setting the upper bound to -1 will reset
> > the
> > > value to the default setting."  What is the default value here (based
> on
> > > what configuration), or just infinite?
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > >
> > > Shammon FY <zjur...@gmail.com> 于2023年2月6日周一 18:06写道:
> > >
> > > > Hi David
> > > >
> > > > Thanks for initiating this discussion. I think declaring job resource
> > > > requirements by REST API is very valuable. I just left some comments
> as
> > > > followed
> > > >
> > > > 1) How about adding some more information such as vertex type
> > > > (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> > > > resource-requirements`? For users, only vertex-id may be difficult to
> > > > understand.
> > > >
> > > > 2) For sql jobs, we always use a unified parallelism for most
> vertices.
> > > Can
> > > > we provide them with a more convenient setting method instead of each
> > > one?
> > > >
> > > >
> > > > Best,
> > > > Shammon
> > > >
> > > >
> > > > On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <matthias.p...@aiven.io
> > > > .invalid>
> > > > wrote:
> > > >
> > > > > Thanks David for creating this FLIP. It sounds promising and useful
> > to
> > > > > have. Here are some thoughts from my side (some of them might be
> > > rather a
> > > > > follow-up and not necessarily part of this FLIP):
> > > > > - I'm wondering whether it makes sense to add some kind of resource
> > ID
> > > to
> > > > > the REST API. This would give Flink a tool to verify the PATCH
> > request
> > > of
> > > > > the external system in a compare-and-set kind of manner. AFAIU, the
> > > > process
> > > > > requires the external system to retrieve the resource requirements
> > > first
> > > > > (to retrieve the vertex IDs). A resource ID <ABC> would be sent
> along
> > > as
> > > > a
> > > > > unique identifier for the provided setup. It's essentially the
> > version
> > > ID
> > > > > of the currently deployed resource requirement configuration. Flink
> > > > doesn't
> > > > > know whether the external system would use the provided information
> > in
> > > > some
> > > > > way to derive a new set of resource requirements for this job. The
> > > > > subsequent PATCH request with updated resource requirements would
> > > include
> > > > > the previously retrieved resource ID <ABC>. The PATCH call would
> fail
> > > if
> > > > > there was a concurrent PATCH call in between indicating to the
> > external
> > > > > system that the resource requirements were concurrently updated.
> > > > > - How often do we allow resource requirements to be changed? That
> > > > question
> > > > > might make my previous comment on the resource ID obsolete because
> we
> > > > could
> > > > > just make any PATCH call fail if there was a resource requirement
> > > update
> > > > > within a certain time frame before the request. But such a time
> > period
> > > is
> > > > > something we might want to make configurable then, I guess.
> > > > > - Versioning the JobGraph in the JobGraphStore rather than
> > overwriting
> > > it
> > > > > might be an idea. This would enable us to provide resource
> > requirement
> > > > > changes in the UI or through the REST API. It is related to a
> problem
> > > > > around keeping track of the exception history within the
> > > > AdaptiveScheduler
> > > > > and also having to consider multiple versions of a JobGraph. But
> for
> > > that
> > > > > one, we use the ExecutionGraphInfoStore right now.
> > > > > - Updating the JobGraph in the JobGraphStore makes sense. I'm just
> > > > > wondering whether we bundle two things together that are actually
> > > > separate:
> > > > > The business logic and the execution configuration (the resource
> > > > > requirements). I'm aware that this is not a flaw of the current
> FLIP
> > > but
> > > > > rather something that was not necessary to address in the past
> > because
> > > > the
> > > > > JobGraph was kind of static. I don't remember whether that was
> > already
> > > > > discussed while working on the AdaptiveScheduler for FLIP-160 [1].
> > > Maybe,
> > > > > I'm missing some functionality here that requires us to have
> > everything
> > > > in
> > > > > one place. But it feels like updating the entire JobGraph which
> could
> > > be
> > > > > actually a "config change" is not reasonable. ...also considering
> the
> > > > > amount of data that can be stored in a ConfigMap/ZooKeeper node if
> > > > > versioning the resource requirement change as proposed in my
> previous
> > > > item
> > > > > is an option for us.
> > > > > - Updating the JobGraphStore means adding more requests to the HA
> > > backend
> > > > > API. There were some concerns shared in the discussion thread [2]
> for
> > > > > FLIP-270 [3] on pressuring the k8s API server in the past with too
> > many
> > > > > calls. Eventhough, it's more likely to be caused by checkpointing,
> I
> > > > still
> > > > > wanted to bring it up. We're working on a standardized performance
> > test
> > > > to
> > > > > prepare going forward with FLIP-270 [3] right now.
> > > > >
> > > > > Best,
> > > > > Matthias
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> > > > > [2]
> https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> > > > > [3]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > > > >
> > > > > On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi David:
> > > > > >
> > > > > > Thank you for drive this flip, which helps less flink shutdown
> time
> > > > > >
> > > > > > for this flip, I would like to make a few idea on share
> > > > > >
> > > > > >
> > > > > >    - when the number of "slots" is insufficient, can we can stop
> > > users
> > > > > >    rescaling or throw something to tell user "less avaliable
> slots
> > to
> > > > > > upgrade,
> > > > > >    please checkout your alivalbe slots" ? Or we could have a
> > request
> > > > > >    switch(true/false) to allow this behavior
> > > > > >
> > > > > >
> > > > > >    - when user upgrade job-vertx-parallelism . I want to have an
> > > > > interface
> > > > > >    to query the current update parallel execution status, so that
> > the
> > > > > user
> > > > > > or
> > > > > >    program can understand the current status
> > > > > >    - I want to have an interface to query the current update
> > > > parallelism
> > > > > >    execution status. This also helps similar to *[1] Flink K8S
> > > > Operator*
> > > > > >    management
> > > > > >
> > > > > >
> > > > > > {
> > > > > >   status: Failed
> > > > > >   reason: "less avaliable slots to upgrade, please checkout your
> > > > alivalbe
> > > > > > slots"
> > > > > > }
> > > > > >
> > > > > >
> > > > > >
> > > > > >    - *Pending*: this job now is join the upgrade queue,it will be
> > > > update
> > > > > >    later
> > > > > >    - *Rescaling*: job now is rescaling,wait it finish
> > > > > >    - *Finished*: finish do it
> > > > > >    - *Failed* : something have wrong,so this job is not alivable
> > > > upgrade
> > > > > >
> > > > > > I want to supplement my above content in flip, what do you think
> ?
> > > > > >
> > > > > >
> > > > > >    1.
> > > > > >
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> > > > > >
> > > > > >
> > > > > > David Morávek <d...@apache.org> 于2023年2月3日周五 16:42写道:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > This FLIP [1] introduces a new REST API for declaring resource
> > > > > > requirements
> > > > > > > for the Adaptive Scheduler. There seems to be a clear need for
> > this
> > > > API
> > > > > > > based on the discussion on the "Reworking the Rescale API" [2]
> > > > thread.
> > > > > > >
> > > > > > > Before we get started, this work is heavily based on the
> > prototype
> > > > [3]
> > > > > > > created by Till Rohrmann, and the FLIP is being published with
> > his
> > > > > > consent.
> > > > > > > Big shoutout to him!
> > > > > > >
> > > > > > > Last and not least, thanks to Chesnay and Roman for the initial
> > > > reviews
> > > > > > and
> > > > > > > discussions.
> > > > > > >
> > > > > > > The best start would be watching a short demo [4] that I've
> > > recorded,
> > > > > > which
> > > > > > > illustrates newly added capabilities (rescaling the running
> job,
> > > > > handing
> > > > > > > back resources to the RM, and session cluster support).
> > > > > > >
> > > > > > > The intuition behind the FLIP is being able to define resource
> > > > > > requirements
> > > > > > > ("resource boundaries") externally that the AdaptiveScheduler
> can
> > > > > > navigate
> > > > > > > within. This is a building block for higher-level efforts such
> as
> > > an
> > > > > > > external Autoscaler. The natural extension of this work would
> be
> > to
> > > > > allow
> > > > > > > to specify per-vertex ResourceProfiles.
> > > > > > >
> > > > > > > Looking forward to your thoughts; any feedback is appreciated!
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> > > > > > > [2]
> > > https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> > > > > > > [3] https://github.com/tillrohrmann/flink/tree/autoscaling
> > > > > > > [4]
> > > > > >
> > > https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
> > > > > > >
> > > > > > > Best,
> > > > > > > D.
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best
> > > > > >
> > > > > > ConradJam
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to