I agree that it is useful to have a configurable lower bound. Thanks for looking into it as part of a follow-up!
No objections from my side to move forward with the vote. -Max On Tue, Feb 28, 2023 at 1:36 PM David Morávek <d...@apache.org> wrote: > > > I suppose we could further remove the min because it would always be > safer to scale down if resources are not available than not to run at > all [1]. > > Apart from what @Roman has already mentioned, there are still cases where > we're certain that there is no point in running the jobs with resources > lower than X; e.g., because the state is too large to be processed with > parallelism of 1; this allows you not to waste resources if you're certain > that the job would go into the restart loop / won't be able to checkpoint > > I believe that for most use cases, simply keeping the lower bound at 1 will > be sufficient. > > > I saw that the minimum bound is currently not used in the code you posted > above [2]. Is that still planned? > > Yes. We already allow setting the lower bound via API, but it's not > considered by the scheduler. I'll address this limitation in a separate > issue. > > > Note that originally we had assumed min == max but I think that would be > a less safe scaling approach because we would get stuck waiting for > resources when they are not available, e.g. k8s resource limits reached. > > 100% agreed; The above-mentioned knobs should allow you to balance the > trade-off. > > > Does that make sense? > > Best, > D. > > > > On Tue, Feb 28, 2023 at 1:14 PM Roman Khachatryan <ro...@apache.org> wrote: > > > Hi, > > > > Thanks for the update, I think distinguishing the rescaling behaviour and > > the desired parallelism declaration is important. > > > > Having the ability to specify min parallelism might be useful in > > environments with multiple jobs: Scheduler will then have an option to stop > > the less suitable job. > > In other setups, where the job should not be stopped at all, the user can > > always set it to 0. 
> > > > Regards, > > Roman > > > > > > On Tue, Feb 28, 2023 at 12:58 PM Maximilian Michels <m...@apache.org> > > wrote: > > > >> Hi David, > >> > >> Thanks for the update! We consider using the new declarative resource > >> API for autoscaling. Currently, we treat a scaling decision as a new > >> deployment which means surrendering all resources to Kubernetes and > >> subsequently reallocating them for the rescaled deployment. The > >> declarative resource management API is a great step forward because it > >> allows us to do faster and safer rescaling. Faster, because we can > >> continue to run while resources are pre-allocated which minimizes > >> downtime. Safer, because we can't get stuck when the desired resources > >> are not available. > >> > >> An example with two vertices and their respective parallelisms: > >> v1: 50 > >> v2: 10 > >> Let's assume slot sharing is disabled, so we need 60 task slots to run > >> the vertices. > >> > >> If the autoscaler was to decide to scale up v1 and v2, it could do so > >> in a safe way by using min/max configuration: > >> v1: [min: 50, max: 70] > >> v2: [min: 10, max: 20] > >> This would then need 90 task slots to run at max capacity. > >> > >> I suppose we could further remove the min because it would always be > >> safer to scale down if resources are not available than to not run at > >> all [1]. In fact, I saw that the minimum bound is currently not used > >> in the code you posted above [2]. Is that still planned? > >> > >> -Max > >> > >> PS: Note that originally we had assumed min == max but I think that > >> would be a less safe scaling approach because we would get stuck > >> waiting for resources when they are not available, e.g. k8s resource > >> limits reached. > >> > >> [1] However, there might be costs involved with executing the > >> rescaling, e.g. for using external storage like s3, especially without > >> local recovery. 
> >> [2] > >> https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9 > >> > >> On Tue, Feb 28, 2023 at 9:33 AM David Morávek <d...@apache.org> wrote: > >> > > >> > Hi Everyone, > >> > > >> > We had some more talks about the pre-allocation of resources with @Max, > >> and > >> > here is the final state that we've converged to for now: > >> > > >> > The vital thing to note about the new API is that it's declarative, > >> meaning > >> > we're declaring the desired state to which we want our job to converge; > >> If, > >> > after the requirements update job no longer holds the desired resources > >> > (fewer resources than the lower bound), it will be canceled and > >> transition > >> > back into the waiting for resources state. > >> > > >> > In some use cases, you might always want to rescale to the upper bound > >> > (this goes along the lines of "preallocating resources" and minimizing > >> the > >> > number of rescales, which is especially useful with the large state). > >> This > >> > can be controlled by two knobs that already exist: > >> > > >> > 1) "jobmanager.adaptive-scheduler.min-parallelism-increase" - this > >> affects > >> > a minimal parallelism increase step of a running job; we'll slightly > >> change > >> > the semantics, and we'll trigger rescaling either once this condition is > >> > met or when you hit the ceiling; setting this to the high number will > >> > ensure that you always rescale to the upper bound > >> > > >> > 2) "jobmanager.adaptive-scheduler.resource-stabilization-timeout" - for > >> new > >> > and already restarting jobs, we'll always respect this timeout, which > >> > allows you to wait for more resources even though you already have more > >> > resources than defined in the lower bound; again, in the case we reach > >> the > >> > ceiling (the upper bound), we'll transition into the executing state. 
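The interplay of the two knobs described above can be illustrated with a small decision function. This is only a sketch of the semantics as described in the thread (the function and parameter names are made up for illustration; this is not Flink's actual scheduler code):

```python
def should_rescale(current: int, achievable: int, upper_bound: int,
                   min_parallelism_increase: int) -> bool:
    """Sketch of the rescaling trigger for a running job: rescale once
    the achievable gain meets the configured minimum increase, or once
    the ceiling (upper bound) of the parallelism range is reachable."""
    if achievable >= upper_bound:
        return True  # hitting the ceiling always triggers a rescale
    return achievable - current >= min_parallelism_increase

# A very high minimum increase means the job effectively only ever
# rescales straight to the upper bound ("pre-allocation" behaviour).
assert should_rescale(50, 70, 70, 10**6)
assert not should_rescale(50, 60, 70, 10**6)
```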
> >> > > > >> > We're still planning to dig deeper in this direction with other efforts, > >> > but this is already good enough and should allow us to move the FLIP > >> > forward. > >> > > >> > WDYT? Unless there are any objections to the above, I'd like to > >> > proceed to a vote. > >> > > >> > Best, > >> > D. > >> > > >> > On Thu, Feb 23, 2023 at 5:39 PM David Morávek <d...@apache.org> wrote: > >> > > >> > > Hi Everyone, > >> > > > >> > > @John > >> > > > >> > > This is a problem that we've spent some time trying to crack; in the > >> end, > >> > > we've decided to go against doing any upgrades to JobGraphStore from > >> > > JobMaster to avoid having multiple writers that are guarded by > >> different > >> > > leader election lock (Dispatcher and JobMaster might live in a > >> different > >> > > process). The contract we've decided to choose instead is leveraging > >> the > >> > > idempotency of the endpoint and having the user of the API retry in > >> case > >> > > we're unable to persist new requirements in the JobGraphStore [1]. We > >> > > eventually need to move JobGraphStore out of the dispatcher, but > >> that's way > >> > > out of the scope of this FLIP. The solution is a deliberate > >> trade-off. The > >> > > worst scenario is that the Dispatcher fails over in between retries, > >> which > >> > > would simply rescale the job to meet the previous resource > >> requirements > >> > > (more extended unavailability of underlying HA storage would have > >> worse > >> > > consequences than this). Does that answer your question? > >> > > > >> > > @Matthias > >> > > > >> > > Good catch! I'm fixing it now, thanks! > >> > > > >> > > [1] > >> > > > >> https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cR1151 > >> > > > >> > > Best, > >> > > D. 
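The retry contract described above (an idempotent endpoint with client-side retries) might look roughly like this from an API user's perspective; `put_fn` stands in for a hypothetical REST client call and is not an actual Flink API:

```python
import time

def declare_requirements(put_fn, job_id, requirements, attempts=5, backoff_s=0.0):
    """Retry an idempotent PUT until the new requirements are persisted.

    `put_fn` stands in for the REST call; it returns True once the
    dispatcher managed to persist the requirements. Repeating the PUT
    is safe because it carries the full desired state; the worst case
    (a dispatcher failover between retries) simply leaves the previous
    requirements in effect.
    """
    for attempt in range(attempts):
        if put_fn(job_id, requirements):
            return True
        time.sleep(backoff_s * 2 ** attempt)  # simple exponential backoff
    return False

# Simulate an endpoint that fails to persist twice before succeeding.
calls = []
def flaky_put(job_id, requirements):
    calls.append(job_id)
    return len(calls) > 2

assert declare_requirements(flaky_put, "job-1", {"v1": {"min": 1, "max": 70}})
assert len(calls) == 3
```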
> >> > > > >> > > On Tue, Feb 21, 2023 at 12:24 AM John Roesler <vvcep...@apache.org> > >> wrote: > >> > > > >> > >> Thanks for the FLIP, David! > >> > >> > >> > >> I just had one small question. IIUC, the REST API PUT request will go > >> > >> through the new DispatcherGateway method to be handled. Then, after > >> > >> validation, the dispatcher would call the new JobMasterGateway > >> method to > >> > >> actually update the job. > >> > >> > >> > >> Which component will write the updated JobGraph? I just wanted to > >> make > >> > >> sure it’s the JobMaster because if it were the dispatcher, there > >> could be a > >> > >> race condition with the async JobMaster method. > >> > >> > >> > >> Thanks! > >> > >> -John > >> > >> > >> > >> On Mon, Feb 20, 2023, at 07:34, Matthias Pohl wrote: > >> > >> > Thanks for your clarifications, David. I don't have any additional > >> major > >> > >> > points to add. One thing about the FLIP: The RPC layer API for > >> updating > >> > >> the > >> > >> > JRR returns a future with a JRR? I don't see value in returning a > >> JRR > >> > >> here > >> > >> > since it's an idempotent operation? Wouldn't it be enough to return > >> > >> > CompletableFuture<Void> here? Or am I missing something? > >> > >> > > >> > >> > Matthias > >> > >> > > >> > >> > On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels <m...@apache.org > >> > > >> > >> wrote: > >> > >> > > >> > >> >> Thanks David! If we could get the pre-allocation working as part > >> of > >> > >> >> the FLIP, that would be great. > >> > >> >> > >> > >> >> Concerning the downscale case, I agree this is a special case for > >> the > >> > >> >> (single-job) application mode where we could re-allocate slots in > >> a > >> > >> >> way that could leave entire task managers unoccupied which we > >> would > >> > >> >> then be able to release. The goal essentially is to reduce slot > >> > >> >> fragmentation on scale down by packing the slots efficiently. 
The > >> > >> >> easiest way to add this optimization when running in application > >> mode > >> > >> >> would be to drop as many task managers during the restart such > >> that > >> > >> >> NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look > >> into > >> > >> >> this independently of the FLIP. > >> > >> >> > >> > >> >> Feel free to start the vote. > >> > >> >> > >> > >> >> -Max > >> > >> >> > >> > >> >> On Mon, Feb 20, 2023 at 9:10 AM David Morávek <d...@apache.org> > >> wrote: > >> > >> >> > > >> > >> >> > Hi everyone, > >> > >> >> > > >> > >> >> > Thanks for the feedback! I've updated the FLIP to use > >> idempotent PUT > >> > >> API > >> > >> >> instead of PATCH and to properly handle lower bound settings, to > >> > >> support > >> > >> >> the "pre-allocation" of the resources. > >> > >> >> > > >> > >> >> > @Max > >> > >> >> > > >> > >> >> > > How hard would it be to address this issue in the FLIP? > >> > >> >> > > >> > >> >> > I've included this in the FLIP. It might not be too hard to > >> implement > >> > >> >> this in the end. > >> > >> >> > > >> > >> >> > > B) drop as many superfluous task managers as needed > >> > >> >> > > >> > >> >> > I've intentionally left this part out for now because this > >> ultimately > >> > >> >> needs to be the responsibility of the Resource Manager. After > >> all, in > >> > >> the > >> > >> >> Session Cluster scenario, the Scheduler doesn't have the bigger > >> > >> picture of > >> > >> >> other tasks of other jobs running on those TMs. This will most > >> likely > >> > >> be a > >> > >> >> topic for another FLIP. > >> > >> >> > > >> > >> >> > WDYT? If there are no other questions or concerns, I'd like to > >> start > >> > >> the > >> > >> >> vote on Wednesday. > >> > >> >> > > >> > >> >> > Best, > >> > >> >> > D. 
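The scale-down idea discussed above, releasing entirely unoccupied task managers after packing slots tightly, can be sketched as follows. This assumes the intent is to keep just enough task managers to still cover the required slots; the function name and the slots-per-TM parameter are illustrative, not Flink code:

```python
import math

def task_managers_to_keep(required_slots: int, slots_per_tm: int) -> int:
    """Sketch: after packing slots tightly on scale-down, keep only the
    minimum number of task managers whose slots still cover the job's
    requirements, and release the rest."""
    return math.ceil(required_slots / slots_per_tm)

# 60 required slots at 8 slots per TM: keep 8 TMs (64 slots), release the rest.
assert task_managers_to_keep(60, 8) == 8
assert task_managers_to_keep(64, 8) == 8
```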
> >> > >> >> > > >> > >> >> > On Wed, Feb 15, 2023 at 3:34 PM Maximilian Michels < > >> m...@apache.org> > >> > >> >> wrote: > >> > >> >> >> > >> > >> >> >> I missed that the FLIP states: > >> > >> >> >> > >> > >> >> >> > Currently, even though we’d expose the lower bound for > >> clarity and > >> > >> >> API completeness, we won’t allow setting it to any other value > >> than one > >> > >> >> until we have full support throughout the stack. > >> > >> >> >> > >> > >> >> >> How hard would it be to address this issue in the FLIP? > >> > >> >> >> > >> > >> >> >> There is not much value to offer setting a lower bound which > >> won't > >> > >> be > >> > >> >> >> respected / throw an error when it is set. If we had support > >> for a > >> > >> >> >> lower bound, we could enforce a resource contract externally > >> via > >> > >> >> >> setting lowerBound == upperBound. That ties back to the > >> Rescale API > >> > >> >> >> discussion we had. I want to better understand what the major > >> > >> concerns > >> > >> >> >> would be around allowing this. > >> > >> >> >> > >> > >> >> >> Just to outline how I imagine the logic to work: > >> > >> >> >> > >> > >> >> >> A) The resource constraints are already met => Nothing changes > >> > >> >> >> B) More resources available than required => Cancel the job, > >> drop as > >> > >> >> >> many superfluous task managers as needed, restart the job > >> > >> >> >> C) Less resources available than required => Acquire new task > >> > >> >> >> managers, wait for them to register, cancel and restart the job > >> > >> >> >> > >> > >> >> >> I'm open to helping out with the implementation. 
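A rough model of the three cases outlined above, purely for illustration (the `Action` names are invented and do not correspond to Flink classes):

```python
from enum import Enum

class Action(Enum):
    NOTHING = "nothing changes"
    SHRINK = "cancel, drop superfluous task managers, restart"
    GROW = "acquire task managers, wait for registration, cancel and restart"

def plan_rescale(available_slots: int, required_slots: int) -> Action:
    """Sketch of the three cases for enforcing a fixed slot requirement
    (i.e. lowerBound == upperBound)."""
    if available_slots == required_slots:
        return Action.NOTHING  # A) constraints already met
    if available_slots > required_slots:
        return Action.SHRINK   # B) more resources than required
    return Action.GROW         # C) fewer resources than required

assert plan_rescale(60, 60) is Action.NOTHING
assert plan_rescale(90, 60) is Action.SHRINK
assert plan_rescale(40, 60) is Action.GROW
```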
> >> > >> >> >> > >> > >> >> >> -Max > >> > >> >> >> > >> > >> >> >> On Mon, Feb 13, 2023 at 7:45 PM Maximilian Michels < > >> m...@apache.org> > >> > >> >> wrote: > >> > >> >> >> > > >> > >> >> >> > Based on further discussion I had with Chesnay on this PR > >> [1], I > >> > >> think > >> > >> >> >> > jobs would currently go into a restarting state after the > >> resource > >> > >> >> >> > requirements have changed. This wouldn't achieve what we had > >> in > >> > >> mind, > >> > >> >> >> > i.e. sticking to the old resource requirements until enough > >> slots > >> > >> are > >> > >> >> >> > available to fulfil the new resource requirements. So this > >> may > >> > >> not be > >> > >> >> >> > 100% what we need but it could be extended to do what we > >> want. > >> > >> >> >> > > >> > >> >> >> > -Max > >> > >> >> >> > > >> > >> >> >> > [1] > >> > >> https://github.com/apache/flink/pull/21908#discussion_r1104792362 > >> > >> >> >> > > >> > >> >> >> > On Mon, Feb 13, 2023 at 7:16 PM Maximilian Michels < > >> > >> m...@apache.org> > >> > >> >> wrote: > >> > >> >> >> > > > >> > >> >> >> > > Hi David, > >> > >> >> >> > > > >> > >> >> >> > > This is awesome! Great writeup and demo. This is pretty > >> much > >> > >> what we > >> > >> >> >> > > need for the autoscaler as part of the Flink Kubernetes > >> operator > >> > >> >> [1]. > >> > >> >> >> > > Scaling Flink jobs effectively is hard but fortunately we > >> have > >> > >> >> solved > >> > >> >> >> > > the issue as part of the Flink Kubernetes operator. The > >> only > >> > >> >> critical > >> > >> >> >> > > piece we are missing is a better way to execute scaling > >> > >> decisions, > >> > >> >> as > >> > >> >> >> > > discussed in [2]. > >> > >> >> >> > > > >> > >> >> >> > > Looking at your proposal, we would set lowerBound == > >> upperBound > >> > >> for > >> > >> >> >> > > the parallelism because we want to fully determine the > >> > >> parallelism > >> > >> >> >> > > externally based on the scaling metrics. 
Does that sound > >> right? > >> >> >> > > > > >> > >> >> >> > > What is the timeline for these changes? Is there a JIRA? > >> > >> >> >> > > > >> > >> >> >> > > Cheers, > >> > >> >> >> > > Max > >> > >> >> >> > > > >> > >> >> >> > > [1] > >> > >> >> > >> > >> > >> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/ > >> > >> >> >> > > [2] > >> > >> >> https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5 > >> > >> >> >> > > > >> > >> >> >> > > On Mon, Feb 13, 2023 at 1:16 PM feng xiangyu < > >> > >> xiangyu...@gmail.com> > >> > >> >> wrote: > >> > >> >> >> > > > > >> > >> >> >> > > > Hi David, > >> > >> >> >> > > > > >> > >> >> >> > > > Thanks for your reply. I think your response totally > >> makes > >> > >> >> sense. This > >> > >> >> >> > > > flip targets declaring required resources to the > >> ResourceManager > >> > >> >> instead of > >> > >> >> >> > > > using ResourceManager to add/remove TMs directly. > >> > >> >> >> > > > > >> > >> >> >> > > > Best, > >> > >> >> >> > > > Xiangyu > >> > >> >> >> > > > > >> > >> >> >> > > > > >> > >> >> >> > > > > >> > >> >> >> > > > David Morávek <david.mora...@gmail.com> wrote on Mon, Feb 13, 2023 at > >> > >> 15:46: > >> > >> >> >> > > > > >> > >> >> >> > > > > Hi everyone, > >> > >> >> >> > > > > > >> > >> >> >> > > > > @Shammon > >> > >> >> >> > > > > > >> > >> >> >> > > > > I'm not entirely sure what "config file" you're > >> referring > >> > >> to. > >> > >> >> You can, of > >> > >> >> >> > > > > course, override the default parallelism in > >> > >> "flink-conf.yaml", > >> > >> >> but for > >> > >> >> >> > > > > sinks and sources, the parallelism needs to be tweaked > >> on > >> > >> the > >> > >> >> connector > >> > >> >> >> > > > > level ("WITH" statement). > >> > >> >> >> > > > > > >> > >> >> >> > > > > This is something that should be achieved with tooling > >> > >> around > >> > >> >> Flink. 
We > >> > >> >> >> > > > > want to provide an API on the lowest level that > >> generalizes > >> > >> >> well. Achieving > >> > >> >> >> > > > > what you're describing should be straightforward with > >> this > >> > >> API. > >> > >> >> >> > > > > > >> > >> >> >> > > > > @Xiangyu > >> > >> >> >> > > > > > >> > >> >> >> > > > > Is it possible for this REST API to declare TM > >> resources in > >> > >> the > >> > >> >> future? > >> > >> >> >> > > > > > >> > >> >> >> > > > > > >> > >> >> >> > > > > Would you like to add/remove TMs if you use an active > >> > >> Resource > >> > >> >> Manager? > >> > >> >> >> > > > > This would be out of the scope of this effort since it > >> > >> targets > >> > >> >> the > >> > >> >> >> > > > > scheduler component only (we make no assumptions about > >> the > >> > >> used > >> > >> >> Resource > >> > >> >> >> > > > > Manager). Also, the AdaptiveScheduler is only intended > >> to be > >> > >> >> used for > >> > >> >> >> > > > > Streaming. > >> > >> >> >> > > > > > >> > >> >> >> > > > > And for streaming jobs, I'm wondering if there is any > >> > >> >> situation we need to > >> > >> >> >> > > > > > rescale the TM resources of a flink cluster at first > >> and > >> > >> then > >> > >> >> the > >> > >> >> >> > > > > adaptive > >> > >> >> >> > > > > > scheduler will rescale the per-vertex > >> ResourceProfiles > >> > >> >> accordingly. > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > >> > >> >> >> > > > > We plan on adding support for the ResourceProfiles > >> (dynamic > >> > >> slot > >> > >> >> >> > > > > allocation) as the next step. Again we won't make any > >> > >> >> assumptions about the > >> > >> >> >> > > > > used Resource Manager. In other words, this effort > >> ends by > >> > >> >> declaring > >> > >> >> >> > > > > desired resources to the Resource Manager. > >> > >> >> >> > > > > > >> > >> >> >> > > > > Does that make sense? 
> >> > >> >> >> > > > > > >> > >> >> >> > > > > @Matthias > >> > >> >> >> > > > > > >> > >> >> >> > > > > We've done another pass on the proposed API and > >> currently > >> > >> lean > >> > >> >> towards > >> > >> >> >> > > > > having an idempotent PUT API. > >> > >> >> >> > > > > - We don't care too much about multiple writers' > >> scenarios > >> > >> in > >> > >> >> terms of who > >> > >> >> >> > > > > can write an authoritative payload; this is up to the > >> user > >> > >> of > >> > >> >> the API to > >> > >> >> >> > > > > figure out > >> > >> >> >> > > > > - It's indeed tricky to achieve atomicity with PATCH > >> API; > >> > >> >> switching to PUT > >> > >> >> >> > > > > API seems to do the trick > >> > >> >> >> > > > > - We won't allow partial "payloads" anymore, meaning > >> you > >> > >> need > >> > >> >> to define > >> > >> >> >> > > > > requirements for all vertices in the JobGraph; This is > >> > >> >> completely fine for > >> > >> >> >> > > > > the programmatic workflows. For DEBUG / DEMO purposes, > >> you > >> > >> can > >> > >> >> use the GET > >> > >> >> >> > > > > endpoint and tweak the response to avoid writing the > >> whole > >> > >> >> payload by hand. > >> > >> >> >> > > > > > >> > >> >> >> > > > > WDYT? > >> > >> >> >> > > > > > >> > >> >> >> > > > > > >> > >> >> >> > > > > Best, > >> > >> >> >> > > > > D. > >> > >> >> >> > > > > > >> > >> >> >> > > > > On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu < > >> > >> >> xiangyu...@gmail.com> > >> > >> >> >> > > > > wrote: > >> > >> >> >> > > > > > >> > >> >> >> > > > > > Hi David, > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > Thanks for creating this flip. I think this work it > >> is > >> > >> very > >> > >> >> useful, > >> > >> >> >> > > > > > especially in autoscaling scenario. I would like to > >> share > >> > >> >> some questions > >> > >> >> >> > > > > > from my view. 
> >> >> >> > > > > > > >> > >> >> >> > > > > > 1, Is it possible for this REST API to declare TM > >> > >> resources > >> > >> >> in the > >> > >> >> >> > > > > future? > >> > >> >> >> > > > > > I'm asking because we are building the autoscaling > >> feature > >> > >> >> for Flink OLAP > >> > >> >> >> > > > > > Session Cluster in ByteDance. We need to rescale the > >> > >> >> cluster's resource > >> > >> >> >> > > > > on > >> > >> >> >> > > > > > TM level instead of Job level. It would be very > >> helpful > >> > >> if we > >> > >> >> have a REST > >> > >> >> >> > > > > > API for our external Autoscaling service to use. > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > 2, And for streaming jobs, I'm wondering if there is > >> any > >> > >> >> situation we > >> > >> >> >> > > > > need > >> > >> >> >> > > > > > to rescale the TM resources of a flink cluster at > >> first > >> > >> and > >> > >> >> then the > >> > >> >> >> > > > > > adaptive scheduler will rescale the per-vertex > >> > >> >> ResourceProfiles > >> > >> >> >> > > > > > accordingly. > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > best. > >> > >> >> >> > > > > > Xiangyu > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > Shammon FY <zjur...@gmail.com> wrote on Thu, Feb 9, 2023 at 11:31: > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > Hi David > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > Thanks for your answer. > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > Can you elaborate more about how you'd intend to > >> use > >> > >> the > >> > >> >> endpoint? I > >> > >> >> >> > > > > > > think we can ultimately introduce a way of > >> re-declaring > >> > >> >> "per-vertex > >> > >> >> >> > > > > > > defaults," but I'd like to understand the use case a > >> bit > >> > >> more > >> > >> >> first. 
> >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > For this issue, I mainly consider the consistency > >> of > >> > >> user > >> > >> >> configuration > >> > >> >> >> > > > > > and > >> > >> >> >> > > > > > > job runtime. For sql jobs, users usually set > >> specific > >> > >> >> parallelism for > >> > >> >> >> > > > > > > source and sink, and set a global parallelism for > >> other > >> > >> >> operators. > >> > >> >> >> > > > > These > >> > >> >> >> > > > > > > config items are stored in a config file. For some > >> > >> >> high-priority jobs, > >> > >> >> >> > > > > > > users may want to manage them manually. > >> > >> >> >> > > > > > > 1. When users need to scale the parallelism, they > >> should > >> > >> >> update the > >> > >> >> >> > > > > > config > >> > >> >> >> > > > > > > file and restart flink job, which may take a long > >> time. > >> > >> >> >> > > > > > > 2. After providing the REST API, users can just > >> send a > >> > >> >> request to the > >> > >> >> >> > > > > job > >> > >> >> >> > > > > > > via REST API quickly after updating the config > >> file. > >> > >> >> >> > > > > > > The configuration in the running job and config > >> file > >> > >> should > >> > >> >> be the > >> > >> >> >> > > > > same. > >> > >> >> >> > > > > > > What do you think of this? > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > best. > >> > >> >> >> > > > > > > Shammon > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > On Tue, Feb 7, 2023 at 4:51 PM David Morávek < > >> > >> >> david.mora...@gmail.com> > >> > >> >> >> > > > > > > wrote: > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > Hi everyone, > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > Let's try to answer the questions one by one. 
> >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > *@ConradJam* > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > when the number of "slots" is insufficient, can > >> we can > >> > >> >> stop users > >> > >> >> >> > > > > > > rescaling > >> > >> >> >> > > > > > > > > or throw something to tell user "less avaliable > >> > >> slots > >> > >> >> to upgrade, > >> > >> >> >> > > > > > > please > >> > >> >> >> > > > > > > > > checkout your alivalbe slots" ? > >> > >> >> >> > > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > The main property of AdaptiveScheduler is that > >> it can > >> > >> >> adapt to > >> > >> >> >> > > > > > "available > >> > >> >> >> > > > > > > > resources," which means you're still able to make > >> > >> >> progress even > >> > >> >> >> > > > > though > >> > >> >> >> > > > > > > you > >> > >> >> >> > > > > > > > didn't get all the slots you've asked for. Let's > >> break > >> > >> >> down the pros > >> > >> >> >> > > > > > and > >> > >> >> >> > > > > > > > cons of this property. > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > - (plus) If you lose a TM for some reason, you > >> can > >> > >> still > >> > >> >> recover even > >> > >> >> >> > > > > > if > >> > >> >> >> > > > > > > it > >> > >> >> >> > > > > > > > doesn't come back. We still need to give it some > >> time > >> > >> to > >> > >> >> eliminate > >> > >> >> >> > > > > > > > unnecessary rescaling, which can be controlled by > >> > >> setting > >> > >> >> >> > > > > > > > "resource-stabilization-timeout." > >> > >> >> >> > > > > > > > - (plus) The resources can arrive with a > >> significant > >> > >> >> delay. 
For > >> > >> >> >> > > > > > example, > >> > >> >> >> > > > > > > > you're unable to spawn enough TMs on time because > >> > >> you've > >> > >> >> run out of > >> > >> >> >> > > > > > > > resources in your k8s cluster, and you need to > >> wait > >> > >> for > >> > >> >> the cluster > >> > >> >> >> > > > > > auto > >> > >> >> >> > > > > > > > scaler to kick in and add new nodes to the > >> cluster. In > >> > >> >> this scenario, > >> > >> >> >> > > > > > > > you'll be able to start making progress faster, > >> at the > >> > >> >> cost of > >> > >> >> >> > > > > multiple > >> > >> >> >> > > > > > > > rescalings (once the remaining resources arrive). > >> > >> >> >> > > > > > > > - (plus) This plays well with the declarative > >> manner > >> > >> of > >> > >> >> today's > >> > >> >> >> > > > > > > > infrastructure. For example, you tell k8s that > >> you > >> > >> need > >> > >> >> 10 TMs, and > >> > >> >> >> > > > > > > you'll > >> > >> >> >> > > > > > > > eventually get them. > >> > >> >> >> > > > > > > > - (minus) In the case of large state jobs, the > >> cost of > >> > >> >> multiple > >> > >> >> >> > > > > > > rescalings > >> > >> >> >> > > > > > > > might outweigh the above. > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > We've already touched on the solution to this > >> problem > >> > >> on > >> > >> >> the FLIP. > >> > >> >> >> > > > > > Please > >> > >> >> >> > > > > > > > notice the parallelism knob being a range with a > >> lower > >> > >> >> and upper > >> > >> >> >> > > > > bound. > >> > >> >> >> > > > > > > > Setting both the lower and upper bound to the > >> same > >> > >> value > >> > >> >> could give > >> > >> >> >> > > > > the > >> > >> >> >> > > > > > > > behavior you're describing at the cost of giving > >> up > >> > >> some > >> > >> >> properties > >> > >> >> >> > > > > > that > >> > >> >> >> > > > > > > AS > >> > >> >> >> > > > > > > > gives you (you'd be falling back to the > >> > >> >> DefaultScheduler's behavior). 
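How a declared parallelism range adapts to the slots that actually arrive, per the discussion above, could be sketched as follows (illustrative only, not Flink source):

```python
def achievable_parallelism(lower: int, upper: int, available_slots: int):
    """Sketch of how a parallelism range [lower, upper] adapts to the
    slots that actually arrive (per the FLIP discussion, not Flink
    source). Returns the parallelism the job can run with, or None
    while it must keep waiting for resources."""
    if available_slots < lower:
        return None  # below the lower bound: cannot run yet
    return min(available_slots, upper)  # adapt within the range

# Adaptive behaviour: make progress with whatever is above the lower bound.
assert achievable_parallelism(1, 70, 55) == 55
# lower == upper pins the parallelism: run with exactly that many slots
# or keep waiting (DefaultScheduler-like behaviour).
assert achievable_parallelism(70, 70, 55) is None
assert achievable_parallelism(70, 70, 80) == 70
```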
> >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > when user upgrade job-vertx-parallelism . I want > >> to > >> > >> have > >> > >> >> an interface > >> > >> >> >> > > > > > to > >> > >> >> >> > > > > > > > > query the current update parallel execution > >> status, > >> > >> so > >> > >> >> that the > >> > >> >> >> > > > > user > >> > >> >> >> > > > > > or > >> > >> >> >> > > > > > > > > program can understand the current status > >> > >> >> >> > > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > This is a misunderstanding. We're not > >> introducing the > >> > >> >> RESCALE > >> > >> >> >> > > > > endpoint. > >> > >> >> >> > > > > > > > This endpoint allows you to re-declare the > >> resources > >> > >> >> needed to run > >> > >> >> >> > > > > the > >> > >> >> >> > > > > > > job. > >> > >> >> >> > > > > > > > Once you reach the desired resources (you get > >> more > >> > >> >> resources than the > >> > >> >> >> > > > > > > lower > >> > >> >> >> > > > > > > > bound defines), your job will run. > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > We can expose a similar endpoint to "resource > >> > >> >> requirements" to give > >> > >> >> >> > > > > you > >> > >> >> >> > > > > > > an > >> > >> >> >> > > > > > > > overview of the resources the vertices already > >> have. > >> > >> You > >> > >> >> can already > >> > >> >> >> > > > > > get > >> > >> >> >> > > > > > > > this from the REST API, so exposing this in yet > >> > >> another > >> > >> >> way should be > >> > >> >> >> > > > > > > > considered carefully. > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > *@Matthias* > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > I'm wondering whether it makes sense to add some > >> kind > >> > >> of > >> > >> >> resource ID > >> > >> >> >> > > > > to > >> > >> >> >> > > > > > > the > >> > >> >> >> > > > > > > > > REST API. 
> >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > That's a good question. I want to think about > >> that and > >> > >> >> get back to > >> > >> >> >> > > > > the > >> > >> >> >> > > > > > > > question later. My main struggle when thinking > >> about > >> > >> this > >> > >> >> is, "if > >> > >> >> >> > > > > this > >> > >> >> >> > > > > > > > would be an idempotent POST endpoint," would it > >> be any > >> > >> >> different? > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > How often do we allow resource requirements to be > >> > >> changed? > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > There shall be no rate limiting on the FLINK > >> side. If > >> > >> >> this is > >> > >> >> >> > > > > something > >> > >> >> >> > > > > > > > your environment needs, you can achieve it on a > >> > >> different > >> > >> >> layer ("we > >> > >> >> >> > > > > > > can't > >> > >> >> >> > > > > > > > have FLINK to do everything"). > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > Versioning the JobGraph in the JobGraphStore > >> rather > >> > >> than > >> > >> >> overwriting > >> > >> >> >> > > > > it > >> > >> >> >> > > > > > > > > might be an idea. > >> > >> >> >> > > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > This sounds interesting since it would be closer > >> to > >> > >> the > >> > >> >> JobGraph > >> > >> >> >> > > > > being > >> > >> >> >> > > > > > > > immutable. The main problem I see here is that > >> this > >> > >> would > >> > >> >> introduce a > >> > >> >> >> > > > > > > > BW-incompatible change so it might be a topic for > >> > >> >> follow-up FLIP. 
> >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > I'm just wondering whether we bundle two things > >> > >> together > >> > >> >> that are > >> > >> >> >> > > > > > > actually > >> > >> >> >> > > > > > > > > separate > >> > >> >> >> > > > > > > > > > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > Yup, this is how we think about it as well. The > >> main > >> > >> >> question is, > >> > >> >> >> > > > > "who > >> > >> >> >> > > > > > > > should be responsible for bookkeeping 1) the > >> JobGraph > >> > >> and > >> > >> >> 2) the > >> > >> >> >> > > > > > > > JobResourceRequirements". The JobMaster would be > >> the > >> > >> >> right place for > >> > >> >> >> > > > > > > both, > >> > >> >> >> > > > > > > > but it's currently not the case, and we're > >> tightly > >> > >> >> coupling the > >> > >> >> >> > > > > > > dispatcher > >> > >> >> >> > > > > > > > with the JobMaster. > >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > Initially, we tried to introduce a separate HA > >> > >> component > >> > >> >> in JobMaster > >> > >> >> >> > > > > > for > >> > >> >> >> > > > > > > > bookkeeping the JobResourceRequirements, but that > >> > >> proved > >> > >> >> to be a more > >> > >> >> >> > > > > > > > significant effort adding additional mess to the > >> > >> already > >> > >> >> messy HA > >> > >> >> >> > > > > > > > ecosystem. Another approach we've discussed was > >> > >> mutating > >> > >> >> the JobGraph > >> > >> >> >> > > > > > and > >> > >> >> >> > > > > > > > setting JRR into the JobGraph structure itself. 
The middle ground for keeping this effort reasonably sized, and not violating "we want to keep the JobGraph immutable" too much, is keeping the JobResourceRequirements separate as an internal config option in the JobGraph's configuration.

We ultimately need to rethink the tight coupling of the Dispatcher and the JobMaster, but it needs to be a separate effort.

> ...also considering the amount of data that can be stored in a ConfigMap/ZooKeeper node if versioning the resource requirement change as proposed in my previous item is an option for us.

AFAIK we're only storing pointers to the S3 objects in the HA metadata, so we should be okay with having larger structures for now.

> Updating the JobGraphStore means adding more requests to the HA backend API.

It's fine unless you intend to override the resource requirements a few times per second.
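Keeping the requirements "as an internal config option in the JobGraph's configuration" can be pictured as a simple round-trip of a JSON blob through the configuration map. This is only a sketch of the idea; the key name and payload shape below are invented for illustration and are not Flink's actual serialization:

```python
import json

# Made-up key name: stands in for whatever internal option the FLIP uses.
REQUIREMENTS_KEY = "$internal.job-resource-requirements"

def write_requirements(job_configuration, requirements):
    # Store the requirements as an opaque JSON string next to the JobGraph,
    # leaving the JobGraph structure itself untouched ("immutable").
    job_configuration[REQUIREMENTS_KEY] = json.dumps(requirements)

def read_requirements(job_configuration):
    # Absent key means "no externally declared requirements yet".
    raw = job_configuration.get(REQUIREMENTS_KEY)
    return json.loads(raw) if raw is not None else None

config = {}
write_requirements(config, {"vertex-1": {"lowerBound": 1, "upperBound": 4}})
restored = read_requirements(config)
```

The design point this illustrates: the business logic (the graph topology) stays byte-identical, while the execution configuration rides along as a separate, replaceable value.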
@Shammon

> How about adding some more information such as vertex type

Since it was intended as a "debug" endpoint, it makes complete sense!

> For SQL jobs, we always use a unified parallelism for most vertices. Can we provide them with a more convenient setting method instead of each one?

I can completely relate to this. The main thoughts when designing the API were:
- We want to keep it clean and easy to understand.
- Global parallelism can be modeled using per-vertex parallelism, but not the other way around.
- The API will be used by external tooling (operator, autoscaler).

Can you elaborate more on how you'd intend to use the endpoint? I think we can ultimately introduce a way of re-declaring "per-vertex defaults," but I'd like to understand the use case a bit more first.
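The point that global parallelism can be modeled using per-vertex parallelism (but not the other way around) can be sketched in a few lines; the vertex IDs and payload field names below are assumptions for illustration, not the FLIP's final schema:

```python
# Sketch: expanding one "global" parallelism bound into a per-vertex body.
def expand_global_parallelism(vertex_ids, lower, upper):
    # The same bound is simply repeated for every vertex, which is why a
    # per-vertex API can express a global setting, but a global-only API
    # could never express differing per-vertex bounds.
    return {
        vid: {"parallelism": {"lowerBound": lower, "upperBound": upper}}
        for vid in vertex_ids
    }

body = expand_global_parallelism(["source-1", "map-2", "sink-3"], 1, 4)
print(body["map-2"])  # {'parallelism': {'lowerBound': 1, 'upperBound': 4}}
```

External tooling (an operator or autoscaler) could use exactly this kind of expansion to offer a "convenient" global knob on top of the per-vertex endpoint.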
@Weijie

> What is the default value here (based on what configuration), or just infinite?

Currently, for the lower bound, it's always one, and for the upper bound, it's either the parallelism (if defined) or the maxParallelism of the vertex in the JobGraph. This question might be another signal for making the defaults explicit (see the answer to Shammon's question above).

Thanks, everyone, for your initial thoughts!

Best,
D.

On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com> wrote:

> Thanks David for driving this. This is very valuable work, especially for cloud-native environments.
>
> > How about adding some more information such as vertex type (SOURCE/MAP/JOIN, etc.) in the response of `get jobs resource-requirements`?
> > For users, only the vertex-id may be difficult to understand.
>
> +1 for this suggestion; including the jobvertex's name in the response body is more user-friendly.
>
> I saw this sentence in the FLIP: "Setting the upper bound to -1 will reset the value to the default setting." What is the default value here (based on what configuration), or is it just infinite?
>
> Best regards,
>
> Weijie
>
> Shammon FY <zjur...@gmail.com> wrote on Mon, Feb 6, 2023 at 18:06:
>
> > Hi David,
> >
> > Thanks for initiating this discussion. I think declaring job resource requirements via a REST API is very valuable.
> > I just left some comments below:
> >
> > 1) How about adding some more information such as vertex type (SOURCE/MAP/JOIN, etc.) in the response of `get jobs resource-requirements`? For users, only the vertex-id may be difficult to understand.
> >
> > 2) For SQL jobs, we always use a unified parallelism for most vertices. Can we provide them with a more convenient setting method instead of each one?
> >
> > Best,
> > Shammon
> >
> > On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:
> >
> > > Thanks David for creating this FLIP. It sounds promising and useful to have.
> > > Here are some thoughts from my side (some of them might be rather a follow-up and not necessarily part of this FLIP):
> > > - I'm wondering whether it makes sense to add some kind of resource ID to the REST API. This would give Flink a tool to verify the PATCH request of the external system in a compare-and-set kind of manner. AFAIU, the process requires the external system to retrieve the resource requirements first (to retrieve the vertex IDs). A resource ID <ABC> would be sent along as a unique identifier for the provided setup. It's essentially the version ID of the currently deployed resource requirement configuration. Flink doesn't know whether the external system would use the provided information in some way to derive a new set of resource requirements for this job.
> >> > >> >> >> > > > > > The > >> > >> >> >> > > > > > > > > > > subsequent PATCH request with updated > >> resource > >> > >> >> requirements > >> > >> >> >> > > > > would > >> > >> >> >> > > > > > > > > include > >> > >> >> >> > > > > > > > > > > the previously retrieved resource ID > >> <ABC>. The > >> > >> >> PATCH call > >> > >> >> >> > > > > would > >> > >> >> >> > > > > > > fail > >> > >> >> >> > > > > > > > > if > >> > >> >> >> > > > > > > > > > > there was a concurrent PATCH call in > >> between > >> > >> >> indicating to the > >> > >> >> >> > > > > > > > external > >> > >> >> >> > > > > > > > > > > system that the resource requirements were > >> > >> >> concurrently > >> > >> >> >> > > > > updated. > >> > >> >> >> > > > > > > > > > > - How often do we allow resource > >> requirements > >> > >> to be > >> > >> >> changed? > >> > >> >> >> > > > > That > >> > >> >> >> > > > > > > > > > question > >> > >> >> >> > > > > > > > > > > might make my previous comment on the > >> resource > >> > >> ID > >> > >> >> obsolete > >> > >> >> >> > > > > > because > >> > >> >> >> > > > > > > we > >> > >> >> >> > > > > > > > > > could > >> > >> >> >> > > > > > > > > > > just make any PATCH call fail if there was > >> a > >> > >> >> resource > >> > >> >> >> > > > > requirement > >> > >> >> >> > > > > > > > > update > >> > >> >> >> > > > > > > > > > > within a certain time frame before the > >> request. > >> > >> But > >> > >> >> such a time > >> > >> >> >> > > > > > > > period > >> > >> >> >> > > > > > > > > is > >> > >> >> >> > > > > > > > > > > something we might want to make > >> configurable > >> > >> then, > >> > >> >> I guess. > >> > >> >> >> > > > > > > > > > > - Versioning the JobGraph in the > >> JobGraphStore > >> > >> >> rather than > >> > >> >> >> > > > > > > > overwriting > >> > >> >> >> > > > > > > > > it > >> > >> >> >> > > > > > > > > > > might be an idea. 
> > > This would enable us to provide resource requirement changes in the UI or through the REST API. It is related to a problem around keeping track of the exception history within the AdaptiveScheduler and also having to consider multiple versions of a JobGraph. But for that one, we use the ExecutionGraphInfoStore right now.
> > > - Updating the JobGraph in the JobGraphStore makes sense. I'm just wondering whether we bundle two things together that are actually separate: the business logic and the execution configuration (the resource requirements). I'm aware that this is not a flaw of the current FLIP but rather something that was not necessary to address in the past, because the JobGraph was kind of static. I don't remember whether that was already discussed while working on the AdaptiveScheduler for FLIP-160 [1].
> >> > >> >> >> > > > > > > > > Maybe, > >> > >> >> >> > > > > > > > > > > I'm missing some functionality here that > >> > >> requires > >> > >> >> us to have > >> > >> >> >> > > > > > > > everything > >> > >> >> >> > > > > > > > > > in > >> > >> >> >> > > > > > > > > > > one place. But it feels like updating the > >> entire > >> > >> >> JobGraph which > >> > >> >> >> > > > > > > could > >> > >> >> >> > > > > > > > > be > >> > >> >> >> > > > > > > > > > > actually a "config change" is not > >> reasonable. > >> > >> >> ...also > >> > >> >> >> > > > > considering > >> > >> >> >> > > > > > > the > >> > >> >> >> > > > > > > > > > > amount of data that can be stored in a > >> > >> >> ConfigMap/ZooKeeper node > >> > >> >> >> > > > > > if > >> > >> >> >> > > > > > > > > > > versioning the resource requirement change > >> as > >> > >> >> proposed in my > >> > >> >> >> > > > > > > previous > >> > >> >> >> > > > > > > > > > item > >> > >> >> >> > > > > > > > > > > is an option for us. > >> > >> >> >> > > > > > > > > > > - Updating the JobGraphStore means adding > >> more > >> > >> >> requests to the > >> > >> >> >> > > > > HA > >> > >> >> >> > > > > > > > > backend > >> > >> >> >> > > > > > > > > > > API. There were some concerns shared in the > >> > >> >> discussion thread > >> > >> >> >> > > > > [2] > >> > >> >> >> > > > > > > for > >> > >> >> >> > > > > > > > > > > FLIP-270 [3] on pressuring the k8s API > >> server in > >> > >> >> the past with > >> > >> >> >> > > > > > too > >> > >> >> >> > > > > > > > many > >> > >> >> >> > > > > > > > > > > calls. Eventhough, it's more likely to be > >> > >> caused by > >> > >> >> >> > > > > > checkpointing, > >> > >> >> >> > > > > > > I > >> > >> >> >> > > > > > > > > > still > >> > >> >> >> > > > > > > > > > > wanted to bring it up. 
> > > We're working on a standardized performance test to prepare going forward with FLIP-270 [3] right now.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> > > [2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > >
> > > On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com> wrote:
> > >
> > > > Hi David:
> > > >
> > > > Thank you for driving this FLIP, which helps reduce Flink downtime.
> > > >
> > > > For this FLIP, I would like to share a few ideas:
> > > >
> > > > - When the number of slots is insufficient, can we stop the user's rescaling, or return something to tell the user "not enough available slots for the upgrade, please check your available slots"? Or we could have a request switch (true/false) to allow this behavior.
> > > >
> > > > - When the user updates the job-vertex parallelism, I want to have an interface to query the current status of the parallelism update, so that the user or a program can understand the current status.
> > > > This also helps with management, similar to the Flink Kubernetes Operator [1]:
> > > >
> > > > {
> > > >   status: Failed
> > > >   reason: "not enough available slots for the upgrade, please check your available slots"
> > > > }
> > > >
> > > > - *Pending*: the job has joined the upgrade queue and will be updated later
> > > > - *Rescaling*: the job is currently rescaling; wait for it to finish
> > > > - *Finished*: the update is done
> > > > - *Failed*: something went wrong, so the job cannot be upgraded
> > > >
> > > > I would like to add the above content to the FLIP; what do you think?
> > > >
> > > > [1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> > > >
> > > > David Morávek <d...@apache.org> wrote on Fri, Feb 3, 2023 at 16:42:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > This FLIP [1] introduces a new REST API for declaring resource requirements for the Adaptive Scheduler. There seems to be a clear need for this API based on the discussion on the "Reworking the Rescale API" thread [2].
> > > > >
> > > > > Before we get started: this work is heavily based on the prototype [3] created by Till Rohrmann, and the FLIP is being published with his consent. Big shoutout to him!
> > > > > Last but not least, thanks to Chesnay and Roman for the initial reviews and discussions.
> > > > >
> > > > > The best start would be watching a short demo [4] that I've recorded, which illustrates the newly added capabilities (rescaling the running job, handing back resources to the RM, and session cluster support).
> > > > >
> > > > > The intuition behind the FLIP is being able to externally define resource requirements ("resource boundaries") that the AdaptiveScheduler can navigate within. This is a building block for higher-level efforts such as an external Autoscaler. The natural extension of this work would be to allow specifying per-vertex ResourceProfiles.
> > > > > Looking forward to your thoughts; any feedback is appreciated!
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> > > > > [2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> > > > > [3] https://github.com/tillrohrmann/flink/tree/autoscaling
> > > > > [4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
> > > > >
> > > > > Best,
> > > > > D.
> > > >
> > > > --
> > > > Best
> > > >
> > > > ConradJam
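The "resource boundaries" idea from the announcement quoted above, that the scheduler freely navigates within declared [min, max] bounds given whatever slots are available, can be sketched as follows. This is conceptual pseudologic, not Flink's actual slot-allocation code:

```python
# Conceptual sketch: given declared bounds and currently available slots,
# an adaptive scheduler can run anywhere inside the window, only refusing
# to run if even the declared lower bound cannot be satisfied.
def decide_parallelism(lower_bound, upper_bound, available_slots):
    if available_slots < lower_bound:
        return None  # not enough resources even for the declared minimum
    return min(upper_bound, available_slots)

print(decide_parallelism(1, 8, 5))    # 5    -> scale to what is available
print(decide_parallelism(1, 8, 12))   # 8    -> capped at the upper bound
print(decide_parallelism(4, 8, 3))    # None -> below the minimum, cannot run
```

This also shows the trade-off discussed at the top of the thread: a lower bound of 1 keeps the job running under resource shortage, while a higher lower bound deliberately refuses to run rather than limp along.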