Thanks all for the discussion. I've updated the FLIP accordingly, the
key changes are:
- Introduce SlotSharingGroup instead of ResourceSpec which contains
the resource spec of slot sharing group
- Introduce two interfaces for specifying the SlotSharingGroup:
#slotSharingGroup(SlotSharingGroup) and
StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

If there is no more feedback, I'd start a vote next week.

Best,
Yangze Guo

On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo <karma...@gmail.com> wrote:
>
> Thanks for the valuable suggestion, Arvid.
>
> 1) Yes, we can add a new SlotSharingGroup which includes the name and
> its resource. After that, we have two interfaces for configuring the
> slot sharing group of an operator:
> - #slotSharingGroup(String name)    // the resource of it can be
> configured through StreamExecutionEnvironment#registerSlotSharingGroup
> - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the resource
> And one interface to configure the resource of a SSG:
> - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> We can also define the priority of the above two approaches, e.g. the
> resource registering in the StreamExecutionEnvironment will always be
> respected when conflict. That would be well documented.
>
> 2) Yes, I originally add this interface as a shortcut. It seems
> unnecessary now. Will remove it.
>
> 3) I don't think we need to expose the ExternalResource. In the
> builder of SlotSharingGroup, we can introduce a
> #withExternalResource(String name, double value). Also, this interface
> needs to be annotated as evolving.
>
> 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> elaborate on the Builder for the SlotSharingGroup.
>
> WDYT?
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > Hi Yangze,
> >
> > I like the general approach to bind requirements to slotsharing groups. I
> > think the current approach is also flexible enough that a user could simply
> > use ParameterTool or similar to use config values and wire that with their
> > slotgroups, such that different requirements can be tested without
> > recompilation. So I don't see an immediate need to provide a generic
> > solution for yaml configuration for now.
> >
> > Looking at the programmatic interface though, I think we could improve by
> > quite a bit and I haven't seen these alternatives being considered in the
> > FLIP:
> > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
> > properties. Instead of using group names, the user could directly configure
> > such an object.
> >
> >         SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
> > could also be omitted and auto-generated
> >         ssg1.setCPUCores(4);
> >         ...
> >         DataStream<Tuple2<String, Integer>> grades =
> >                 GradeSource
> >                         .getSource(env, rate)
> >
> > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> >                         .slotSharingGroup(ssg1);
> >         DataStream<Tuple2<String, Integer>> salaries =
> >                 SalarySource.getSource(env, rate)
> >
> > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> >                         .slotSharingGroup(ssg2);
> >
> >         // run the actual window join program with the same slot sharing
> > group as grades
> >         DataStream<Tuple3<String, Integer, Integer>> joinedStream =
> >                 runWindowJoin(grades, salaries,
> > windowSize).slotSharingGroup(ssg1);
> >
> > Note that we could make it backward compatible by changing the proposed
> > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> > then use the string name for further reference.
> >
> > 2) I'm also not sure on the StreamExecutionEnvironment#
> > setSlotSharingGroupResources. What's the benefit of the Map version over
> > having the simple setter? Even if the user has a map
> > slotSharingGroupResources, he could simply do
> > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> >
> > 3) Is defining the ExternalResource part of this FLIP? I don't see a
> > Public* class yet. I'd be also fine to cut the scope of this FLIP and
> > remove it for now and annotate ResourceSpec/SlotSharingGroup evolving.
> >
> > 4) We should probably use a builder pattern around
> > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't think
> > we need to fully specify that in the FLIP but it would be good to at least
> > say how they should be created by the user.
> >
> >
> >
> > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo <karma...@gmail.com> wrote:
> >
> > > @Yang
> > > In short, the external resources will participate in resource
> > > deduction and be logically ensured, but requesting an external
> > > resource must still be done through config options with the current
> > > default resource allocation strategy.
> > > In FLIP-56, we abstract the logic of resource allocation to the
> > > `ResourceAllocationStrategy`. Currently, with its default
> > > implementation, ResourceManager would still allocate TMs with the same
> > > resource spec and the external resources of it are configured through
> > > the config option as well. So, in your case, you need to define the
> > > "external-resources" and "external-resources.disk.amount". Then, all
> > > the disk requirements defined in the SSG will be logically ensured, as
> > > there is no slot level isolation. If the disk space of a task manager
> > > cannot fulfill the disk requirement, RM will allocate a new one.
> > > In the future, we'd like to introduce a `ResourceAllocationStrategy`
> > > which allocates heterogeneous TMs according to the requirements. Then,
> > > user only needs to define the driver of external resources when
> > > needed.
> > > Also, regarding the resource isolation, we may provide a fine-grained
> > > mode in which each slot can only fetch the information of external
> > > resources it requires in the future. But that is out of the scope of
> > > this PR.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > >
> > > > Thanks @Yangze for preparing this FLIP.
> > > >
> > > > I think this is a good start point for the community users to have a
> > > taste
> > > > on the fine-grained
> > > > resource management, which we all believe it could improve the Flink job
> > > > stability and
> > > > cluster utilization.
> > > >
> > > > I have a simple question about the extended resources. It is possible to
> > > > combine extended resources
> > > > with fine-grained resource management. Except for the GPU, FPGA and 
> > > > other
> > > > new computing devices,
> > > > maybe the disk resource is a more general use case. For example,
> > > different
> > > > SSG may have various
> > > > disk requirements based on the state. So we need to allocate enough
> > > > ephemeral storage resource for every
> > > > TaskManager pod in Kubernetes deployment. Otherwise, it might be evicted
> > > > due to running out of limits.
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > >
> > > > Xintong Song <tonysong...@gmail.com> 于2021年6月8日周二 下午1:47写道:
> > > >
> > > > > I think being able to specify fine grained resource requirements
> > > without
> > > > > having to change the codes and recompile the job is indeed a good
> > > idea. It
> > > > > definitely improves the usability.
> > > > >
> > > > > However, this requires more careful designs, which probably deserves a
> > > > > separate thread. I'd be good to have that discussion, but maybe not
> > > block
> > > > > this feature on that.
> > > > >
> > > > > One idea concerning the configuration approach: As Yangze said, flink
> > > > > configuration options are supposed to take effect at cluster level. 
> > > > > For
> > > > > updating job level specifics that are not suitable to be introduced as
> > > a
> > > > > config option, currently the only way is to pass them as program
> > > arguments.
> > > > > Would it make sense to introduce a general approach for overwriting
> > > such
> > > > > job specifics without re-compiling the job?
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > >
> > > > > > @Wenlong
> > > > > > After another consideration, the config option approach I mentioned
> > > > > > above might not be appropriate. The resource requirements for SSG
> > > > > > should be a job level configuration and should no be set in the
> > > > > > flink-conf.
> > > > > >
> > > > > > I think we can define a JSON format, which would be the 
> > > > > > ResourceSpecs
> > > > > > mapped by the name of SSGs, for the resource requirements of a
> > > > > > specific job. Then, we allow user to configure the file path of that
> > > > > > JSON. The JSON will be only parsed in runtime, which allows user to
> > > > > > tune it without re-compiling the job.
> > > > > >
> > > > > > We can add another #setSlotSharingGroupResources for configuring the
> > > > > > file path of that JSON:
> > > > > > ```
> > > > > > /**
> > > > > >  * Specify fine-grained resource requirements for slot sharing 
> > > > > > groups
> > > > > > with the given resource JSON file. The existing resource
> > > > > >  * requirement of the same slot sharing group will be replaced.
> > > > > >  */
> > > > > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > > > > >         String pathToResourceJson);
> > > > > > ```
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo <karma...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Thanks for the feedbacks, Xintong and Wenlong!
> > > > > > >
> > > > > > > @Wenlong
> > > > > > > I think that is a good idea, adjust the resource without
> > > re-compiling
> > > > > > > the job will facilitate the tuning process.
> > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > > > > > (welcome any proposal for the prefix naming) for the resource spec
> > > > > > > config of a slot sharing group. Then, user can set the
> > > ResourceSpec of
> > > > > > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > > > > > > heap: 100m, off-heap: 100m....}". WDYT?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <
> > > wenlong88....@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks Yangze for the flip, it is great for users to be able to
> > > > > > declare the
> > > > > > > > fine-grained resource requirements for the job.
> > > > > > > >
> > > > > > > > I have one minor suggestion: can we support setting resource
> > > > > > requirements
> > > > > > > > by configuration? Currently most of the config options in
> > > execution
> > > > > > config
> > > > > > > > can be configured by configuration, and it is very likely that
> > > users
> > > > > > need
> > > > > > > > to adjust the resource according to the performance of their job
> > > > > during
> > > > > > > > debugging,  Providing a configuration way will make it more
> > > > > convenient.
> > > > > > > >
> > > > > > > > Bests,
> > > > > > > > Wenlong Lyu
> > > > > > > >
> > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <tonysong...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Yangze for preparing the FLIP.
> > > > > > > > >
> > > > > > > > > The proposed changes look good to me.
> > > > > > > > >
> > > > > > > > > As you've mentioned in the implementation plan, I believe one
> > > of
> > > > > the
> > > > > > most
> > > > > > > > > important tasks of this FLIP is to have the feature well
> > > > > documented.
> > > > > > It
> > > > > > > > > would be really nice if we can keep that in mind and start
> > > drafting
> > > > > > the
> > > > > > > > > documentation early.
> > > > > > > > >
> > > > > > > > > Thank you~
> > > > > > > > >
> > > > > > > > > Xintong Song
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <karma...@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, there,
> > > > > > > > > >
> > > > > > > > > > We would like to start a discussion thread on "FLIP-169:
> > > > > DataStream
> > > > > > > > > > API for Fine-Grained Resource Requirements"[1], where we
> > > propose
> > > > > > the
> > > > > > > > > > DataStream API for specifying fine-grained resource
> > > requirements
> > > > > in
> > > > > > > > > > StreamExecutionEnvironment.
> > > > > > > > > >
> > > > > > > > > > Please find more details in the FLIP wiki document [1].
> > > Looking
> > > > > > > > > > forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yangze Guo
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > > >
> > >

Reply via email to