Hi,

Many thanks @Yangze for bringing up this discussion. Overall +1 for
exposing the fine-grained resource requirements in the DataStream API.

One similar issue, as Arvid has pointed out, is that users may also create
different SlotSharingGroup objects with the same name but with different
resources. We might need to do some checks internally, but we could also
leave that to the development of the actual PR.
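
To make the kind of check concrete, here is a rough sketch in plain Java. It assumes the problematic case is two groups registered under the same name with different resources. `SlotSharingGroupRegistry` and `register` are hypothetical names made up for this illustration, not Flink classes, and only CPU cores are modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry, for illustration only; not part of Flink. */
final class SlotSharingGroupRegistry {
    // Group name -> CPU cores. A real spec would carry more resource dimensions.
    private final Map<String, Double> cpuCoresByName = new HashMap<>();

    /**
     * Registers a group. Re-registering an identical definition is accepted;
     * a second definition of the same name with different resources is rejected.
     */
    void register(String name, double cpuCores) {
        Double previous = cpuCoresByName.putIfAbsent(name, cpuCores);
        if (previous != null && Double.compare(previous, cpuCores) != 0) {
            throw new IllegalStateException(
                    "Slot sharing group '" + name + "' is already registered with "
                            + previous + " CPU cores, got " + cpuCores);
        }
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        SlotSharingGroupRegistry registry = new SlotSharingGroupRegistry();
        registry.register("ssg-1", 2.0);
        registry.register("ssg-1", 2.0); // identical definition, accepted
        try {
            registry.register("ssg-1", 4.0); // conflicting definition, rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Failing fast is just one option; the alternative mentioned later in the thread is to define a priority between the two ways of specifying resources and document it.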

Best,
Yun



 ------------------Original Mail ------------------
Sender:Arvid Heise <ar...@apache.org>
Send Date:Thu Jun 10 15:33:37 2021
Recipients:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource 
Requirements
Hi Yangze,



Thanks for incorporating the ideas and sorry for missing the builder part.

My main idea is that SlotSharingGroup is immutable, such that the user

doesn't do:



    SlotSharingGroup ssg = new SlotSharingGroup();
    ssg.setCpus(2);
    operator1.slotSharingGroup(ssg);
    ssg.setCpus(4);
    operator2.slotSharingGroup(ssg);



and wonders why both operators have the same CPU spec. But the details can

be fleshed out in the actual PR.
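
For illustration only, a rough sketch of what an immutable variant could look like. `ImmutableSlotSharingGroup`, its builder, and the method names are hypothetical stand-ins, not the Flink API and not what the FLIP specifies; only a CPU field is modeled.

```java
import java.util.Objects;

/** Hypothetical stand-in for the proposed class; NOT the actual Flink API. */
final class ImmutableSlotSharingGroup {
    private final String name;
    private final double cpuCores;

    private ImmutableSlotSharingGroup(String name, double cpuCores) {
        this.name = name;
        this.cpuCores = cpuCores;
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() {
        return name;
    }

    double getCpuCores() {
        return cpuCores;
    }

    /** Mutable builder; each build() call returns a new immutable instance. */
    static final class Builder {
        private final String name;
        private double cpuCores;

        private Builder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        Builder setCpuCores(double cpuCores) {
            if (cpuCores <= 0) {
                throw new IllegalArgumentException("cpuCores must be positive");
            }
            this.cpuCores = cpuCores;
            return this;
        }

        ImmutableSlotSharingGroup build() {
            return new ImmutableSlotSharingGroup(name, cpuCores);
        }
    }
}

class ImmutableSsgDemo {
    public static void main(String[] args) {
        // No setters on the built object, so a group handed to one operator
        // cannot be changed afterwards on behalf of another operator.
        ImmutableSlotSharingGroup ssgA =
                ImmutableSlotSharingGroup.newBuilder("ssg-a").setCpuCores(2).build();
        ImmutableSlotSharingGroup ssgB =
                ImmutableSlotSharingGroup.newBuilder("ssg-b").setCpuCores(4).build();
        System.out.println(ssgA.getName() + "=" + ssgA.getCpuCores());
        System.out.println(ssgB.getName() + "=" + ssgB.getCpuCores());
    }
}
```

With this shape the sequence above cannot be written at all: a different CPU spec requires building a second object.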



On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo  wrote:



> Thanks all for the discussion. I've updated the FLIP accordingly, the

> key changes are:

> - Introduce SlotSharingGroup instead of ResourceSpec which contains

> the resource spec of slot sharing group

> - Introduce two interfaces for specifying the SlotSharingGroup:

> #slotSharingGroup(SlotSharingGroup) and

> StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

>

> If there is no more feedback, I'd start a vote next week.

>

> Best,

> Yangze Guo

>

> On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:

> >

> > Thanks for the valuable suggestion, Arvid.

> >

> > 1) Yes, we can add a new SlotSharingGroup which includes the name and

> > its resource. After that, we have two interfaces for configuring the

> > slot sharing group of an operator:

> > - #slotSharingGroup(String name) // the resource of it can be

> > configured through StreamExecutionEnvironment#registerSlotSharingGroup

> > - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the

> resource

> > And one interface to configure the resource of a SSG:

> > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)

> > We can also define the priority of the above two approaches, e.g. the

> > resource registered in the StreamExecutionEnvironment will always be

> > respected in case of conflict. That would be well documented.

> >

> > 2) Yes, I originally added this interface as a shortcut. It seems

> > unnecessary now. Will remove it.

> >

> > 3) I don't think we need to expose the ExternalResource. In the

> > builder of SlotSharingGroup, we can introduce a

> > #withExternalResource(String name, double value). Also, this interface

> > needs to be annotated as evolving.

> >

> > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to

> > elaborate on the Builder for the SlotSharingGroup.

> >

> > WDYT?

> >

> > Best,

> > Yangze Guo

> >

> > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:

> > >

> > > Hi Yangze,

> > >

> > > I like the general approach to bind requirements to slotsharing

> groups. I

> > > think the current approach is also flexible enough that a user could

> simply

> > > use ParameterTool or similar to use config values and wire that with

> their

> > > slotgroups, such that different requirements can be tested without

> > > recompilation. So I don't see an immediate need to provide a generic

> > > solution for yaml configuration for now.

> > >

> > > Looking at the programmatic interface though, I think we could improve

> by

> > > quite a bit and I haven't seen these alternatives being considered in

> the

> > > FLIP:

> > > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec

> > > properties. Instead of using group names, the user could directly

> configure

> > > such an object.

> > >

> > >     // name could also be omitted and auto-generated
> > >     SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1");
> > >     ssg1.setCPUCores(4);
> > >     ...
> > >     DataStream<Tuple2<String, Integer>> grades =
> > >         GradeSource
> > >             .getSource(env, rate)
> > >             .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >             .slotSharingGroup(ssg1);
> > >     DataStream<Tuple2<String, Integer>> salaries =
> > >         SalarySource.getSource(env, rate)
> > >             .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >             .slotSharingGroup(ssg2);
> > >
> > >     // run the actual window join program with the same slot sharing
> > >     // group as grades
> > >     DataStream<Tuple3<String, Integer, Integer>> joinedStream =
> > >         runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);

> > >

> > > Note that we could make it backward compatible by changing the proposed

> > > StreamExecutionEnvironment#setSlotSharingGroupResource to

> > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)

> and

> > > then use the string name for further reference.

> > >

> > > 2) I'm also not sure on the StreamExecutionEnvironment#

> > > setSlotSharingGroupResources. What's the benefit of the Map version

> over

> > > having the simple setter? Even if the user has a map

> > > slotSharingGroupResources, they could simply do

> > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);

> > >

> > > 3) Is defining the ExternalResource part of this FLIP? I don't see a

> > > Public* class yet. I'd be also fine to cut the scope of this FLIP and

> > > remove it for now and annotate ResourceSpec/SlotSharingGroup evolving.

> > >

> > > 4) We should probably use a builder pattern around

> > > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't

> think

> > > we need to fully specify that in the FLIP but it would be good to at

> least

> > > say how they should be created by the user.

> > >

> > >

> > >

> > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:

> > >

> > > > @Yang

> > > > In short, the external resources will participate in resource

> > > > deduction and be logically ensured, but requesting an external

> > > > resource must still be done through config options with the current

> > > > default resource allocation strategy.

> > > > In FLIP-56, we abstract the logic of resource allocation to the

> > > > `ResourceAllocationStrategy`. Currently, with its default

> > > > implementation, ResourceManager would still allocate TMs with the

> same

> > > > resource spec and the external resources of it are configured through

> > > > the config option as well. So, in your case, you need to define the

> > > > "external-resources" and "external-resources.disk.amount". Then, all

> > > > the disk requirements defined in the SSG will be logically ensured,

> as

> > > > there is no slot level isolation. If the disk space of a task manager

> > > > cannot fulfill the disk requirement, RM will allocate a new one.

> > > > In the future, we'd like to introduce a `ResourceAllocationStrategy`

> > > > which allocates heterogeneous TMs according to the requirements.

> Then,

> > > > user only needs to define the driver of external resources when

> > > > needed.

> > > > Also, regarding the resource isolation, we may provide a fine-grained

> > > > mode in which each slot can only fetch the information of external

> > > > resources it requires in the future. But that is out of the scope of

> > > > this PR.

> > > >

> > > > Best,

> > > > Yangze Guo

> > > >

> > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang 

> wrote:

> > > > >

> > > > > Thanks @Yangze for preparing this FLIP.

> > > > >

> > > > > I think this is a good starting point for community users to get a

> > > > > taste of fine-grained resource management, which we all believe

> > > > > could improve Flink job stability and cluster utilization.

> > > > >

> > > > > I have a simple question about the extended resources. Is it

> > > > > possible to combine extended resources with fine-grained resource

> > > > > management? Besides GPUs, FPGAs and other new computing devices,

> > > > > disk may be a more general use case. For example, different SSGs

> > > > > may have various disk requirements based on their state, so we need

> > > > > to allocate enough ephemeral storage for every TaskManager pod in a

> > > > > Kubernetes deployment. Otherwise, the pod might be evicted for

> > > > > exceeding its limits.

> > > > >

> > > > >

> > > > > Best,

> > > > > Yang

> > > > >

> > > > >

> > > > > On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:

> > > > >

> > > > > > I think being able to specify fine-grained resource requirements

> > > > > > without having to change the code and recompile the job is indeed a

> > > > > > good idea. It definitely improves the usability.

> > > > > >

> > > > > > However, this requires a more careful design, which probably

> > > > > > deserves a separate thread. It'd be good to have that discussion,

> > > > > > but maybe not block this feature on it.

> > > > > >

> > > > > > One idea concerning the configuration approach: As Yangze said,

> flink

> > > > > > configuration options are supposed to take effect at cluster

> level. For

> > > > > > updating job level specifics that are not suitable to be

> introduced as

> > > > a

> > > > > > config option, currently the only way is to pass them as program

> > > > arguments.

> > > > > > Would it make sense to introduce a general approach for

> overwriting

> > > > such

> > > > > > job specifics without re-compiling the job?

> > > > > >

> > > > > > Thank you~

> > > > > >

> > > > > > Xintong Song

> > > > > >

> > > > > >

> > > > > >

> > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo 

> wrote:

> > > > > >

> > > > > > > @Wenlong

> > > > > > > After another consideration, the config option approach I

> mentioned

> > > > > > > above might not be appropriate. The resource requirements for

> SSG

> > > > > > > should be a job level configuration and should not be set in the

> > > > > > > flink-conf.

> > > > > > >

> > > > > > > I think we can define a JSON format, which would be the

> ResourceSpecs

> > > > > > > mapped by the name of SSGs, for the resource requirements of a

> > > > > > > specific job. Then, we allow user to configure the file path

> of that

> > > > > > > JSON. The JSON will only be parsed at runtime, which allows

> user to

> > > > > > > tune it without re-compiling the job.

> > > > > > >

> > > > > > > We can add another #setSlotSharingGroupResources for

> configuring the

> > > > > > > file path of that JSON:

> > > > > > > ```
> > > > > > > /**
> > > > > > >  * Specify fine-grained resource requirements for slot sharing groups
> > > > > > >  * with the given resource JSON file. The existing resource
> > > > > > >  * requirement of the same slot sharing group will be replaced.
> > > > > > >  */
> > > > > > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > > > > > >         String pathToResourceJson);
> > > > > > > ```

> > > > > > >

> > > > > > > WDYT?

> > > > > > >

> > > > > > > Best,

> > > > > > > Yangze Guo

> > > > > > >

> > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo wrote:

> > > > > > > >

> > > > > > > > Thanks for the feedback, Xintong and Wenlong!

> > > > > > > >

> > > > > > > > @Wenlong

> > > > > > > > I think that is a good idea. Adjusting the resources without

> > > > > > > > re-compiling the job will facilitate the tuning process.

> > > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg

> name}"

> > > > > > > > (welcome any proposal for the prefix naming) for the

> resource spec

> > > > > > > > config of a slot sharing group. Then, user can set the

> > > > ResourceSpec of

> > > > > > > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1:

> {cpu: 1.0,

> > > > > > > > heap: 100m, off-heap: 100m....}". WDYT?

> > > > > > > >

> > > > > > > >

> > > > > > > > Best,

> > > > > > > > Yangze Guo

> > > > > > > >

> > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <

> > > > wenlong88....@gmail.com>

> > > > > > > wrote:

> > > > > > > > >

> > > > > > > > > Thanks Yangze for the FLIP, it is great for users to be able to

> > > > > > > > > declare the fine-grained resource requirements for the job.

> > > > > > > > >

> > > > > > > > > I have one minor suggestion: can we support setting resource

> > > > > > > > > requirements by configuration? Currently most of the options in

> > > > > > > > > the execution config can be set by configuration, and it is very

> > > > > > > > > likely that users need to adjust the resources according to the

> > > > > > > > > performance of their job during debugging. Providing a way to

> > > > > > > > > configure them will make that more convenient.

> > > > > > > > >

> > > > > > > > > Bests,

> > > > > > > > > Wenlong Lyu

> > > > > > > > >

> > > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <

> tonysong...@gmail.com

> > > > >

> > > > > > > wrote:

> > > > > > > > >

> > > > > > > > > > Thanks Yangze for preparing the FLIP.

> > > > > > > > > >

> > > > > > > > > > The proposed changes look good to me.

> > > > > > > > > >

> > > > > > > > > > As you've mentioned in the implementation plan, I

> believe one

> > > > of

> > > > > > the

> > > > > > > most

> > > > > > > > > > important tasks of this FLIP is to have the feature well

> > > > > > documented.

> > > > > > > It

> > > > > > > > > > would be really nice if we can keep that in mind and

> start

> > > > drafting

> > > > > > > the

> > > > > > > > > > documentation early.

> > > > > > > > > >

> > > > > > > > > > Thank you~

> > > > > > > > > >

> > > > > > > > > > Xintong Song

> > > > > > > > > >

> > > > > > > > > >

> > > > > > > > > >

> > > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <

> karma...@gmail.com>

> > > > > > > wrote:

> > > > > > > > > >

> > > > > > > > > > > Hi, there,

> > > > > > > > > > >

> > > > > > > > > > > We would like to start a discussion thread on

> "FLIP-169:

> > > > > > DataStream

> > > > > > > > > > > API for Fine-Grained Resource Requirements"[1], where

> we

> > > > propose

> > > > > > > the

> > > > > > > > > > > DataStream API for specifying fine-grained resource

> > > > requirements

> > > > > > in

> > > > > > > > > > > StreamExecutionEnvironment.

> > > > > > > > > > >

> > > > > > > > > > > Please find more details in the FLIP wiki document [1].

> > > > Looking

> > > > > > > > > > > forward to your feedback.

> > > > > > > > > > >

> > > > > > > > > > > [1]

> > > > > > > > > > >

> > > > > > > > > >

> > > > > > >

> > > > > >

> > > >

> https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements

> > > > > > > > > > >

> > > > > > > > > > >

> > > > > > > > > > > Best,

> > > > > > > > > > > Yangze Guo

> > > > > > > > > > >

> > > > > > > > > >

> > > > > > >

> > > > > >

> > > >

>
