FYI, I've updated the FLIP accordingly. To sum up: in this scenario, Flink will throw an exception telling the user to set the internal option "fine-grained.shuffle-mode.all-blocking" to true.
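The outcome summarized above (fail fast, and point the user at an opt-in flag) can be sketched as a small compile-time check. This is a minimal sketch under stated assumptions, not Flink's implementation: `FineGrainedEdgeCheck`, its `EdgeType` enum and `checkEdges` are hypothetical names, and the key string is copied from the message above. Following Xintong's suggestion, the flag only changes anything when a batch job declares fine-grained requirements.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the fail-fast check discussed in this thread.
// None of these types are Flink classes; they only model the decision logic.
class FineGrainedEdgeCheck {

    // Simplified stand-in for the type of a job edge.
    enum EdgeType { PIPELINED, BLOCKING }

    // Key as quoted in the thread; treat the exact name as an assumption.
    static final String ALL_BLOCKING_KEY = "fine-grained.shuffle-mode.all-blocking";

    // Returns the edge types to use. Throws if a batch job with fine-grained
    // requirements has PIPELINED edges and the user has not opted in.
    static List<EdgeType> checkEdges(
            boolean isBatch,
            boolean hasFineGrainedRequirements,
            boolean allBlockingEnabled,
            List<EdgeType> edges) {
        // Only batch jobs that declare fine-grained resources are affected.
        if (!isBatch || !hasFineGrainedRequirements || !edges.contains(EdgeType.PIPELINED)) {
            return edges;
        }
        if (!allBlockingEnabled) {
            throw new IllegalStateException(
                    "Batch jobs with PIPELINED edges may deadlock under fine-grained resource"
                            + " management. Set '" + ALL_BLOCKING_KEY + "' to true to make"
                            + " all edges BLOCKING.");
        }
        // Opted in: every edge becomes BLOCKING.
        return Collections.nCopies(edges.size(), EdgeType.BLOCKING);
    }

    public static void main(String[] args) {
        List<EdgeType> edges = List.of(EdgeType.PIPELINED, EdgeType.BLOCKING);
        System.out.println(checkEdges(true, true, true, edges)); // [BLOCKING, BLOCKING]
        try {
            checkEdges(true, true, false, edges);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Streaming jobs and jobs without fine-grained requirements pass through untouched, so the default execution-mode-based edge strategy is unaffected.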
Best,
Yangze Guo

On Tue, Jun 22, 2021 at 2:20 PM Yangze Guo <karma...@gmail.com> wrote:

Thanks for the comment, Xintong.

I used to wonder whether it was reasonable or worthwhile to introduce a configuration like "table.exec.shuffle-mode" for the DataStream API. Narrowing down the scope of the effect sounds good to me.

Best,
Yangze Guo

On Tue, Jun 22, 2021 at 2:08 PM Xintong Song <tonysong...@gmail.com> wrote:

I second Zhu and Till's opinion.

Failing with an exception that also explains how to resolve the problem sounds better, as it makes it explicit to users that pipelined edges are replaced with blocking edges.

Concerning the absence of knobs for tuning the edge types, we can introduce a configuration option. Since the edge types are currently fixed based on the job execution mode and are not exposed to users, I'd suggest introducing a configuration option that only affects fine-grained resource management use cases. To be specific, we can have something like 'fine-grained.xxx.all-blocking'. The default value should be false, and the error message can suggest that users set it to true. When set to true, it should take effect only when fine-grained resource requirements are detected. Thus, it should not affect the default execution-mode-based edge type strategy for non-fine-grained use cases.

Thank you~

Xintong Song

On Mon, Jun 21, 2021 at 8:59 PM Yangze Guo <karma...@gmail.com> wrote:

Thanks for the feedback, Till!

Actually, we cannot give users any resolution for this issue, as there is currently no API for DataStream users to influence the edge types. The edge types are fixed based on the job's mode (batch or streaming).
a) I think it might not confuse users much, as the behavior has never been documented or guaranteed to stay unchanged.
b) Thanks for your illustration.
I agree that adding complexity can make other feature development harder in the future. However, I think this might not introduce much complexity. In this case, we construct an all-edges-blocking job graph, which has existed since 1.11 and should already have been taken into account by subsequent features. I admit we cannot assume the all-edges-blocking job graph will exist forever in Flink, but AFAIK there is no foreseeable feature that intends to deprecate it.

WDYT?

Best,
Yangze Guo

On Mon, Jun 21, 2021 at 6:10 PM Till Rohrmann <trohrm...@apache.org> wrote:

I would be more in favor of what Zhu Zhu proposed: throw an exception with a meaningful and understandable explanation that also includes how to resolve the problem. I do understand the reasoning behind automatically switching the edge types to make things easier to use, but a) this can also be confusing if the user does not expect it to happen, and b) it can add complexity that makes other feature development harder in the future, because users might rely on it. An example of such a case I stumbled upon rather recently is that we adjust the maximum parallelism with respect to the given savepoint if it has not been explicitly configured. On paper this sounds like a good usability improvement; however, for the AdaptiveScheduler it posed quite annoying complexity. If instead we had said that job submission fails when the max parallelism does not equal the max parallelism of the savepoint, it would have been a lot easier.

Cheers,
Till

On Mon, Jun 21, 2021 at 9:36 AM Yangze Guo <karma...@gmail.com> wrote:

Thanks, I've appended it to the known limitations of this FLIP.
Best,
Yangze Guo

On Mon, Jun 21, 2021 at 3:20 PM Zhu Zhu <reed...@gmail.com> wrote:

Thanks for the quick response, Yangze.
The proposal sounds good to me.

Thanks,
Zhu

On Mon, Jun 21, 2021 at 3:01 PM Yangze Guo <karma...@gmail.com> wrote:

Thanks for the comments, Zhu!

Yes, it is a known limitation of fine-grained resource management. We also filed this issue as FLINK-20865 when we proposed FLIP-156.

As a first step, I agree that we can mark batch jobs with PIPELINED edges as an invalid case for this feature. However, just throwing an exception in that case might confuse users who do not understand the concept of pipelined regions. Maybe we can force all the edges in this scenario to BLOCKING in the compiling stage and document it well. That way, common users will not be interrupted, while expert users can understand the cost of that usage and make their own decision. WDYT?

Best,
Yangze Guo

On Mon, Jun 21, 2021 at 2:24 PM Zhu Zhu <reed...@gmail.com> wrote:

Thanks for proposing this, @Yangze Guo, and sorry for joining the discussion so late.
The proposal generally looks good to me. But I found one problem: a batch job with PIPELINED edges might hang if fine-grained resources are enabled.
See the "Resource Deadlocks could still happen in certain Cases" section in https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
However, this problem may happen only in batch cases with PIPELINED edges, because
1. streaming jobs always require all resource requirements to be fulfilled at the same time;
2. batch jobs without PIPELINED edges consist of multiple single-vertex regions, so each slot can be individually used and returned.
So maybe, as a first step, let's mark batch jobs with PIPELINED edges as an invalid case for fine-grained resources and throw an exception for them in the early compiling stage?

Thanks,
Zhu

On Tue, Jun 15, 2021 at 4:57 PM Yangze Guo <karma...@gmail.com> wrote:

Thanks for the supplement, Arvid and Yun. I've annotated these two points in the FLIP.
The vote has now started in [1].

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html

Best,
Yangze Guo

On Fri, Jun 11, 2021 at 2:50 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:

Hi,

Many thanks @Yangze for bringing up this discussion. Overall +1 for exposing the fine-grained resource requirements in the DataStream API.
One issue similar to the one Arvid pointed out is that users may also create different SlotSharingGroup objects with the same name but different resources. We might need to do some checks internally, but we could also leave that to the development of the actual PR.

Best,
Yun

------------------Original Mail ------------------
Sender: Arvid Heise <ar...@apache.org>
Send Date: Thu Jun 10 15:33:37 2021
Recipients: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Hi Yangze,

Thanks for incorporating the ideas, and sorry for missing the builder part.

My main idea is that SlotSharingGroup is immutable, such that the user doesn't do:

    ssg = new SlotSharingGroup();
    ssg.setCpus(2);
    operator1.slotSharingGroup(ssg);
    ssg.setCpus(4);
    operator2.slotSharingGroup(ssg);

and then wonder why both operators have the same CPU spec. But the details can be fleshed out in the actual PR.
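Arvid's concern is easiest to see with a value type that has no setters at all. The sketch below is hypothetical (`ImmutableSsgSketch`, `SsgSpec` and the builder methods are illustrative names, not Flink's `SlotSharingGroup` API); it only shows that once `build()` returns, later changes to the builder cannot alter a spec that was already handed to an operator. `withExternalResource` mirrors the builder method proposed later in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an immutable slot sharing group spec created via a builder.
// This is NOT Flink's SlotSharingGroup class; names and fields are illustrative.
class ImmutableSsgSketch {

    static final class SsgSpec {
        private final String name;
        private final double cpuCores;
        private final int taskHeapMemoryMB;
        private final Map<String, Double> externalResources;

        private SsgSpec(Builder b) {
            this.name = b.name;
            this.cpuCores = b.cpuCores;
            this.taskHeapMemoryMB = b.taskHeapMemoryMB;
            // Defensive, read-only copy: later builder changes cannot leak in.
            this.externalResources =
                    Collections.unmodifiableMap(new HashMap<>(b.externalResources));
        }

        String name() { return name; }
        double cpuCores() { return cpuCores; }
        int taskHeapMemoryMB() { return taskHeapMemoryMB; }
        Map<String, Double> externalResources() { return externalResources; }

        static Builder newBuilder(String name) { return new Builder(name); }
    }

    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMB;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) { this.name = name; }

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }

        Builder setTaskHeapMemoryMB(int mb) { this.taskHeapMemoryMB = mb; return this; }

        // Mirrors the #withExternalResource(String name, double value) idea.
        Builder withExternalResource(String resourceName, double value) {
            externalResources.put(resourceName, value);
            return this;
        }

        // Each call snapshots the builder's current state into a new spec.
        SsgSpec build() { return new SsgSpec(this); }
    }

    public static void main(String[] args) {
        Builder builder = SsgSpec.newBuilder("ssg-1").setCpuCores(2);
        SsgSpec forOperator1 = builder.build();
        // Changing the builder afterwards does not affect the spec already built.
        SsgSpec forOperator2 = builder.setCpuCores(4).build();
        System.out.println(forOperator1.cpuCores() + " " + forOperator2.cpuCores()); // 2.0 4.0
    }
}
```

Reusing one builder as in `main` yields two groups that share a name but differ in resources, which is exactly the case Yun suggests checking for internally.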
On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo wrote:

Thanks all for the discussion. I've updated the FLIP accordingly; the key changes are:
- Introduce SlotSharingGroup, which contains the resource spec of the slot sharing group, instead of ResourceSpec.
- Introduce two interfaces for specifying the SlotSharingGroup: #slotSharingGroup(SlotSharingGroup) and StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

If there is no more feedback, I'll start a vote next week.

Best,
Yangze Guo

On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo wrote:

Thanks for the valuable suggestion, Arvid.

1) Yes, we can add a new SlotSharingGroup that includes the name and its resource.
After that, we have two interfaces for configuring the slot sharing group of an operator:
- #slotSharingGroup(String name) // its resource can be configured through StreamExecutionEnvironment#registerSlotSharingGroup
- #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resource
And one interface to configure the resource of an SSG:
- StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
We can also define the priority of the above two approaches, e.g. the resource registered in the StreamExecutionEnvironment will always be respected in case of a conflict. That would be well documented.

2) Yes, I originally added this interface as a shortcut. It seems unnecessary now; I'll remove it.

3) I don't think we need to expose ExternalResource. In the builder of SlotSharingGroup, we can introduce #withExternalResource(String name, double value). Also, this interface needs to be annotated as evolving.

4) Actually, I've mentioned it in the FLIP.
Maybe it would be good to elaborate on the Builder for the SlotSharingGroup.

WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise wrote:

Hi Yangze,

I like the general approach of binding requirements to slot sharing groups. I think the current approach is also flexible enough that a user could simply use ParameterTool or similar to read config values and wire them into their slot sharing groups, such that different requirements can be tested without recompilation. So I don't see an immediate need to provide a generic solution for YAML configuration for now.
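The point that requirements can already be wired from program arguments can be illustrated without Flink on the classpath. In a real job one would likely use `ParameterTool.fromArgs`; here a tiny hand-rolled parser stands in for it, and the `<group>.cpu` key scheme, `ArgsToResourcesSketch`, `parseArgs` and `getDouble` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: reading per-group resource values from program arguments.
// The hand-rolled parser stands in for ParameterTool so this runs without Flink;
// the "<group>.cpu" key scheme is an assumption for illustration.
class ArgsToResourcesSketch {

    // Parses "--key value" pairs; flags without a value are ignored in this sketch.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            if (args[i].startsWith("--") && hasValue) {
                params.put(args[i].substring(2), args[i + 1]);
                i++; // skip the value we just consumed
            }
        }
        return params;
    }

    // Returns the parsed value for key, or defaultValue if the key is absent.
    static double getDouble(Map<String, String> params, String key, double defaultValue) {
        String value = params.get(key);
        return value == null ? defaultValue : Double.parseDouble(value);
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parseArgs(new String[] {"--ssg1.cpu", "4", "--ssg2.cpu", "0.5"});
        double ssg1Cpu = getDouble(params, "ssg1.cpu", 1.0);
        double ssg3Cpu = getDouble(params, "ssg3.cpu", 1.0); // absent, so the default is used
        // These values would then feed whatever builds each group's resource spec.
        System.out.println(ssg1Cpu + " " + ssg3Cpu);
    }
}
```

Changing `--ssg1.cpu` on the command line then changes the requirement without recompiling; whether that is enough, compared with a dedicated configuration approach, is what the rest of the thread debates.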
Looking at the programmatic interface, though, I think we could improve it quite a bit, and I haven't seen these alternatives considered in the FLIP:
1) Add a new class SlotSharingGroup that incorporates all ResourceSpec properties. Instead of using group names, the user could directly configure such an object.

    SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
    ssg1.setCPUCores(4);
    ...
    DataStream<Tuple2<String, Integer>> grades =
        GradeSource
            .getSource(env, rate)
            .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
            .slotSharingGroup(ssg1);
    DataStream<Tuple2<String, Integer>> salaries =
        SalarySource.getSource(env, rate)
            .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
            .slotSharingGroup(ssg2);

    // run the actual window join program with the same slot sharing group as grades
    DataStream<Tuple3<String, Integer, Integer>> joinedStream =
        runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);

Note that we could make it backward compatible by changing the proposed StreamExecutionEnvironment#setSlotSharingGroupResource to StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and then using the string name for further reference.
2) I'm also not sure about StreamExecutionEnvironment#setSlotSharingGroupResources. What's the benefit of the Map version over having the simple setter? Even if the user has a map slotSharingGroupResources, they could simply do

    slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);

3) Is defining the ExternalResource part of this FLIP? I don't see a Public* class yet. I'd also be fine with cutting the scope of this FLIP, removing it for now, and annotating ResourceSpec/SlotSharingGroup as evolving.

4) We should probably use a builder pattern around ResourceSpec/SlotSharingGroup, as in the current ResourceSpec. I don't think we need to fully specify that in the FLIP, but it would be good to at least say how users should create them.
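Items 1 and 2 above, together with the priority rule Yangze proposes in his reply (resources registered on the environment win on conflict) and the internal check Yun suggests, can be sketched as a name-keyed registry. Everything here is hypothetical: `SsgRegistrySketch`, `useDirectly` and `resolve` are illustrative names, and resources are reduced to a single CPU value to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of name-based registration of slot sharing group resources.
// Resources are reduced to one CPU value for brevity; this is not Flink's
// StreamExecutionEnvironment API.
class SsgRegistrySketch {

    // Resources registered on the environment, keyed by group name.
    private final Map<String, Double> registered = new HashMap<>();
    // Resources attached directly to operators, keyed by group name.
    private final Map<String, Double> direct = new HashMap<>();

    // Stand-in for StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
    void registerSlotSharingGroup(String name, double cpuCores) {
        registered.put(name, cpuCores);
    }

    // Stand-in for #slotSharingGroup(SlotSharingGroup) on an operator.
    // Rejects two different specs declared under the same name.
    void useDirectly(String name, double cpuCores) {
        Double previous = direct.putIfAbsent(name, cpuCores);
        if (previous != null && previous != cpuCores) {
            throw new IllegalArgumentException(
                    "Slot sharing group '" + name + "' declared with conflicting resources: "
                            + previous + " vs " + cpuCores);
        }
    }

    // Priority rule: the environment registration wins over a direct spec.
    // Returns null if the group is unknown.
    Double resolve(String name) {
        Double fromEnv = registered.get(name);
        return fromEnv != null ? fromEnv : direct.get(name);
    }

    public static void main(String[] args) {
        SsgRegistrySketch env = new SsgRegistrySketch();
        Map<String, Double> slotSharingGroupResources = Map.of("ssg-1", 4.0, "ssg-2", 1.0);
        // A map can be registered through the simple setter, as suggested in item 2.
        slotSharingGroupResources.forEach(env::registerSlotSharingGroup);
        env.useDirectly("ssg-1", 2.0);
        System.out.println(env.resolve("ssg-1")); // 4.0: the registered value wins
    }
}
```

Keeping the two sources in separate maps makes the documented priority a single lookup order, while the conflict check stays local to direct declarations.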
On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo wrote:

@Yang
In short, the external resources will participate in resource deduction and be logically ensured, but requesting an external resource must still be done through config options with the current default resource allocation strategy.
In FLIP-56, we abstracted the logic of resource allocation into the `ResourceAllocationStrategy`. Currently, with its default implementation, the ResourceManager still allocates TMs with the same resource spec, and their external resources are configured through config options as well. So, in your case, you need to define "external-resources" and "external-resources.disk.amount". Then, all the disk requirements defined in the SSG will be logically ensured, as there is no slot-level isolation. If the disk space of a task manager cannot fulfill the disk requirement, the RM will allocate a new one.
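The "logically ensured" behavior described above can be modeled with simple bookkeeping: every TaskManager starts with the same configured disk amount, slot requests only decrement a counter (there is no slot-level isolation), and a new TaskManager is allocated when no existing one has enough left. This is a hypothetical first-fit model in arbitrary disk units, not the actual `ResourceAllocationStrategy` implementation; `LogicalDiskAccountingSketch` and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of logically ensured external resources (disk, arbitrary units).
// All TaskManagers share one configured disk amount; requests are only bookkept,
// not isolated. First-fit placement is an assumption, not Flink's actual strategy.
class LogicalDiskAccountingSketch {

    private final double diskPerTaskManager;
    // Remaining logical disk for each allocated TaskManager, by index.
    private final List<Double> remaining = new ArrayList<>();

    LogicalDiskAccountingSketch(double diskPerTaskManager) {
        this.diskPerTaskManager = diskPerTaskManager;
    }

    // Returns the index of the TaskManager that receives the slot.
    int allocateSlot(double diskRequirement) {
        if (diskRequirement > diskPerTaskManager) {
            throw new IllegalArgumentException("Requirement exceeds the per-TM disk amount");
        }
        for (int i = 0; i < remaining.size(); i++) {
            if (remaining.get(i) >= diskRequirement) {
                remaining.set(i, remaining.get(i) - diskRequirement);
                return i;
            }
        }
        // No existing TaskManager can fulfill the requirement: allocate a new one.
        remaining.add(diskPerTaskManager - diskRequirement);
        return remaining.size() - 1;
    }

    int taskManagerCount() {
        return remaining.size();
    }

    public static void main(String[] args) {
        LogicalDiskAccountingSketch rm = new LogicalDiskAccountingSketch(100.0);
        System.out.println(rm.allocateSlot(60.0)); // 0
        System.out.println(rm.allocateSlot(30.0)); // 0
        System.out.println(rm.allocateSlot(20.0)); // 1 (only 10 left on TM 0)
        System.out.println(rm.taskManagerCount()); // 2
    }
}
```

Because every TaskManager has the same configured amount, a requirement larger than that amount can never be satisfied under the default strategy, which is why the sketch rejects it up front.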
In the future, we'd like to introduce a `ResourceAllocationStrategy` that allocates heterogeneous TMs according to the requirements. Then, users only need to define the driver of external resources when needed.
Also, regarding resource isolation, we may in the future provide a fine-grained mode in which each slot can only fetch the information of the external resources it requires. But that is out of the scope of this PR.

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 4:20 PM Yang Wang wrote:

Thanks @Yangze for preparing this FLIP.

I think this is a good starting point for community users to get a taste of fine-grained resource management, which we all believe could improve Flink job stability and cluster utilization.
I have a simple question about the extended resources: is it possible to combine extended resources with fine-grained resource management? Besides GPUs, FPGAs, and other new computing devices, disk is perhaps a more general use case. For example, different SSGs may have various disk requirements depending on their state. So we need to allocate enough ephemeral storage for every TaskManager pod in a Kubernetes deployment; otherwise, the pod might be evicted for exceeding its limits.
Best,
Yang

On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:

I think being able to specify fine-grained resource requirements without having to change the code and recompile the job is indeed a good idea. It definitely improves usability.

However, this requires more careful design, which probably deserves a separate thread. It'd be good to have that discussion, but maybe we should not block this feature on it.

One idea concerning the configuration approach: as Yangze said, Flink configuration options are supposed to take effect at the cluster level.
For updating job-level specifics that are not suitable to be introduced as config options, currently the only way is to pass them as program arguments. Would it make sense to introduce a general approach for overriding such job specifics without recompiling the job?

Thank you~

Xintong Song

On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo wrote:

@Wenlong
After further consideration, the config option approach I mentioned above might not be appropriate. The resource requirements for an SSG should be a job-level configuration and should not be set in flink-conf.
I think we can define a JSON format, mapping SSG names to ResourceSpecs, for the resource requirements of a specific job. Then, we allow users to configure the file path of that JSON. The JSON will only be parsed at runtime, which allows users to tune it without recompiling the job.

We can add another #setSlotSharingGroupResources for configuring the file path of that JSON:
```
/**
 * Specify fine-grained resource requirements for slot sharing groups
 * with the given resource JSON file. The existing resource
 * requirement of the same slot sharing group will be replaced.
 */
public StreamExecutionEnvironment setSlotSharingGroupResources(
        String pathToResourceJson);
```

WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo wrote:

Thanks for the feedback, Xintong and Wenlong!

@Wenlong
I think that is a good idea; adjusting the resources without recompiling the job will facilitate the tuning process.
We can define a pattern "slot-sharing-group.resource.{ssg name}" (any proposal for the prefix naming is welcome) for the resource spec config of a slot sharing group.
Then, users can set the ResourceSpec of SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0, heap: 100m, off-heap: 100m....}". WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <wenlong88....@gmail.com> wrote:

Thanks Yangze for the FLIP; it is great for users to be able to declare fine-grained resource requirements for their jobs.

I have one minor suggestion: can we support setting resource requirements via configuration?
Currently most of the options in the execution config can be set via configuration, and it is very likely that users will need to adjust the resources according to the performance of their job during debugging. Providing a configuration-based way will make this more convenient.

Best,
Wenlong Lyu

On Thu, 3 Jun 2021 at 15:59, Xintong Song <tonysong...@gmail.com> wrote:

Thanks Yangze for preparing the FLIP.

The proposed changes look good to me.

As you've mentioned in the implementation plan, I believe one of the most important tasks of this FLIP is to have the feature well documented. It would be really nice if we could keep that in mind and start drafting the documentation early.

Thank you~

Xintong Song

On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <karma...@gmail.com> wrote:

Hi there,

We would like to start a discussion thread on "FLIP-169: DataStream API for Fine-Grained Resource Requirements"[1], where we propose the DataStream API for specifying fine-grained resource requirements in StreamExecutionEnvironment.

Please find more details in the FLIP wiki document [1].
Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements

Best,
Yangze Guo
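As a rough illustration of the configuration-based approach discussed in this thread, the Java sketch below collects entries that follow the proposed "slot-sharing-group.resource.{ssg name}" pattern and splits each value into per-group resource fields. The class and method names (SsgResourceConfig, parse, parseSpec) are hypothetical, the value syntax is assumed from the "{cpu: 1.0, heap: 100m, off-heap: 100m....}" example, and the code neither uses nor reflects any actual Flink API. Unit conversion (e.g. "100m" to bytes) is intentionally left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the "slot-sharing-group.resource.{ssg name}" pattern
 * proposed in the thread. This is NOT Flink API; names are illustrative only.
 */
final class SsgResourceConfig {

    // Assumed prefix, taken from the proposal in the thread.
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Maps each SSG name found under PREFIX to its parsed field -> value spec. */
    static Map<String, Map<String, String>> parse(Map<String, String> conf) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(PREFIX) || key.length() == PREFIX.length()) {
                continue; // unrelated option, or the SSG name is missing
            }
            String ssgName = key.substring(PREFIX.length());
            result.put(ssgName, parseSpec(e.getValue()));
        }
        return result;
    }

    /** Parses "{k1: v1, k2: v2}" into an insertion-ordered map; braces are optional. */
    static Map<String, String> parseSpec(String raw) {
        String body = raw.trim();
        if (body.startsWith("{") && body.endsWith("}")) {
            body = body.substring(1, body.length() - 1);
        }
        Map<String, String> spec = new LinkedHashMap<>();
        for (String part : body.split(",")) {
            if (part.isBlank()) {
                continue; // tolerate trailing commas
            }
            int colon = part.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException(
                        "Expected 'name: value' but got: " + part.trim());
            }
            spec.put(part.substring(0, colon).trim(), part.substring(colon + 1).trim());
        }
        return spec;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("slot-sharing-group.resource.ssg1", "{cpu: 1.0, heap: 100m, off-heap: 100m}");
        conf.put("parallelism.default", "4"); // ignored: does not match the prefix
        // prints {ssg1={cpu=1.0, heap=100m, off-heap=100m}}
        System.out.println(parse(conf));
    }
}
```

Keeping the values as raw strings defers validation (unknown field names, malformed sizes) to whatever component would eventually build the real resource spec, so the parser itself stays agnostic to the exact set of supported fields.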