FYI, I updated the FLIP accordingly. To sum up, Flink will throw an
exception and tell user to configure an internal
"fine-grained.shuffle-mode.all-blocking" to be true in this scenario.

Best,
Yangze Guo

On Tue, Jun 22, 2021 at 2:20 PM Yangze Guo <karma...@gmail.com> wrote:
>
> Thanks for the comment, Xintong.
>
> I used to wonder if it was reasonable or worthwhile to introduce a
> configuration like "table.exec.shuffle-mode" for DataStream API.
> Narrow down the scope of effect sounds good to me.
>
> Best,
> Yangze Guo
>
> On Tue, Jun 22, 2021 at 2:08 PM Xintong Song <tonysong...@gmail.com> wrote:
> >
> > I second Zhu and Till's opinion.
> >
> > Failing with an exception that also includes how to resolve the problem
> > sounds better, in terms of making it explicit to users that pipelined edges
> > are replaced with blocking edges.
> >
> > Concerning absence of knobs tuning the edge types, we can introduce a
> > configuration option. Since currently the edge types are fixed based on the
> > job execution mode and are not exposed to users, I'd suggest introducing a
> > configuration option that only affects fine-grained resource management use
> > cases. To be specific, we can have something like
> > 'fine-grained.xxx.all-blocking'. The default value should be false, and we
> > can suggest users to set it to true in the error message. When set to true,
> > this should take effect only when fine-grained resource requirements are
> > detected. Thus, it should not affect the default execution-mode based edge
> > type strategy for non fine-grained use cases.
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Jun 21, 2021 at 8:59 PM Yangze Guo <karma...@gmail.com> wrote:
> >
> > > Thanks for the feedback, Till!
> > >
> > > Actually, we cannot give user any resolution for this issue as there
> > > is no API for DataStream users to influence the edge types at the
> > > moment. The edge types are currently fixed based on the jobs' mode
> > > (batch or streaming).
> > > a) I think it might not confuse the user a lot as the behavior has
> > > never been documented or guaranteed to be unchanged.
> > > b) Thanks for your illustration. I agree that add complexity can make
> > > other feature development harder in the future. However, I think this
> > > might not introduce much complexity. In this case, we construct an
> > > all-edges-blocking job graph, which already exists since 1.11 and
> > > should have been considered by the following features. I admit we
> > > cannot assume the all-edges-blocking job graph will exist forever in
> > > Flink, but AFAIK there is no seeable feature that will intend to
> > > deprecate it.
> > >
> > > WDYT?
> > >
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Mon, Jun 21, 2021 at 6:10 PM Till Rohrmann <trohrm...@apache.org>
> > > wrote:
> > > >
> > > > I would be more in favor of what Zhu Zhu proposed to throw an exception
> > > > with a meaningful and understandable explanation that also includes how
> > > to
> > > > resolve this problem. I do understand the reasoning behind automatically
> > > > switching the edge types in order to make things easier to use but a)
> > > this
> > > > can also be confusing if the user does not expect this to happen and b)
> > > it
> > > > can add some complexity which makes other feature development harder in
> > > the
> > > > future because users might rely on it. An example of such a case I
> > > stumbled
> > > > upon rather recently is that we adjust the maximum parallelism wrt the
> > > > given savepoint if it has not been explicitly configured. On the paper
> > > this
> > > > sounds like a good usability improvement, however, for the
> > > > AdaptiveScheduler it posed a quite annoying complexity. If instead, we
> > > said
> > > > that we fail the job submission if the max parallelism does not equal 
> > > > the
> > > > max parallelism of the savepoint, it would have been a lot easier.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Jun 21, 2021 at 9:36 AM Yangze Guo <karma...@gmail.com> wrote:
> > > >
> > > > > Thanks, I append it to the known limitations of this FLIP.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Mon, Jun 21, 2021 at 3:20 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > >
> > > > > > Thanks for the quick response Yangze.
> > > > > > The proposal sounds good to me.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu
> > > > > >
> > > > > > Yangze Guo <karma...@gmail.com> 于2021年6月21日周一 下午3:01写道:
> > > > > >>
> > > > > >> Thanks for the comments, Zhu!
> > > > > >>
> > > > > >> Yes, it is a known limitation for fine-grained resource management.
> > > We
> > > > > >> also have filed this issue in FLINK-20865 when we proposed 
> > > > > >> FLIP-156.
> > > > > >>
> > > > > >> As a first step, I agree that we can mark batch jobs with PIPELINED
> > > > > >> edges as an invalid case for this feature. However, just throwing 
> > > > > >> an
> > > > > >> exception, in that case, might confuse users who do not understand
> > > the
> > > > > >> concept of pipeline region. Maybe we can force all the edges in 
> > > > > >> this
> > > > > >> scenario to BLOCKING in compiling stage and well document it. So
> > > that,
> > > > > >> common users will not be interrupted while the expert users can
> > > > > >> understand the cost of that usage and make their decision. WDYT?
> > > > > >>
> > > > > >> Best,
> > > > > >> Yangze Guo
> > > > > >>
> > > > > >> On Mon, Jun 21, 2021 at 2:24 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > Thanks for proposing this @Yangze Guo and sorry for joining the
> > > > > discussion so late.
> > > > > >> > The proposal generally looks good to me. But I find one problem
> > > that
> > > > > batch job with PIPELINED edges might hang if enabling fine-grained
> > > > > resources. see "Resource Deadlocks could still happen in certain 
> > > > > Cases"
> > > > > section in
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > > >> > However, this problem may happen only in batch cases with
> > > PIPELINED
> > > > > edges, because
> > > > > >> > 1. streaming jobs would always require all resource requirements
> > > to
> > > > > be fulfilled at the same time.
> > > > > >> > 2. batch jobs without PIPELINED edges consist of multiple single
> > > > > vertex regions and thus each slot can be individually used and 
> > > > > returned
> > > > > >> > So maybe in the first step, let's mark batch jobs with PIPELINED
> > > > > edges as an invalid case for fine-grained resources and throw
> > > exception for
> > > > > it in early compiling stage?
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Zhu
> > > > > >> >
> > > > > >> > Yangze Guo <karma...@gmail.com> 于2021年6月15日周二 下午4:57写道:
> > > > > >> >>
> > > > > >> >> Thanks for the supplement, Arvid and Yun. I've annotated these
> > > two
> > > > > >> >> points in the FLIP.
> > > > > >> >> The vote is now started in [1].
> > > > > >> >>
> > > > > >> >> [1]
> > > > >
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html
> > > > > >> >>
> > > > > >> >> Best,
> > > > > >> >> Yangze Guo
> > > > > >> >>
> > > > > >> >> On Fri, Jun 11, 2021 at 2:50 PM Yun Gao
> > > <yungao...@aliyun.com.invalid>
> > > > > wrote:
> > > > > >> >> >
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Very thanks @Yangze for bringing up this discuss. Overall +1
> > > for
> > > > > >> >> > exposing the fine-grained resource requirements in the
> > > DataStream
> > > > > API.
> > > > > >> >> >
> > > > > >> >> > One similar issue as Arvid has pointed out is that users may
> > > also
> > > > > creating
> > > > > >> >> > different SlotSharingGroup objects, with different names but
> > > with
> > > > > different
> > > > > >> >> > resources.  We might need to do some check internally. But We
> > > > > could also
> > > > > >> >> > leave that during the development of the actual PR.
> > > > > >> >> >
> > > > > >> >> > Best,
> > > > > >> >> > Yun
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >  ------------------Original Mail ------------------
> > > > > >> >> > Sender:Arvid Heise <ar...@apache.org>
> > > > > >> >> > Send Date:Thu Jun 10 15:33:37 2021
> > > > > >> >> > Recipients:dev <dev@flink.apache.org>
> > > > > >> >> > Subject:Re: [DISCUSS] FLIP-169: DataStream API for 
> > > > > >> >> > Fine-Grained
> > > > > Resource Requirements
> > > > > >> >> > Hi Yangze,
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > Thanks for incorporating the ideas and sorry for missing the
> > > > > builder part.
> > > > > >> >> >
> > > > > >> >> > My main idea is that SlotSharingGroup is immutable, such that
> > > the
> > > > > user
> > > > > >> >> >
> > > > > >> >> > doesn't do:
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > ssg = new SlotSharingGroup();
> > > > > >> >> >
> > > > > >> >> > ssg.setCpus(2);
> > > > > >> >> >
> > > > > >> >> > operator1.slotSharingGroup(ssg);
> > > > > >> >> >
> > > > > >> >> > ssg.setCpus(4);
> > > > > >> >> >
> > > > > >> >> > operator2.slotSharingGroup(ssg);
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > and wonders why both operators have the same CPU spec. But the
> > > > > details can
> > > > > >> >> >
> > > > > >> >> > be fleshed out in the actual PR.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo  wrote:
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > > Thanks all for the discussion. I've updated the FLIP
> > > > > accordingly, the
> > > > > >> >> >
> > > > > >> >> > > key changes are:
> > > > > >> >> >
> > > > > >> >> > > - Introduce SlotSharingGroup instead of ResourceSpec which
> > > > > contains
> > > > > >> >> >
> > > > > >> >> > > the resource spec of slot sharing group
> > > > > >> >> >
> > > > > >> >> > > - Introduce two interfaces for specifying the
> > > SlotSharingGroup:
> > > > > >> >> >
> > > > > >> >> > > #slotSharingGroup(SlotSharingGroup) and
> > > > > >> >> >
> > > > > >> >> > >
> > > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
> > > > > >> >> >
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > > If there is no more feedback, I'd start a vote next week.
> > > > > >> >> >
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > > Best,
> > > > > >> >> >
> > > > > >> >> > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > > On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > Thanks for the valuable suggestion, Arvid.
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > 1) Yes, we can add a new SlotSharingGroup which includes
> > > the
> > > > > name and
> > > > > >> >> >
> > > > > >> >> > > > its resource. After that, we have two interfaces for
> > > > > configuring the
> > > > > >> >> >
> > > > > >> >> > > > slot sharing group of an operator:
> > > > > >> >> >
> > > > > >> >> > > > - #slotSharingGroup(String name) // the resource of it can
> > > be
> > > > > >> >> >
> > > > > >> >> > > > configured through
> > > > > StreamExecutionEnvironment#registerSlotSharingGroup
> > > > > >> >> >
> > > > > >> >> > > > - #slotSharingGroup(SlotSharingGroup ssg) // Directly
> > > > > configure the
> > > > > >> >> >
> > > > > >> >> > > resource
> > > > > >> >> >
> > > > > >> >> > > > And one interface to configure the resource of a SSG:
> > > > > >> >> >
> > > > > >> >> > > > -
> > > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > > > >> >> >
> > > > > >> >> > > > We can also define the priority of the above two
> > > approaches,
> > > > > e.g. the
> > > > > >> >> >
> > > > > >> >> > > > resource registering in the StreamExecutionEnvironment 
> > > > > >> >> > > > will
> > > > > always be
> > > > > >> >> >
> > > > > >> >> > > > respected when conflict. That would be well documented.
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > 2) Yes, I originally add this interface as a shortcut. It
> > > seems
> > > > > >> >> >
> > > > > >> >> > > > unnecessary now. Will remove it.
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > 3) I don't think we need to expose the ExternalResource.
> > > In the
> > > > > >> >> >
> > > > > >> >> > > > builder of SlotSharingGroup, we can introduce a
> > > > > >> >> >
> > > > > >> >> > > > #withExternalResource(String name, double value). Also,
> > > this
> > > > > interface
> > > > > >> >> >
> > > > > >> >> > > > needs to be annotated as evolving.
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > 4) Actually, I've mentioned it in the FLIP. Maybe it would
> > > be
> > > > > good to
> > > > > >> >> >
> > > > > >> >> > > > elaborate on the Builder for the SlotSharingGroup.
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > WDYT?
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > Hi Yangze,
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > I like the general approach to bind requirements to
> > > > > slotsharing
> > > > > >> >> >
> > > > > >> >> > > groups. I
> > > > > >> >> >
> > > > > >> >> > > > > think the current approach is also flexible enough that 
> > > > > >> >> > > > > a
> > > > > user could
> > > > > >> >> >
> > > > > >> >> > > simply
> > > > > >> >> >
> > > > > >> >> > > > > use ParameterTool or similar to use config values and
> > > wire
> > > > > that with
> > > > > >> >> >
> > > > > >> >> > > their
> > > > > >> >> >
> > > > > >> >> > > > > slotgroups, such that different requirements can be
> > > tested
> > > > > without
> > > > > >> >> >
> > > > > >> >> > > > > recompilation. So I don't see an immediate need to
> > > provide a
> > > > > generic
> > > > > >> >> >
> > > > > >> >> > > > > solution for yaml configuration for now.
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > Looking at the programmatic interface though, I think we
> > > > > could improve
> > > > > >> >> >
> > > > > >> >> > > by
> > > > > >> >> >
> > > > > >> >> > > > > quite a bit and I haven't seen these alternatives being
> > > > > considered in
> > > > > >> >> >
> > > > > >> >> > > the
> > > > > >> >> >
> > > > > >> >> > > > > FLIP:
> > > > > >> >> >
> > > > > >> >> > > > > 1) Add new class SlotSharingGroup that incorporates all
> > > > > ResourceSpec
> > > > > >> >> >
> > > > > >> >> > > > > properties. Instead of using group names, the user could
> > > > > directly
> > > > > >> >> >
> > > > > >> >> > > configure
> > > > > >> >> >
> > > > > >> >> > > > > such an object.
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); 
> > > > > >> >> > > > > //
> > > > > name
> > > > > >> >> >
> > > > > >> >> > > > > could also be omitted and auto-generated
> > > > > >> >> >
> > > > > >> >> > > > > ssg1.setCPUCores(4);
> > > > > >> >> >
> > > > > >> >> > > > > ...
> > > > > >> >> >
> > > > > >> >> > > > > DataStream> grades =
> > > > > >> >> >
> > > > > >> >> > > > > GradeSource
> > > > > >> >> >
> > > > > >> >> > > > > .getSource(env, rate)
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > > > >> >> >
> > > > > >> >> > > > > .slotSharingGroup(ssg1);
> > > > > >> >> >
> > > > > >> >> > > > > DataStream> salaries =
> > > > > >> >> >
> > > > > >> >> > > > > SalarySource.getSource(env, rate)
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > > > >> >> >
> > > > > >> >> > > > > .slotSharingGroup(ssg2);
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > // run the actual window join program with the same slot
> > > > > >> >> >
> > > > > >> >> > > sharing
> > > > > >> >> >
> > > > > >> >> > > > > group as grades
> > > > > >> >> >
> > > > > >> >> > > > > DataStream> joinedStream =
> > > > > >> >> >
> > > > > >> >> > > > > runWindowJoin(grades, salaries,
> > > > > >> >> >
> > > > > >> >> > > > > windowSize).slotSharingGroup(ssg1);
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > Note that we could make it backward compatible by
> > > changing
> > > > > the proposed
> > > > > >> >> >
> > > > > >> >> > > > > StreamExecutionEnvironment#setSlotSharingGroupResource 
> > > > > >> >> > > > > to
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > > > >> >> >
> > > > > >> >> > > and
> > > > > >> >> >
> > > > > >> >> > > > > then use the string name for further reference.
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > 2) I'm also not sure on the StreamExecutionEnvironment#
> > > > > >> >> >
> > > > > >> >> > > > > setSlotSharingGroupResources. What's the benefit of the
> > > Map
> > > > > version
> > > > > >> >> >
> > > > > >> >> > > over
> > > > > >> >> >
> > > > > >> >> > > > > having the simple setter? Even if the user has a map
> > > > > >> >> >
> > > > > >> >> > > > > slotSharingGroupResources, he could simply do
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > 3) Is defining the ExternalResource part of this FLIP? I
> > > > > don't see a
> > > > > >> >> >
> > > > > >> >> > > > > Public* class yet. I'd be also fine to cut the scope of
> > > this
> > > > > FLIP and
> > > > > >> >> >
> > > > > >> >> > > > > remove it for now and annotate
> > > ResourceSpec/SlotSharingGroup
> > > > > evolving.
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > 4) We should probably use a builder pattern around
> > > > > >> >> >
> > > > > >> >> > > > > ResourceSpec/SlotSharingGroup as in the current
> > > > > ResourceSpec. I don't
> > > > > >> >> >
> > > > > >> >> > > think
> > > > > >> >> >
> > > > > >> >> > > > > we need to fully specify that in the FLIP but it would 
> > > > > >> >> > > > > be
> > > > > good to at
> > > > > >> >> >
> > > > > >> >> > > least
> > > > > >> >> >
> > > > > >> >> > > > > say how they should be created by the user.
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:
> > > > > >> >> >
> > > > > >> >> > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > @Yang
> > > > > >> >> >
> > > > > >> >> > > > > > In short, the external resources will participate in
> > > > > resource
> > > > > >> >> >
> > > > > >> >> > > > > > deduction and be logically ensured, but requesting an
> > > > > external
> > > > > >> >> >
> > > > > >> >> > > > > > resource must still be done through config options 
> > > > > >> >> > > > > > with
> > > > > the current
> > > > > >> >> >
> > > > > >> >> > > > > > default resource allocation strategy.
> > > > > >> >> >
> > > > > >> >> > > > > > In FLIP-56, we abstract the logic of resource
> > > allocation
> > > > > to the
> > > > > >> >> >
> > > > > >> >> > > > > > `ResourceAllocationStrategy`. Currently, with its
> > > default
> > > > > >> >> >
> > > > > >> >> > > > > > implementation, ResourceManager would still allocate
> > > TMs
> > > > > with the
> > > > > >> >> >
> > > > > >> >> > > same
> > > > > >> >> >
> > > > > >> >> > > > > > resource spec and the external resources of it are
> > > > > configured through
> > > > > >> >> >
> > > > > >> >> > > > > > the config option as well. So, in your case, you need
> > > to
> > > > > define the
> > > > > >> >> >
> > > > > >> >> > > > > > "external-resources" and
> > > "external-resources.disk.amount".
> > > > > Then, all
> > > > > >> >> >
> > > > > >> >> > > > > > the disk requirements defined in the SSG will be
> > > logically
> > > > > ensured,
> > > > > >> >> >
> > > > > >> >> > > as
> > > > > >> >> >
> > > > > >> >> > > > > > there is no slot level isolation. If the disk space of
> > > a
> > > > > task manager
> > > > > >> >> >
> > > > > >> >> > > > > > cannot fulfill the disk requirement, RM will allocate 
> > > > > >> >> > > > > > a
> > > > > new one.
> > > > > >> >> >
> > > > > >> >> > > > > > In the future, we'd like to introduce a
> > > > > `ResourceAllocationStrategy`
> > > > > >> >> >
> > > > > >> >> > > > > > which allocates heterogeneous TMs according to the
> > > > > requirements.
> > > > > >> >> >
> > > > > >> >> > > Then,
> > > > > >> >> >
> > > > > >> >> > > > > > user only needs to define the driver of external
> > > resources
> > > > > when
> > > > > >> >> >
> > > > > >> >> > > > > > needed.
> > > > > >> >> >
> > > > > >> >> > > > > > Also, regarding the resource isolation, we may provide
> > > a
> > > > > fine-grained
> > > > > >> >> >
> > > > > >> >> > > > > > mode in which each slot can only fetch the information
> > > of
> > > > > external
> > > > > >> >> >
> > > > > >> >> > > > > > resources it requires in the future. But that is out 
> > > > > >> >> > > > > > of
> > > > > the scope of
> > > > > >> >> >
> > > > > >> >> > > > > > this PR.
> > > > > >> >> >
> > > > > >> >> > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang
> > > > > >> >> >
> > > > > >> >> > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > Thanks @Yangze for preparing this FLIP.
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > I think this is a good start point for the community
> > > > > users to have
> > > > > >> >> >
> > > > > >> >> > > a
> > > > > >> >> >
> > > > > >> >> > > > > > taste
> > > > > >> >> >
> > > > > >> >> > > > > > > on the fine-grained
> > > > > >> >> >
> > > > > >> >> > > > > > > resource management, which we all believe it could
> > > > > improve the
> > > > > >> >> >
> > > > > >> >> > > Flink job
> > > > > >> >> >
> > > > > >> >> > > > > > > stability and
> > > > > >> >> >
> > > > > >> >> > > > > > > cluster utilization.
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > I have a simple question about the extended
> > > resources.
> > > > > It is
> > > > > >> >> >
> > > > > >> >> > > possible to
> > > > > >> >> >
> > > > > >> >> > > > > > > combine extended resources
> > > > > >> >> >
> > > > > >> >> > > > > > > with fine-grained resource management. Except for 
> > > > > >> >> > > > > > > the
> > > > > GPU, FPGA
> > > > > >> >> >
> > > > > >> >> > > and other
> > > > > >> >> >
> > > > > >> >> > > > > > > new computing devices,
> > > > > >> >> >
> > > > > >> >> > > > > > > maybe the disk resource is a more general use case.
> > > For
> > > > > example,
> > > > > >> >> >
> > > > > >> >> > > > > > different
> > > > > >> >> >
> > > > > >> >> > > > > > > SSG may have various
> > > > > >> >> >
> > > > > >> >> > > > > > > disk requirements based on the state. So we need to
> > > > > allocate enough
> > > > > >> >> >
> > > > > >> >> > > > > > > ephemeral storage resource for every
> > > > > >> >> >
> > > > > >> >> > > > > > > TaskManager pod in Kubernetes deployment. Otherwise,
> > > it
> > > > > might be
> > > > > >> >> >
> > > > > >> >> > > evicted
> > > > > >> >> >
> > > > > >> >> > > > > > > due to running out of limits.
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > > > > Yang
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > Xintong Song  于2021年6月8日周二 下午1:47写道:
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > I think being able to specify fine grained 
> > > > > >> >> > > > > > > > resource
> > > > > requirements
> > > > > >> >> >
> > > > > >> >> > > > > > without
> > > > > >> >> >
> > > > > >> >> > > > > > > > having to change the codes and recompile the job 
> > > > > >> >> > > > > > > > is
> > > > > indeed a good
> > > > > >> >> >
> > > > > >> >> > > > > > idea. It
> > > > > >> >> >
> > > > > >> >> > > > > > > > definitely improves the usability.
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > However, this requires more careful designs, which
> > > > > probably
> > > > > >> >> >
> > > > > >> >> > > deserves a
> > > > > >> >> >
> > > > > >> >> > > > > > > > separate thread. I'd be good to have that
> > > discussion,
> > > > > but maybe
> > > > > >> >> >
> > > > > >> >> > > not
> > > > > >> >> >
> > > > > >> >> > > > > > block
> > > > > >> >> >
> > > > > >> >> > > > > > > > this feature on that.
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > One idea concerning the configuration approach: As
> > > > > Yangze said,
> > > > > >> >> >
> > > > > >> >> > > flink
> > > > > >> >> >
> > > > > >> >> > > > > > > > configuration options are supposed to take effect
> > > at
> > > > > cluster
> > > > > >> >> >
> > > > > >> >> > > level. For
> > > > > >> >> >
> > > > > >> >> > > > > > > > updating job level specifics that are not suitable
> > > to
> > > > > be
> > > > > >> >> >
> > > > > >> >> > > introduced as
> > > > > >> >> >
> > > > > >> >> > > > > > a
> > > > > >> >> >
> > > > > >> >> > > > > > > > config option, currently the only way is to pass
> > > them
> > > > > as program
> > > > > >> >> >
> > > > > >> >> > > > > > arguments.
> > > > > >> >> >
> > > > > >> >> > > > > > > > Would it make sense to introduce a general
> > > approach for
> > > > > >> >> >
> > > > > >> >> > > overwriting
> > > > > >> >> >
> > > > > >> >> > > > > > such
> > > > > >> >> >
> > > > > >> >> > > > > > > > job specifics without re-compiling the job?
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > Thank you~
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > Xintong Song
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > @Wenlong
> > > > > >> >> >
> > > > > >> >> > > > > > > > > After another consideration, the config option
> > > > > approach I
> > > > > >> >> >
> > > > > >> >> > > mentioned
> > > > > >> >> >
> > > > > >> >> > > > > > > > > above might not be appropriate. The resource
> > > > > requirements for
> > > > > >> >> >
> > > > > >> >> > > SSG
> > > > > >> >> >
> > > > > >> >> > > > > > > > > should be a job level configuration and should
> > > no be
> > > > > set in the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > flink-conf.
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > I think we can define a JSON format, which would
> > > be
> > > > > the
> > > > > >> >> >
> > > > > >> >> > > ResourceSpecs
> > > > > >> >> >
> > > > > >> >> > > > > > > > > mapped by the name of SSGs, for the resource
> > > > > requirements of a
> > > > > >> >> >
> > > > > >> >> > > > > > > > > specific job. Then, we allow user to configure
> > > the
> > > > > file path
> > > > > >> >> >
> > > > > >> >> > > of that
> > > > > >> >> >
> > > > > >> >> > > > > > > > > JSON. The JSON will be only parsed in runtime,
> > > which
> > > > > allows
> > > > > >> >> >
> > > > > >> >> > > user to
> > > > > >> >> >
> > > > > >> >> > > > > > > > > tune it without re-compiling the job.
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > We can add another #setSlotSharingGroupResources
> > > for
> > > > > >> >> >
> > > > > >> >> > > configuring the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > file path of that JSON:
> > > > > >> >> >
> > > > > >> >> > > > > > > > > ```
> > > > > >> >> >
> > > > > >> >> > > > > > > > > /**
> > > > > >> >> >
> > > > > >> >> > > > > > > > > * Specify fine-grained resource requirements for
> > > > > slot sharing
> > > > > >> >> >
> > > > > >> >> > > groups
> > > > > >> >> >
> > > > > >> >> > > > > > > > > with the given resource JSON file. The existing
> > > > > resource
> > > > > >> >> >
> > > > > >> >> > > > > > > > > * requirement of the same slot sharing group
> > > will be
> > > > > replaced.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > */
> > > > > >> >> >
> > > > > >> >> > > > > > > > > public StreamExecutionEnvironment
> > > > > setSlotSharingGroupResources(
> > > > > >> >> >
> > > > > >> >> > > > > > > > > String pathToResourceJson);
> > > > > >> >> >
> > > > > >> >> > > > > > > > > ```
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > WDYT?
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo
> > > > > >> >> > > >
> > > > > >> >> >
> > > > > >> >> > > > > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > Thanks for the feedbacks, Xintong and Wenlong!
> > > > > >> >> >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > @Wenlong
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > I think that is a good idea, adjust the
> > > resource
> > > > > without
> > > > > >> >> >
> > > > > >> >> > > > > > re-compiling
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > the job will facilitate the tuning process.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > We can define a pattern
> > > > > "slot-sharing-group.resource.{ssg
> > > > > >> >> >
> > > > > >> >> > > name}"
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > (welcome any proposal for the prefix naming)
> > > for
> > > > > the
> > > > > >> >> >
> > > > > >> >> > > resource spec
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > config of a slot sharing group. Then, user can
> > > set
> > > > > the
> > > > > >> >> >
> > > > > >> >> > > > > > ResourceSpec of
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > SSG "ssg1" by adding
> > > > > "slot-sharing-group.resource.ssg1:
> > > > > >> >> >
> > > > > >> >> > > {cpu: 1.0,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > heap: 100m, off-heap: 100m....}". WDYT?
> > > > > >> >> >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <
> > > > > >> >> >
> > > > > >> >> > > > > > wenlong88....@gmail.com>
> > > > > >> >> >
> > > > > >> >> > > > > > > > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > Thanks Yangze for the flip, it is great for
> > > > > users to be
> > > > > >> >> >
> > > > > >> >> > > able to
> > > > > >> >> >
> > > > > >> >> > > > > > > > > declare the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > fine-grained resource requirements for the
> > > job.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > I have one minor suggestion: can we support
> > > > > setting
> > > > > >> >> >
> > > > > >> >> > > resource
> > > > > >> >> >
> > > > > >> >> > > > > > > > > requirements
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > by configuration? Currently most of the
> > > config
> > > > > options in
> > > > > >> >> >
> > > > > >> >> > > > > > execution
> > > > > >> >> >
> > > > > >> >> > > > > > > > > config
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > can be configured by configuration, and it 
> > > > > >> >> > > > > > > > > > > is
> > > > > very likely
> > > > > >> >> >
> > > > > >> >> > > that
> > > > > >> >> >
> > > > > >> >> > > > > > users
> > > > > >> >> >
> > > > > >> >> > > > > > > > > need
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > to adjust the resource according to the
> > > > > performance of
> > > > > >> >> >
> > > > > >> >> > > their job
> > > > > >> >> >
> > > > > >> >> > > > > > > > during
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > debugging, Providing a configuration way 
> > > > > >> >> > > > > > > > > > > will
> > > > > make it more
> > > > > >> >> >
> > > > > >> >> > > > > > > > convenient.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > Bests,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > Wenlong Lyu
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <
> > > > > >> >> >
> > > > > >> >> > > tonysong...@gmail.com
> > > > > >> >> >
> > > > > >> >> > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > Thanks Yangze for preparing the FLIP.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > The proposed changes look good to me.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > As you've mentioned in the implementation
> > > > > plan, I
> > > > > >> >> >
> > > > > >> >> > > believe one
> > > > > >> >> >
> > > > > >> >> > > > > > of
> > > > > >> >> >
> > > > > >> >> > > > > > > > the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > most
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > important tasks of this FLIP is to have 
> > > > > >> >> > > > > > > > > > > > the
> > > > > feature well
> > > > > >> >> >
> > > > > >> >> > > > > > > > documented.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > It
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > would be really nice if we can keep that 
> > > > > >> >> > > > > > > > > > > > in
> > > > > mind and
> > > > > >> >> >
> > > > > >> >> > > start
> > > > > >> >> >
> > > > > >> >> > > > > > drafting
> > > > > >> >> >
> > > > > >> >> > > > > > > > > the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > documentation early.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > Thank you~
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > Xintong Song
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo 
> > > > > >> >> > > > > > > > > > > > <
> > > > > >> >> >
> > > > > >> >> > > karma...@gmail.com>
> > > > > >> >> >
> > > > > >> >> > > > > > > > > wrote:
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > Hi, there,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > We would like to start a discussion
> > > thread on
> > > > > >> >> >
> > > > > >> >> > > "FLIP-169:
> > > > > >> >> >
> > > > > >> >> > > > > > > > DataStream
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > API for Fine-Grained Resource
> > > > > Requirements"[1], where
> > > > > >> >> >
> > > > > >> >> > > we
> > > > > >> >> >
> > > > > >> >> > > > > > propose
> > > > > >> >> >
> > > > > >> >> > > > > > > > > the
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > DataStream API for specifying
> > > fine-grained
> > > > > resource
> > > > > >> >> >
> > > > > >> >> > > > > > requirements
> > > > > >> >> >
> > > > > >> >> > > > > > > > in
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > StreamExecutionEnvironment.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > Please find more details in the FLIP 
> > > > > >> >> > > > > > > > > > > > > wiki
> > > > > document [1].
> > > > > >> >> >
> > > > > >> >> > > > > > Looking
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > forward to your feedback.
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > [1]
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > >
> > > > > >> >> >
> > > > > >> >> > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > Best,
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > > Yangze Guo
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > > > >
> > > > > >> >> >
> > > > > >> >> > > > > >
> > > > > >> >> >
> > > > > >> >> > >
> > > > > >> >> >
> > > > >
> > >

Reply via email to