Thanks for the comments, Zhu!

Yes, it is a known limitation for fine-grained resource management. We
also have filed this issue in FLINK-20865 when we proposed FLIP-156.

As a first step, I agree that we can mark batch jobs with PIPELINED
edges as an invalid case for this feature. However, just throwing an
exception, in that case, might confuse users who do not understand the
concept of pipeline region. Maybe we can force all the edges in this
scenario to BLOCKING in compiling stage and well document it. So that,
common users will not be interrupted while the expert users can
understand the cost of that usage and make their decision. WDYT?

Best,
Yangze Guo

On Mon, Jun 21, 2021 at 2:24 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> Thanks for proposing this @Yangze Guo and sorry for joining the discussion so 
> late.
> The proposal generally looks good to me. But I find one problem that batch 
> job with PIPELINED edges might hang if enabling fine-grained resources. see 
> "Resource Deadlocks could still happen in certain Cases" section in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> However, this problem may happen only in batch cases with PIPELINED edges, 
> because
> 1. streaming jobs would always require all resource requirements to be 
> fulfilled at the same time.
> 2. batch jobs without PIPELINED edges consist of multiple single vertex 
> regions and thus each slot can be individually used and returned
> So maybe in the first step, let's mark batch jobs with PIPELINED edges as an 
> invalid case for fine-grained resources and throw exception for it in early 
> compiling stage?
>
> Thanks,
> Zhu
>
> Yangze Guo <karma...@gmail.com> 于2021年6月15日周二 下午4:57写道:
>>
>> Thanks for the supplement, Arvid and Yun. I've annotated these two
>> points in the FLIP.
>> The vote is now started in [1].
>>
>> [1] 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html
>>
>> Best,
>> Yangze Guo
>>
>> On Fri, Jun 11, 2021 at 2:50 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>> >
>> > Hi,
>> >
>> > Very thanks @Yangze for bringing up this discuss. Overall +1 for
>> > exposing the fine-grained resource requirements in the DataStream API.
>> >
>> > One similar issue as Arvid has pointed out is that users may also creating
>> > different SlotSharingGroup objects, with different names but with different
>> > resources.  We might need to do some check internally. But We could also
>> > leave that during the development of the actual PR.
>> >
>> > Best,
>> > Yun
>> >
>> >
>> >
>> >  ------------------Original Mail ------------------
>> > Sender:Arvid Heise <ar...@apache.org>
>> > Send Date:Thu Jun 10 15:33:37 2021
>> > Recipients:dev <dev@flink.apache.org>
>> > Subject:Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource 
>> > Requirements
>> > Hi Yangze,
>> >
>> >
>> >
>> > Thanks for incorporating the ideas and sorry for missing the builder part.
>> >
>> > My main idea is that SlotSharingGroup is immutable, such that the user
>> >
>> > doesn't do:
>> >
>> >
>> >
>> > ssg = new SlotSharingGroup();
>> >
>> > ssg.setCpus(2);
>> >
>> > operator1.slotSharingGroup(ssg);
>> >
>> > ssg.setCpus(4);
>> >
>> > operator2.slotSharingGroup(ssg);
>> >
>> >
>> >
>> > and wonders why both operators have the same CPU spec. But the details can
>> >
>> > be fleshed out in the actual PR.
>> >
>> >
>> >
>> > On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo  wrote:
>> >
>> >
>> >
>> > > Thanks all for the discussion. I've updated the FLIP accordingly, the
>> >
>> > > key changes are:
>> >
>> > > - Introduce SlotSharingGroup instead of ResourceSpec which contains
>> >
>> > > the resource spec of slot sharing group
>> >
>> > > - Introduce two interfaces for specifying the SlotSharingGroup:
>> >
>> > > #slotSharingGroup(SlotSharingGroup) and
>> >
>> > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
>> >
>> > >
>> >
>> > > If there is no more feedback, I'd start a vote next week.
>> >
>> > >
>> >
>> > > Best,
>> >
>> > > Yangze Guo
>> >
>> > >
>> >
>> > > On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:
>> >
>> > > >
>> >
>> > > > Thanks for the valuable suggestion, Arvid.
>> >
>> > > >
>> >
>> > > > 1) Yes, we can add a new SlotSharingGroup which includes the name and
>> >
>> > > > its resource. After that, we have two interfaces for configuring the
>> >
>> > > > slot sharing group of an operator:
>> >
>> > > > - #slotSharingGroup(String name) // the resource of it can be
>> >
>> > > > configured through StreamExecutionEnvironment#registerSlotSharingGroup
>> >
>> > > > - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the
>> >
>> > > resource
>> >
>> > > > And one interface to configure the resource of a SSG:
>> >
>> > > > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
>> >
>> > > > We can also define the priority of the above two approaches, e.g. the
>> >
>> > > > resource registering in the StreamExecutionEnvironment will always be
>> >
>> > > > respected when conflict. That would be well documented.
>> >
>> > > >
>> >
>> > > > 2) Yes, I originally add this interface as a shortcut. It seems
>> >
>> > > > unnecessary now. Will remove it.
>> >
>> > > >
>> >
>> > > > 3) I don't think we need to expose the ExternalResource. In the
>> >
>> > > > builder of SlotSharingGroup, we can introduce a
>> >
>> > > > #withExternalResource(String name, double value). Also, this interface
>> >
>> > > > needs to be annotated as evolving.
>> >
>> > > >
>> >
>> > > > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
>> >
>> > > > elaborate on the Builder for the SlotSharingGroup.
>> >
>> > > >
>> >
>> > > > WDYT?
>> >
>> > > >
>> >
>> > > > Best,
>> >
>> > > > Yangze Guo
>> >
>> > > >
>> >
>> > > > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:
>> >
>> > > > >
>> >
>> > > > > Hi Yangze,
>> >
>> > > > >
>> >
>> > > > > I like the general approach to bind requirements to slotsharing
>> >
>> > > groups. I
>> >
>> > > > > think the current approach is also flexible enough that a user could
>> >
>> > > simply
>> >
>> > > > > use ParameterTool or similar to use config values and wire that with
>> >
>> > > their
>> >
>> > > > > slotgroups, such that different requirements can be tested without
>> >
>> > > > > recompilation. So I don't see an immediate need to provide a generic
>> >
>> > > > > solution for yaml configuration for now.
>> >
>> > > > >
>> >
>> > > > > Looking at the programmatic interface though, I think we could 
>> > > > > improve
>> >
>> > > by
>> >
>> > > > > quite a bit and I haven't seen these alternatives being considered in
>> >
>> > > the
>> >
>> > > > > FLIP:
>> >
>> > > > > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
>> >
>> > > > > properties. Instead of using group names, the user could directly
>> >
>> > > configure
>> >
>> > > > > such an object.
>> >
>> > > > >
>> >
>> > > > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
>> >
>> > > > > could also be omitted and auto-generated
>> >
>> > > > > ssg1.setCPUCores(4);
>> >
>> > > > > ...
>> >
>> > > > > DataStream> grades =
>> >
>> > > > > GradeSource
>> >
>> > > > > .getSource(env, rate)
>> >
>> > > > >
>> >
>> > > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>> >
>> > > > > .slotSharingGroup(ssg1);
>> >
>> > > > > DataStream> salaries =
>> >
>> > > > > SalarySource.getSource(env, rate)
>> >
>> > > > >
>> >
>> > > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>> >
>> > > > > .slotSharingGroup(ssg2);
>> >
>> > > > >
>> >
>> > > > > // run the actual window join program with the same slot
>> >
>> > > sharing
>> >
>> > > > > group as grades
>> >
>> > > > > DataStream> joinedStream =
>> >
>> > > > > runWindowJoin(grades, salaries,
>> >
>> > > > > windowSize).slotSharingGroup(ssg1);
>> >
>> > > > >
>> >
>> > > > > Note that we could make it backward compatible by changing the 
>> > > > > proposed
>> >
>> > > > > StreamExecutionEnvironment#setSlotSharingGroupResource to
>> >
>> > > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
>> >
>> > > and
>> >
>> > > > > then use the string name for further reference.
>> >
>> > > > >
>> >
>> > > > > 2) I'm also not sure on the StreamExecutionEnvironment#
>> >
>> > > > > setSlotSharingGroupResources. What's the benefit of the Map version
>> >
>> > > over
>> >
>> > > > > having the simple setter? Even if the user has a map
>> >
>> > > > > slotSharingGroupResources, he could simply do
>> >
>> > > > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
>> >
>> > > > >
>> >
>> > > > > 3) Is defining the ExternalResource part of this FLIP? I don't see a
>> >
>> > > > > Public* class yet. I'd be also fine to cut the scope of this FLIP and
>> >
>> > > > > remove it for now and annotate ResourceSpec/SlotSharingGroup 
>> > > > > evolving.
>> >
>> > > > >
>> >
>> > > > > 4) We should probably use a builder pattern around
>> >
>> > > > > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't
>> >
>> > > think
>> >
>> > > > > we need to fully specify that in the FLIP but it would be good to at
>> >
>> > > least
>> >
>> > > > > say how they should be created by the user.
>> >
>> > > > >
>> >
>> > > > >
>> >
>> > > > >
>> >
>> > > > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:
>> >
>> > > > >
>> >
>> > > > > > @Yang
>> >
>> > > > > > In short, the external resources will participate in resource
>> >
>> > > > > > deduction and be logically ensured, but requesting an external
>> >
>> > > > > > resource must still be done through config options with the current
>> >
>> > > > > > default resource allocation strategy.
>> >
>> > > > > > In FLIP-56, we abstract the logic of resource allocation to the
>> >
>> > > > > > `ResourceAllocationStrategy`. Currently, with its default
>> >
>> > > > > > implementation, ResourceManager would still allocate TMs with the
>> >
>> > > same
>> >
>> > > > > > resource spec and the external resources of it are configured 
>> > > > > > through
>> >
>> > > > > > the config option as well. So, in your case, you need to define the
>> >
>> > > > > > "external-resources" and "external-resources.disk.amount". Then, 
>> > > > > > all
>> >
>> > > > > > the disk requirements defined in the SSG will be logically ensured,
>> >
>> > > as
>> >
>> > > > > > there is no slot level isolation. If the disk space of a task 
>> > > > > > manager
>> >
>> > > > > > cannot fulfill the disk requirement, RM will allocate a new one.
>> >
>> > > > > > In the future, we'd like to introduce a 
>> > > > > > `ResourceAllocationStrategy`
>> >
>> > > > > > which allocates heterogeneous TMs according to the requirements.
>> >
>> > > Then,
>> >
>> > > > > > user only needs to define the driver of external resources when
>> >
>> > > > > > needed.
>> >
>> > > > > > Also, regarding the resource isolation, we may provide a 
>> > > > > > fine-grained
>> >
>> > > > > > mode in which each slot can only fetch the information of external
>> >
>> > > > > > resources it requires in the future. But that is out of the scope 
>> > > > > > of
>> >
>> > > > > > this PR.
>> >
>> > > > > >
>> >
>> > > > > > Best,
>> >
>> > > > > > Yangze Guo
>> >
>> > > > > >
>> >
>> > > > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang
>> >
>> > > wrote:
>> >
>> > > > > > >
>> >
>> > > > > > > Thanks @Yangze for preparing this FLIP.
>> >
>> > > > > > >
>> >
>> > > > > > > I think this is a good start point for the community users to 
>> > > > > > > have
>> >
>> > > a
>> >
>> > > > > > taste
>> >
>> > > > > > > on the fine-grained
>> >
>> > > > > > > resource management, which we all believe it could improve the
>> >
>> > > Flink job
>> >
>> > > > > > > stability and
>> >
>> > > > > > > cluster utilization.
>> >
>> > > > > > >
>> >
>> > > > > > > I have a simple question about the extended resources. It is
>> >
>> > > possible to
>> >
>> > > > > > > combine extended resources
>> >
>> > > > > > > with fine-grained resource management. Except for the GPU, FPGA
>> >
>> > > and other
>> >
>> > > > > > > new computing devices,
>> >
>> > > > > > > maybe the disk resource is a more general use case. For example,
>> >
>> > > > > > different
>> >
>> > > > > > > SSG may have various
>> >
>> > > > > > > disk requirements based on the state. So we need to allocate 
>> > > > > > > enough
>> >
>> > > > > > > ephemeral storage resource for every
>> >
>> > > > > > > TaskManager pod in Kubernetes deployment. Otherwise, it might be
>> >
>> > > evicted
>> >
>> > > > > > > due to running out of limits.
>> >
>> > > > > > >
>> >
>> > > > > > >
>> >
>> > > > > > > Best,
>> >
>> > > > > > > Yang
>> >
>> > > > > > >
>> >
>> > > > > > >
>> >
>> > > > > > > Xintong Song  于2021年6月8日周二 下午1:47写道:
>> >
>> > > > > > >
>> >
>> > > > > > > > I think being able to specify fine grained resource 
>> > > > > > > > requirements
>> >
>> > > > > > without
>> >
>> > > > > > > > having to change the codes and recompile the job is indeed a 
>> > > > > > > > good
>> >
>> > > > > > idea. It
>> >
>> > > > > > > > definitely improves the usability.
>> >
>> > > > > > > >
>> >
>> > > > > > > > However, this requires more careful designs, which probably
>> >
>> > > deserves a
>> >
>> > > > > > > > separate thread. I'd be good to have that discussion, but maybe
>> >
>> > > not
>> >
>> > > > > > block
>> >
>> > > > > > > > this feature on that.
>> >
>> > > > > > > >
>> >
>> > > > > > > > One idea concerning the configuration approach: As Yangze said,
>> >
>> > > flink
>> >
>> > > > > > > > configuration options are supposed to take effect at cluster
>> >
>> > > level. For
>> >
>> > > > > > > > updating job level specifics that are not suitable to be
>> >
>> > > introduced as
>> >
>> > > > > > a
>> >
>> > > > > > > > config option, currently the only way is to pass them as 
>> > > > > > > > program
>> >
>> > > > > > arguments.
>> >
>> > > > > > > > Would it make sense to introduce a general approach for
>> >
>> > > overwriting
>> >
>> > > > > > such
>> >
>> > > > > > > > job specifics without re-compiling the job?
>> >
>> > > > > > > >
>> >
>> > > > > > > > Thank you~
>> >
>> > > > > > > >
>> >
>> > > > > > > > Xintong Song
>> >
>> > > > > > > >
>> >
>> > > > > > > >
>> >
>> > > > > > > >
>> >
>> > > > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo
>> >
>> > > wrote:
>> >
>> > > > > > > >
>> >
>> > > > > > > > > @Wenlong
>> >
>> > > > > > > > > After another consideration, the config option approach I
>> >
>> > > mentioned
>> >
>> > > > > > > > > above might not be appropriate. The resource requirements for
>> >
>> > > SSG
>> >
>> > > > > > > > > should be a job level configuration and should no be set in 
>> > > > > > > > > the
>> >
>> > > > > > > > > flink-conf.
>> >
>> > > > > > > > >
>> >
>> > > > > > > > > I think we can define a JSON format, which would be the
>> >
>> > > ResourceSpecs
>> >
>> > > > > > > > > mapped by the name of SSGs, for the resource requirements of 
>> > > > > > > > > a
>> >
>> > > > > > > > > specific job. Then, we allow user to configure the file path
>> >
>> > > of that
>> >
>> > > > > > > > > JSON. The JSON will be only parsed in runtime, which allows
>> >
>> > > user to
>> >
>> > > > > > > > > tune it without re-compiling the job.
>> >
>> > > > > > > > >
>> >
>> > > > > > > > > We can add another #setSlotSharingGroupResources for
>> >
>> > > configuring the
>> >
>> > > > > > > > > file path of that JSON:
>> >
>> > > > > > > > > ```
>> >
>> > > > > > > > > /**
>> >
>> > > > > > > > > * Specify fine-grained resource requirements for slot sharing
>> >
>> > > groups
>> >
>> > > > > > > > > with the given resource JSON file. The existing resource
>> >
>> > > > > > > > > * requirement of the same slot sharing group will be 
>> > > > > > > > > replaced.
>> >
>> > > > > > > > > */
>> >
>> > > > > > > > > public StreamExecutionEnvironment 
>> > > > > > > > > setSlotSharingGroupResources(
>> >
>> > > > > > > > > String pathToResourceJson);
>> >
>> > > > > > > > > ```
>> >
>> > > > > > > > >
>> >
>> > > > > > > > > WDYT?
>> >
>> > > > > > > > >
>> >
>> > > > > > > > > Best,
>> >
>> > > > > > > > > Yangze Guo
>> >
>> > > > > > > > >
>> >
>> > > > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo
>> > > >
>> >
>> > > > > > wrote:
>> >
>> > > > > > > > > >
>> >
>> > > > > > > > > > Thanks for the feedbacks, Xintong and Wenlong!
>> >
>> > > > > > > > > >
>> >
>> > > > > > > > > > @Wenlong
>> >
>> > > > > > > > > > I think that is a good idea, adjust the resource without
>> >
>> > > > > > re-compiling
>> >
>> > > > > > > > > > the job will facilitate the tuning process.
>> >
>> > > > > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg
>> >
>> > > name}"
>> >
>> > > > > > > > > > (welcome any proposal for the prefix naming) for the
>> >
>> > > resource spec
>> >
>> > > > > > > > > > config of a slot sharing group. Then, user can set the
>> >
>> > > > > > ResourceSpec of
>> >
>> > > > > > > > > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1:
>> >
>> > > {cpu: 1.0,
>> >
>> > > > > > > > > > heap: 100m, off-heap: 100m....}". WDYT?
>> >
>> > > > > > > > > >
>> >
>> > > > > > > > > >
>> >
>> > > > > > > > > > Best,
>> >
>> > > > > > > > > > Yangze Guo
>> >
>> > > > > > > > > >
>> >
>> > > > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <
>> >
>> > > > > > wenlong88....@gmail.com>
>> >
>> > > > > > > > > wrote:
>> >
>> > > > > > > > > > >
>> >
>> > > > > > > > > > > Thanks Yangze for the flip, it is great for users to be
>> >
>> > > able to
>> >
>> > > > > > > > > declare the
>> >
>> > > > > > > > > > > fine-grained resource requirements for the job.
>> >
>> > > > > > > > > > >
>> >
>> > > > > > > > > > > I have one minor suggestion: can we support setting
>> >
>> > > resource
>> >
>> > > > > > > > > requirements
>> >
>> > > > > > > > > > > by configuration? Currently most of the config options in
>> >
>> > > > > > execution
>> >
>> > > > > > > > > config
>> >
>> > > > > > > > > > > can be configured by configuration, and it is very likely
>> >
>> > > that
>> >
>> > > > > > users
>> >
>> > > > > > > > > need
>> >
>> > > > > > > > > > > to adjust the resource according to the performance of
>> >
>> > > their job
>> >
>> > > > > > > > during
>> >
>> > > > > > > > > > > debugging, Providing a configuration way will make it 
>> > > > > > > > > > > more
>> >
>> > > > > > > > convenient.
>> >
>> > > > > > > > > > >
>> >
>> > > > > > > > > > > Bests,
>> >
>> > > > > > > > > > > Wenlong Lyu
>> >
>> > > > > > > > > > >
>> >
>> > > > > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <
>> >
>> > > tonysong...@gmail.com
>> >
>> > > > > > >
>> >
>> > > > > > > > > wrote:
>> >
>> > > > > > > > > > >
>> >
>> > > > > > > > > > > > Thanks Yangze for preparing the FLIP.
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > The proposed changes look good to me.
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > As you've mentioned in the implementation plan, I
>> >
>> > > believe one
>> >
>> > > > > > of
>> >
>> > > > > > > > the
>> >
>> > > > > > > > > most
>> >
>> > > > > > > > > > > > important tasks of this FLIP is to have the feature 
>> > > > > > > > > > > > well
>> >
>> > > > > > > > documented.
>> >
>> > > > > > > > > It
>> >
>> > > > > > > > > > > > would be really nice if we can keep that in mind and
>> >
>> > > start
>> >
>> > > > > > drafting
>> >
>> > > > > > > > > the
>> >
>> > > > > > > > > > > > documentation early.
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > Thank you~
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > Xintong Song
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <
>> >
>> > > karma...@gmail.com>
>> >
>> > > > > > > > > wrote:
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > > > > > > Hi, there,
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > > > We would like to start a discussion thread on
>> >
>> > > "FLIP-169:
>> >
>> > > > > > > > DataStream
>> >
>> > > > > > > > > > > > > API for Fine-Grained Resource Requirements"[1], where
>> >
>> > > we
>> >
>> > > > > > propose
>> >
>> > > > > > > > > the
>> >
>> > > > > > > > > > > > > DataStream API for specifying fine-grained resource
>> >
>> > > > > > requirements
>> >
>> > > > > > > > in
>> >
>> > > > > > > > > > > > > StreamExecutionEnvironment.
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > > > Please find more details in the FLIP wiki document 
>> > > > > > > > > > > > > [1].
>> >
>> > > > > > Looking
>> >
>> > > > > > > > > > > > > forward to your feedback.
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > > > [1]
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > >
>> >
>> > > > > > > >
>> >
>> > > > > >
>> >
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > > > Best,
>> >
>> > > > > > > > > > > > > Yangze Guo
>> >
>> > > > > > > > > > > > >
>> >
>> > > > > > > > > > > >
>> >
>> > > > > > > > >
>> >
>> > > > > > > >
>> >
>> > > > > >
>> >
>> > >
>> >

Reply via email to