Thanks for the comment, Xintong.

I used to wonder if it was reasonable or worthwhile to introduce a
configuration like "table.exec.shuffle-mode" for DataStream API.
Narrow down the scope of effect sounds good to me.

Best,
Yangze Guo

On Tue, Jun 22, 2021 at 2:08 PM Xintong Song <tonysong...@gmail.com> wrote:
>
> I second Zhu and Till's opinion.
>
> Failing with an exception that also includes how to resolve the problem
> sounds better, in terms of making it explicit to users that pipelined edges
> are replaced with blocking edges.
>
> Concerning absence of knobs tuning the edge types, we can introduce a
> configuration option. Since currently the edge types are fixed based on the
> job execution mode and are not exposed to users, I'd suggest introducing a
> configuration option that only affects fine-grained resource management use
> cases. To be specific, we can have something like
> 'fine-grained.xxx.all-blocking'. The default value should be false, and we
> can suggest users to set it to true in the error message. When set to true,
> this should take effect only when fine-grained resource requirements are
> detected. Thus, it should not affect the default execution-mode based edge
> type strategy for non fine-grained use cases.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 21, 2021 at 8:59 PM Yangze Guo <karma...@gmail.com> wrote:
>
> > Thanks for the feedback, Till!
> >
> > Actually, we cannot give user any resolution for this issue as there
> > is no API for DataStream users to influence the edge types at the
> > moment. The edge types are currently fixed based on the jobs' mode
> > (batch or streaming).
> > a) I think it might not confuse the user a lot as the behavior has
> > never been documented or guaranteed to be unchanged.
> > b) Thanks for your illustration. I agree that add complexity can make
> > other feature development harder in the future. However, I think this
> > might not introduce much complexity. In this case, we construct an
> > all-edges-blocking job graph, which already exists since 1.11 and
> > should have been considered by the following features. I admit we
> > cannot assume the all-edges-blocking job graph will exist forever in
> > Flink, but AFAIK there is no seeable feature that will intend to
> > deprecate it.
> >
> > WDYT?
> >
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Jun 21, 2021 at 6:10 PM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> > >
> > > I would be more in favor of what Zhu Zhu proposed to throw an exception
> > > with a meaningful and understandable explanation that also includes how
> > to
> > > resolve this problem. I do understand the reasoning behind automatically
> > > switching the edge types in order to make things easier to use but a)
> > this
> > > can also be confusing if the user does not expect this to happen and b)
> > it
> > > can add some complexity which makes other feature development harder in
> > the
> > > future because users might rely on it. An example of such a case I
> > stumbled
> > > upon rather recently is that we adjust the maximum parallelism wrt the
> > > given savepoint if it has not been explicitly configured. On the paper
> > this
> > > sounds like a good usability improvement, however, for the
> > > AdaptiveScheduler it posed a quite annoying complexity. If instead, we
> > said
> > > that we fail the job submission if the max parallelism does not equal the
> > > max parallelism of the savepoint, it would have been a lot easier.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Jun 21, 2021 at 9:36 AM Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > > Thanks, I append it to the known limitations of this FLIP.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Jun 21, 2021 at 3:20 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >
> > > > > Thanks for the quick response Yangze.
> > > > > The proposal sounds good to me.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Yangze Guo <karma...@gmail.com> 于2021年6月21日周一 下午3:01写道:
> > > > >>
> > > > >> Thanks for the comments, Zhu!
> > > > >>
> > > > >> Yes, it is a known limitation for fine-grained resource management.
> > We
> > > > >> also have filed this issue in FLINK-20865 when we proposed FLIP-156.
> > > > >>
> > > > >> As a first step, I agree that we can mark batch jobs with PIPELINED
> > > > >> edges as an invalid case for this feature. However, just throwing an
> > > > >> exception, in that case, might confuse users who do not understand
> > the
> > > > >> concept of pipeline region. Maybe we can force all the edges in this
> > > > >> scenario to BLOCKING in compiling stage and well document it. So
> > that,
> > > > >> common users will not be interrupted while the expert users can
> > > > >> understand the cost of that usage and make their decision. WDYT?
> > > > >>
> > > > >> Best,
> > > > >> Yangze Guo
> > > > >>
> > > > >> On Mon, Jun 21, 2021 at 2:24 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >> >
> > > > >> > Thanks for proposing this @Yangze Guo and sorry for joining the
> > > > discussion so late.
> > > > >> > The proposal generally looks good to me. But I find one problem
> > that
> > > > batch job with PIPELINED edges might hang if enabling fine-grained
> > > > resources. see "Resource Deadlocks could still happen in certain Cases"
> > > > section in
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > >> > However, this problem may happen only in batch cases with
> > PIPELINED
> > > > edges, because
> > > > >> > 1. streaming jobs would always require all resource requirements
> > to
> > > > be fulfilled at the same time.
> > > > >> > 2. batch jobs without PIPELINED edges consist of multiple single
> > > > vertex regions and thus each slot can be individually used and returned
> > > > >> > So maybe in the first step, let's mark batch jobs with PIPELINED
> > > > edges as an invalid case for fine-grained resources and throw
> > exception for
> > > > it in early compiling stage?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Zhu
> > > > >> >
> > > > >> > Yangze Guo <karma...@gmail.com> 于2021年6月15日周二 下午4:57写道:
> > > > >> >>
> > > > >> >> Thanks for the supplement, Arvid and Yun. I've annotated these
> > two
> > > > >> >> points in the FLIP.
> > > > >> >> The vote is now started in [1].
> > > > >> >>
> > > > >> >> [1]
> > > >
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html
> > > > >> >>
> > > > >> >> Best,
> > > > >> >> Yangze Guo
> > > > >> >>
> > > > >> >> On Fri, Jun 11, 2021 at 2:50 PM Yun Gao
> > <yungao...@aliyun.com.invalid>
> > > > wrote:
> > > > >> >> >
> > > > >> >> > Hi,
> > > > >> >> >
> > > > >> >> > Very thanks @Yangze for bringing up this discuss. Overall +1
> > for
> > > > >> >> > exposing the fine-grained resource requirements in the
> > DataStream
> > > > API.
> > > > >> >> >
> > > > >> >> > One similar issue as Arvid has pointed out is that users may
> > also
> > > > creating
> > > > >> >> > different SlotSharingGroup objects, with different names but
> > with
> > > > different
> > > > >> >> > resources.  We might need to do some check internally. But We
> > > > could also
> > > > >> >> > leave that during the development of the actual PR.
> > > > >> >> >
> > > > >> >> > Best,
> > > > >> >> > Yun
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >  ------------------Original Mail ------------------
> > > > >> >> > Sender:Arvid Heise <ar...@apache.org>
> > > > >> >> > Send Date:Thu Jun 10 15:33:37 2021
> > > > >> >> > Recipients:dev <dev@flink.apache.org>
> > > > >> >> > Subject:Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained
> > > > Resource Requirements
> > > > >> >> > Hi Yangze,
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > Thanks for incorporating the ideas and sorry for missing the
> > > > builder part.
> > > > >> >> >
> > > > >> >> > My main idea is that SlotSharingGroup is immutable, such that
> > the
> > > > user
> > > > >> >> >
> > > > >> >> > doesn't do:
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > ssg = new SlotSharingGroup();
> > > > >> >> >
> > > > >> >> > ssg.setCpus(2);
> > > > >> >> >
> > > > >> >> > operator1.slotSharingGroup(ssg);
> > > > >> >> >
> > > > >> >> > ssg.setCpus(4);
> > > > >> >> >
> > > > >> >> > operator2.slotSharingGroup(ssg);
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > and wonders why both operators have the same CPU spec. But the
> > > > details can
> > > > >> >> >
> > > > >> >> > be fleshed out in the actual PR.
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo  wrote:
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >> > > Thanks all for the discussion. I've updated the FLIP
> > > > accordingly, the
> > > > >> >> >
> > > > >> >> > > key changes are:
> > > > >> >> >
> > > > >> >> > > - Introduce SlotSharingGroup instead of ResourceSpec which
> > > > contains
> > > > >> >> >
> > > > >> >> > > the resource spec of slot sharing group
> > > > >> >> >
> > > > >> >> > > - Introduce two interfaces for specifying the
> > SlotSharingGroup:
> > > > >> >> >
> > > > >> >> > > #slotSharingGroup(SlotSharingGroup) and
> > > > >> >> >
> > > > >> >> > >
> > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
> > > > >> >> >
> > > > >> >> > >
> > > > >> >> >
> > > > >> >> > > If there is no more feedback, I'd start a vote next week.
> > > > >> >> >
> > > > >> >> > >
> > > > >> >> >
> > > > >> >> > > Best,
> > > > >> >> >
> > > > >> >> > > Yangze Guo
> > > > >> >> >
> > > > >> >> > >
> > > > >> >> >
> > > > >> >> > > On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > Thanks for the valuable suggestion, Arvid.
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > 1) Yes, we can add a new SlotSharingGroup which includes
> > the
> > > > name and
> > > > >> >> >
> > > > >> >> > > > its resource. After that, we have two interfaces for
> > > > configuring the
> > > > >> >> >
> > > > >> >> > > > slot sharing group of an operator:
> > > > >> >> >
> > > > >> >> > > > - #slotSharingGroup(String name) // the resource of it can
> > be
> > > > >> >> >
> > > > >> >> > > > configured through
> > > > StreamExecutionEnvironment#registerSlotSharingGroup
> > > > >> >> >
> > > > >> >> > > > - #slotSharingGroup(SlotSharingGroup ssg) // Directly
> > > > configure the
> > > > >> >> >
> > > > >> >> > > resource
> > > > >> >> >
> > > > >> >> > > > And one interface to configure the resource of a SSG:
> > > > >> >> >
> > > > >> >> > > > -
> > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > > >> >> >
> > > > >> >> > > > We can also define the priority of the above two
> > approaches,
> > > > e.g. the
> > > > >> >> >
> > > > >> >> > > > resource registering in the StreamExecutionEnvironment will
> > > > always be
> > > > >> >> >
> > > > >> >> > > > respected when conflict. That would be well documented.
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > 2) Yes, I originally add this interface as a shortcut. It
> > seems
> > > > >> >> >
> > > > >> >> > > > unnecessary now. Will remove it.
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > 3) I don't think we need to expose the ExternalResource.
> > In the
> > > > >> >> >
> > > > >> >> > > > builder of SlotSharingGroup, we can introduce a
> > > > >> >> >
> > > > >> >> > > > #withExternalResource(String name, double value). Also,
> > this
> > > > interface
> > > > >> >> >
> > > > >> >> > > > needs to be annotated as evolving.
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > 4) Actually, I've mentioned it in the FLIP. Maybe it would
> > be
> > > > good to
> > > > >> >> >
> > > > >> >> > > > elaborate on the Builder for the SlotSharingGroup.
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > WDYT?
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > Best,
> > > > >> >> >
> > > > >> >> > > > Yangze Guo
> > > > >> >> >
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > Hi Yangze,
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > I like the general approach to bind requirements to
> > > > slotsharing
> > > > >> >> >
> > > > >> >> > > groups. I
> > > > >> >> >
> > > > >> >> > > > > think the current approach is also flexible enough that a
> > > > user could
> > > > >> >> >
> > > > >> >> > > simply
> > > > >> >> >
> > > > >> >> > > > > use ParameterTool or similar to use config values and
> > wire
> > > > that with
> > > > >> >> >
> > > > >> >> > > their
> > > > >> >> >
> > > > >> >> > > > > slotgroups, such that different requirements can be
> > tested
> > > > without
> > > > >> >> >
> > > > >> >> > > > > recompilation. So I don't see an immediate need to
> > provide a
> > > > generic
> > > > >> >> >
> > > > >> >> > > > > solution for yaml configuration for now.
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > Looking at the programmatic interface though, I think we
> > > > could improve
> > > > >> >> >
> > > > >> >> > > by
> > > > >> >> >
> > > > >> >> > > > > quite a bit and I haven't seen these alternatives being
> > > > considered in
> > > > >> >> >
> > > > >> >> > > the
> > > > >> >> >
> > > > >> >> > > > > FLIP:
> > > > >> >> >
> > > > >> >> > > > > 1) Add new class SlotSharingGroup that incorporates all
> > > > ResourceSpec
> > > > >> >> >
> > > > >> >> > > > > properties. Instead of using group names, the user could
> > > > directly
> > > > >> >> >
> > > > >> >> > > configure
> > > > >> >> >
> > > > >> >> > > > > such an object.
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); //
> > > > name
> > > > >> >> >
> > > > >> >> > > > > could also be omitted and auto-generated
> > > > >> >> >
> > > > >> >> > > > > ssg1.setCPUCores(4);
> > > > >> >> >
> > > > >> >> > > > > ...
> > > > >> >> >
> > > > >> >> > > > > DataStream> grades =
> > > > >> >> >
> > > > >> >> > > > > GradeSource
> > > > >> >> >
> > > > >> >> > > > > .getSource(env, rate)
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > >
> > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > > >> >> >
> > > > >> >> > > > > .slotSharingGroup(ssg1);
> > > > >> >> >
> > > > >> >> > > > > DataStream> salaries =
> > > > >> >> >
> > > > >> >> > > > > SalarySource.getSource(env, rate)
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > >
> > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > > >> >> >
> > > > >> >> > > > > .slotSharingGroup(ssg2);
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > // run the actual window join program with the same slot
> > > > >> >> >
> > > > >> >> > > sharing
> > > > >> >> >
> > > > >> >> > > > > group as grades
> > > > >> >> >
> > > > >> >> > > > > DataStream> joinedStream =
> > > > >> >> >
> > > > >> >> > > > > runWindowJoin(grades, salaries,
> > > > >> >> >
> > > > >> >> > > > > windowSize).slotSharingGroup(ssg1);
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > Note that we could make it backward compatible by
> > changing
> > > > the proposed
> > > > >> >> >
> > > > >> >> > > > > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > > > >> >> >
> > > > >> >> > > > >
> > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > > >> >> >
> > > > >> >> > > and
> > > > >> >> >
> > > > >> >> > > > > then use the string name for further reference.
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > 2) I'm also not sure on the StreamExecutionEnvironment#
> > > > >> >> >
> > > > >> >> > > > > setSlotSharingGroupResources. What's the benefit of the
> > Map
> > > > version
> > > > >> >> >
> > > > >> >> > > over
> > > > >> >> >
> > > > >> >> > > > > having the simple setter? Even if the user has a map
> > > > >> >> >
> > > > >> >> > > > > slotSharingGroupResources, he could simply do
> > > > >> >> >
> > > > >> >> > > > >
> > > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > 3) Is defining the ExternalResource part of this FLIP? I
> > > > don't see a
> > > > >> >> >
> > > > >> >> > > > > Public* class yet. I'd be also fine to cut the scope of
> > this
> > > > FLIP and
> > > > >> >> >
> > > > >> >> > > > > remove it for now and annotate
> > ResourceSpec/SlotSharingGroup
> > > > evolving.
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > 4) We should probably use a builder pattern around
> > > > >> >> >
> > > > >> >> > > > > ResourceSpec/SlotSharingGroup as in the current
> > > > ResourceSpec. I don't
> > > > >> >> >
> > > > >> >> > > think
> > > > >> >> >
> > > > >> >> > > > > we need to fully specify that in the FLIP but it would be
> > > > good to at
> > > > >> >> >
> > > > >> >> > > least
> > > > >> >> >
> > > > >> >> > > > > say how they should be created by the user.
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:
> > > > >> >> >
> > > > >> >> > > > >
> > > > >> >> >
> > > > >> >> > > > > > @Yang
> > > > >> >> >
> > > > >> >> > > > > > In short, the external resources will participate in
> > > > resource
> > > > >> >> >
> > > > >> >> > > > > > deduction and be logically ensured, but requesting an
> > > > external
> > > > >> >> >
> > > > >> >> > > > > > resource must still be done through config options with
> > > > the current
> > > > >> >> >
> > > > >> >> > > > > > default resource allocation strategy.
> > > > >> >> >
> > > > >> >> > > > > > In FLIP-56, we abstract the logic of resource
> > allocation
> > > > to the
> > > > >> >> >
> > > > >> >> > > > > > `ResourceAllocationStrategy`. Currently, with its
> > default
> > > > >> >> >
> > > > >> >> > > > > > implementation, ResourceManager would still allocate
> > TMs
> > > > with the
> > > > >> >> >
> > > > >> >> > > same
> > > > >> >> >
> > > > >> >> > > > > > resource spec and the external resources of it are
> > > > configured through
> > > > >> >> >
> > > > >> >> > > > > > the config option as well. So, in your case, you need
> > to
> > > > define the
> > > > >> >> >
> > > > >> >> > > > > > "external-resources" and
> > "external-resources.disk.amount".
> > > > Then, all
> > > > >> >> >
> > > > >> >> > > > > > the disk requirements defined in the SSG will be
> > logically
> > > > ensured,
> > > > >> >> >
> > > > >> >> > > as
> > > > >> >> >
> > > > >> >> > > > > > there is no slot level isolation. If the disk space of
> > a
> > > > task manager
> > > > >> >> >
> > > > >> >> > > > > > cannot fulfill the disk requirement, RM will allocate a
> > > > new one.
> > > > >> >> >
> > > > >> >> > > > > > In the future, we'd like to introduce a
> > > > `ResourceAllocationStrategy`
> > > > >> >> >
> > > > >> >> > > > > > which allocates heterogeneous TMs according to the
> > > > requirements.
> > > > >> >> >
> > > > >> >> > > Then,
> > > > >> >> >
> > > > >> >> > > > > > user only needs to define the driver of external
> > resources
> > > > when
> > > > >> >> >
> > > > >> >> > > > > > needed.
> > > > >> >> >
> > > > >> >> > > > > > Also, regarding the resource isolation, we may provide
> > a
> > > > fine-grained
> > > > >> >> >
> > > > >> >> > > > > > mode in which each slot can only fetch the information
> > of
> > > > external
> > > > >> >> >
> > > > >> >> > > > > > resources it requires in the future. But that is out of
> > > > the scope of
> > > > >> >> >
> > > > >> >> > > > > > this PR.
> > > > >> >> >
> > > > >> >> > > > > >
> > > > >> >> >
> > > > >> >> > > > > > Best,
> > > > >> >> >
> > > > >> >> > > > > > Yangze Guo
> > > > >> >> >
> > > > >> >> > > > > >
> > > > >> >> >
> > > > >> >> > > > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang
> > > > >> >> >
> > > > >> >> > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > Thanks @Yangze for preparing this FLIP.
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > I think this is a good start point for the community
> > > > users to have
> > > > >> >> >
> > > > >> >> > > a
> > > > >> >> >
> > > > >> >> > > > > > taste
> > > > >> >> >
> > > > >> >> > > > > > > on the fine-grained
> > > > >> >> >
> > > > >> >> > > > > > > resource management, which we all believe it could
> > > > improve the
> > > > >> >> >
> > > > >> >> > > Flink job
> > > > >> >> >
> > > > >> >> > > > > > > stability and
> > > > >> >> >
> > > > >> >> > > > > > > cluster utilization.
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > I have a simple question about the extended
> > resources.
> > > > It is
> > > > >> >> >
> > > > >> >> > > possible to
> > > > >> >> >
> > > > >> >> > > > > > > combine extended resources
> > > > >> >> >
> > > > >> >> > > > > > > with fine-grained resource management. Except for the
> > > > GPU, FPGA
> > > > >> >> >
> > > > >> >> > > and other
> > > > >> >> >
> > > > >> >> > > > > > > new computing devices,
> > > > >> >> >
> > > > >> >> > > > > > > maybe the disk resource is a more general use case.
> > For
> > > > example,
> > > > >> >> >
> > > > >> >> > > > > > different
> > > > >> >> >
> > > > >> >> > > > > > > SSG may have various
> > > > >> >> >
> > > > >> >> > > > > > > disk requirements based on the state. So we need to
> > > > allocate enough
> > > > >> >> >
> > > > >> >> > > > > > > ephemeral storage resource for every
> > > > >> >> >
> > > > >> >> > > > > > > TaskManager pod in Kubernetes deployment. Otherwise,
> > it
> > > > might be
> > > > >> >> >
> > > > >> >> > > evicted
> > > > >> >> >
> > > > >> >> > > > > > > due to running out of limits.
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > Best,
> > > > >> >> >
> > > > >> >> > > > > > > Yang
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > Xintong Song  于2021年6月8日周二 下午1:47写道:
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > I think being able to specify fine grained resource
> > > > requirements
> > > > >> >> >
> > > > >> >> > > > > > without
> > > > >> >> >
> > > > >> >> > > > > > > > having to change the codes and recompile the job is
> > > > indeed a good
> > > > >> >> >
> > > > >> >> > > > > > idea. It
> > > > >> >> >
> > > > >> >> > > > > > > > definitely improves the usability.
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > However, this requires more careful designs, which
> > > > probably
> > > > >> >> >
> > > > >> >> > > deserves a
> > > > >> >> >
> > > > >> >> > > > > > > > separate thread. I'd be good to have that
> > discussion,
> > > > but maybe
> > > > >> >> >
> > > > >> >> > > not
> > > > >> >> >
> > > > >> >> > > > > > block
> > > > >> >> >
> > > > >> >> > > > > > > > this feature on that.
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > One idea concerning the configuration approach: As
> > > > Yangze said,
> > > > >> >> >
> > > > >> >> > > flink
> > > > >> >> >
> > > > >> >> > > > > > > > configuration options are supposed to take effect
> > at
> > > > cluster
> > > > >> >> >
> > > > >> >> > > level. For
> > > > >> >> >
> > > > >> >> > > > > > > > updating job level specifics that are not suitable
> > to
> > > > be
> > > > >> >> >
> > > > >> >> > > introduced as
> > > > >> >> >
> > > > >> >> > > > > > a
> > > > >> >> >
> > > > >> >> > > > > > > > config option, currently the only way is to pass
> > them
> > > > as program
> > > > >> >> >
> > > > >> >> > > > > > arguments.
> > > > >> >> >
> > > > >> >> > > > > > > > Would it make sense to introduce a general
> > approach for
> > > > >> >> >
> > > > >> >> > > overwriting
> > > > >> >> >
> > > > >> >> > > > > > such
> > > > >> >> >
> > > > >> >> > > > > > > > job specifics without re-compiling the job?
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > Thank you~
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > Xintong Song
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo
> > > > >> >> >
> > > > >> >> > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > @Wenlong
> > > > >> >> >
> > > > >> >> > > > > > > > > After another consideration, the config option
> > > > approach I
> > > > >> >> >
> > > > >> >> > > mentioned
> > > > >> >> >
> > > > >> >> > > > > > > > > above might not be appropriate. The resource
> > > > requirements for
> > > > >> >> >
> > > > >> >> > > SSG
> > > > >> >> >
> > > > >> >> > > > > > > > > should be a job level configuration and should
> > no be
> > > > set in the
> > > > >> >> >
> > > > >> >> > > > > > > > > flink-conf.
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > I think we can define a JSON format, which would
> > be
> > > > the
> > > > >> >> >
> > > > >> >> > > ResourceSpecs
> > > > >> >> >
> > > > >> >> > > > > > > > > mapped by the name of SSGs, for the resource
> > > > requirements of a
> > > > >> >> >
> > > > >> >> > > > > > > > > specific job. Then, we allow user to configure
> > the
> > > > file path
> > > > >> >> >
> > > > >> >> > > of that
> > > > >> >> >
> > > > >> >> > > > > > > > > JSON. The JSON will be only parsed in runtime,
> > which
> > > > allows
> > > > >> >> >
> > > > >> >> > > user to
> > > > >> >> >
> > > > >> >> > > > > > > > > tune it without re-compiling the job.
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > We can add another #setSlotSharingGroupResources
> > for
> > > > >> >> >
> > > > >> >> > > configuring the
> > > > >> >> >
> > > > >> >> > > > > > > > > file path of that JSON:
> > > > >> >> >
> > > > >> >> > > > > > > > > ```
> > > > >> >> >
> > > > >> >> > > > > > > > > /**
> > > > >> >> >
> > > > >> >> > > > > > > > > * Specify fine-grained resource requirements for
> > > > slot sharing
> > > > >> >> >
> > > > >> >> > > groups
> > > > >> >> >
> > > > >> >> > > > > > > > > with the given resource JSON file. The existing
> > > > resource
> > > > >> >> >
> > > > >> >> > > > > > > > > * requirement of the same slot sharing group
> > will be
> > > > replaced.
> > > > >> >> >
> > > > >> >> > > > > > > > > */
> > > > >> >> >
> > > > >> >> > > > > > > > > public StreamExecutionEnvironment
> > > > setSlotSharingGroupResources(
> > > > >> >> >
> > > > >> >> > > > > > > > > String pathToResourceJson);
> > > > >> >> >
> > > > >> >> > > > > > > > > ```
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > WDYT?
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > Best,
> > > > >> >> >
> > > > >> >> > > > > > > > > Yangze Guo
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo
> > > > >> >> > > >
> > > > >> >> >
> > > > >> >> > > > > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > Thanks for the feedbacks, Xintong and Wenlong!
> > > > >> >> >
> > > > >> >> > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > @Wenlong
> > > > >> >> >
> > > > >> >> > > > > > > > > > I think that is a good idea, adjust the
> > resource
> > > > without
> > > > >> >> >
> > > > >> >> > > > > > re-compiling
> > > > >> >> >
> > > > >> >> > > > > > > > > > the job will facilitate the tuning process.
> > > > >> >> >
> > > > >> >> > > > > > > > > > We can define a pattern
> > > > "slot-sharing-group.resource.{ssg
> > > > >> >> >
> > > > >> >> > > name}"
> > > > >> >> >
> > > > >> >> > > > > > > > > > (welcome any proposal for the prefix naming)
> > for
> > > > the
> > > > >> >> >
> > > > >> >> > > resource spec
> > > > >> >> >
> > > > >> >> > > > > > > > > > config of a slot sharing group. Then, user can
> > set
> > > > the
> > > > >> >> >
> > > > >> >> > > > > > ResourceSpec of
> > > > >> >> >
> > > > >> >> > > > > > > > > > SSG "ssg1" by adding
> > > > "slot-sharing-group.resource.ssg1:
> > > > >> >> >
> > > > >> >> > > {cpu: 1.0,
> > > > >> >> >
> > > > >> >> > > > > > > > > > heap: 100m, off-heap: 100m....}". WDYT?
> > > > >> >> >
> > > > >> >> > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > Best,
> > > > >> >> >
> > > > >> >> > > > > > > > > > Yangze Guo
> > > > >> >> >
> > > > >> >> > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <
> > > > >> >> >
> > > > >> >> > > > > > wenlong88....@gmail.com>
> > > > >> >> >
> > > > >> >> > > > > > > > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > Thanks Yangze for the flip, it is great for
> > > > users to be
> > > > >> >> >
> > > > >> >> > > able to
> > > > >> >> >
> > > > >> >> > > > > > > > > declare the
> > > > >> >> >
> > > > >> >> > > > > > > > > > > fine-grained resource requirements for the
> > job.
> > > > >> >> >
> > > > >> >> > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > I have one minor suggestion: can we support
> > > > setting
> > > > >> >> >
> > > > >> >> > > resource
> > > > >> >> >
> > > > >> >> > > > > > > > > requirements
> > > > >> >> >
> > > > >> >> > > > > > > > > > > by configuration? Currently most of the
> > config
> > > > options in
> > > > >> >> >
> > > > >> >> > > > > > execution
> > > > >> >> >
> > > > >> >> > > > > > > > > config
> > > > >> >> >
> > > > >> >> > > > > > > > > > > can be configured by configuration, and it is
> > > > very likely
> > > > >> >> >
> > > > >> >> > > that
> > > > >> >> >
> > > > >> >> > > > > > users
> > > > >> >> >
> > > > >> >> > > > > > > > > need
> > > > >> >> >
> > > > >> >> > > > > > > > > > > to adjust the resource according to the
> > > > performance of
> > > > >> >> >
> > > > >> >> > > their job
> > > > >> >> >
> > > > >> >> > > > > > > > during
> > > > >> >> >
> > > > >> >> > > > > > > > > > > debugging, Providing a configuration way will
> > > > make it more
> > > > >> >> >
> > > > >> >> > > > > > > > convenient.
> > > > >> >> >
> > > > >> >> > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > Bests,
> > > > >> >> >
> > > > >> >> > > > > > > > > > > Wenlong Lyu
> > > > >> >> >
> > > > >> >> > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <
> > > > >> >> >
> > > > >> >> > > tonysong...@gmail.com
> > > > >> >> >
> > > > >> >> > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > Thanks Yangze for preparing the FLIP.
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > The proposed changes look good to me.
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > As you've mentioned in the implementation
> > > > plan, I
> > > > >> >> >
> > > > >> >> > > believe one
> > > > >> >> >
> > > > >> >> > > > > > of
> > > > >> >> >
> > > > >> >> > > > > > > > the
> > > > >> >> >
> > > > >> >> > > > > > > > > most
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > important tasks of this FLIP is to have the
> > > > feature well
> > > > >> >> >
> > > > >> >> > > > > > > > documented.
> > > > >> >> >
> > > > >> >> > > > > > > > > It
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > would be really nice if we can keep that in
> > > > mind and
> > > > >> >> >
> > > > >> >> > > start
> > > > >> >> >
> > > > >> >> > > > > > drafting
> > > > >> >> >
> > > > >> >> > > > > > > > > the
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > documentation early.
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > Thank you~
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > Xintong Song
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <
> > > > >> >> >
> > > > >> >> > > karma...@gmail.com>
> > > > >> >> >
> > > > >> >> > > > > > > > > wrote:
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > Hi, there,
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > We would like to start a discussion
> > thread on
> > > > >> >> >
> > > > >> >> > > "FLIP-169:
> > > > >> >> >
> > > > >> >> > > > > > > > DataStream
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > API for Fine-Grained Resource
> > > > Requirements"[1], where
> > > > >> >> >
> > > > >> >> > > we
> > > > >> >> >
> > > > >> >> > > > > > propose
> > > > >> >> >
> > > > >> >> > > > > > > > > the
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > DataStream API for specifying
> > fine-grained
> > > > resource
> > > > >> >> >
> > > > >> >> > > > > > requirements
> > > > >> >> >
> > > > >> >> > > > > > > > in
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > StreamExecutionEnvironment.
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > Please find more details in the FLIP wiki
> > > > document [1].
> > > > >> >> >
> > > > >> >> > > > > > Looking
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > forward to your feedback.
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > [1]
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > >
> > > > >> >> >
> > > > >> >> > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > Best,
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > > Yangze Guo
> > > > >> >> >
> > > > >> >> > > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > > > >
> > > > >> >> >
> > > > >> >> > > > > >
> > > > >> >> >
> > > > >> >> > >
> > > > >> >> >
> > > >
> >

Reply via email to