Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, dev.

After reviewing the entire email discussion thread with Rui, I noticed that my earlier, ambiguous understanding led to a few incorrect conclusions, so I need to correct them. Thanks to Rui for the help.

>For David:
>The problem you're trying to solve only exists in complex graphs with
>different per-vertex parallelism. If the parallelism is set globally
>(assuming the pipeline has roughly even data skew), the algorithm could
>make things slightly worse by eliminating some local exchanges. Is that
>correct?

I re-checked this: if all nodes have the same parallelism, the new strategy does not disrupt local exchanges, and all subtasks connected by a forward shuffle stay in the same slot.

As described in section 2.1.1 (core logic) of FLIP-370[1], if all nodes have the same parallelism, the new strategy traverses all SEVs of each JV and assigns SEVs[subtask_index] to ESSGs[subtask_index]. As a result:
a. SEVs with the same index are assigned to the same ESSG.
b. For forward edges, all subtasks connected by a forward shuffle remain in the same slot, so their data exchanges stay local.

--

If there are no additional comments about the FLIP, I plan to initiate a vote on it next Monday.

Best Regards,
Yuepeng

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling

At 2023-09-25 16:25:03, "Yuepeng Pan" wrote:
>Hi all,
>
>I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks
>scheduling.
>
>The current strategy of Flink to deploy tasks sometimes leads some
>TMs(TaskManagers) to have more tasks while others have fewer tasks, resulting
>in excessive resource utilization at some TMs that contain more tasks and
>becoming a bottleneck for the entire job processing. Developing strategies to
>achieve task load balancing for TMs and reducing job bottlenecks becomes very
>meaningful.
>
>The raw design and discussions could be found in the Flink JIRA[2] and Google
>doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some valuable help
>and suggestions in advance.
>
>Please refer to the FLIP[1] document for more details about the proposed
>design and implementation. We welcome any feedback and opinions on this
>proposal.
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
>[2] https://issues.apache.org/jira/browse/FLINK-31757
>[3]
>https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
>
>Best,
>Yuepeng Pan
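The index-aligned assignment described at the top of this message (SEVs[subtask_index] going to ESSGs[subtask_index] when every vertex has the same parallelism) can be illustrated with a small standalone sketch. This is not Flink code: the class name, the string labels for subtasks, and the list-of-lists stand-in for slot sharing groups are all assumptions made for illustration. It only shows why subtasks with the same index, and therefore both ends of a forward edge, end up in the same group.

```java
import java.util.ArrayList;
import java.util.List;

public class IndexAlignedAssignment {

    /**
     * Places subtask i of every vertex into group i.
     * Assumes all vertices share the same parallelism (the case discussed above).
     *
     * @param vertexNames hypothetical job vertex names
     * @param parallelism the parallelism shared by all vertices
     * @return groups.get(i) lists the subtasks placed in group i
     */
    static List<List<String>> assign(List<String> vertexNames, int parallelism) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            groups.add(new ArrayList<>());
        }
        for (String vertex : vertexNames) {
            for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
                // Same subtask index always maps to the same group index.
                groups.get(subtaskIndex).add(vertex + "#" + subtaskIndex);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<String>> groups = assign(List.of("source", "map", "sink"), 3);
        for (int i = 0; i < groups.size(); i++) {
            System.out.println("group " + i + " -> " + groups.get(i));
        }
    }
}
```

With three vertices of parallelism 3, each group holds exactly one subtask per vertex, so a forward connection between `map#1` and `sink#1` never crosses groups in this model.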
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, Rui.

Thank you for the update. +1 for the updated version of the FLIP page. Thanks also to Zhu Zhu and Yangze Guo for the discussion.

Best Regards,
Yuepeng Pan

On 2023/10/17 03:45:08 Rui Fan wrote:
> Hi all,
>
> Offline discussed with Zhu Zhu, Yangze Guo, Yuepeng Pan.
> We reached consensus on slot.request.max-interval and
> taskmanager.load-balance.mode. And I have updated the FLIP.
>
> For a detailed introduction to taskmanager.load-balance.mode,
> please refer to FLIP’s 3.1 Public Interfaces[1].
>
> And the strategy for slot.request.max-interval has been improved.
> The latest strategy can be referred from FLIP’s 2.2.2 Waiting mechanism[2].
> For comparison of old and new strategies, please refer to
> RejectedAlternatives[3].
>
> Thanks again to everyone who participated in the discussion.
> Looking forward to your continued feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives
>
> Best,
> Rui
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks for the update, Rui. +1 for the latest version of the FLIP.

Best,
Yangze Guo

On Tue, Oct 17, 2023 at 11:45 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi all,
>
> Offline discussed with Zhu Zhu, Yangze Guo, Yuepeng Pan.
> We reached consensus on slot.request.max-interval and
> taskmanager.load-balance.mode. And I have updated the FLIP.
>
> For a detailed introduction to taskmanager.load-balance.mode,
> please refer to FLIP’s 3.1 Public Interfaces[1].
>
> And the strategy for slot.request.max-interval has been improved.
> The latest strategy can be referred from FLIP’s 2.2.2 Waiting mechanism[2].
> For comparison of old and new strategies, please refer to
> RejectedAlternatives[3].
>
> Thanks again to everyone who participated in the discussion.
> Looking forward to your continued feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives
>
> Best,
> Rui
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi all,

I discussed this offline with Zhu Zhu, Yangze Guo, and Yuepeng Pan. We reached consensus on slot.request.max-interval and taskmanager.load-balance.mode, and I have updated the FLIP.

For a detailed introduction to taskmanager.load-balance.mode, please refer to section 3.1 Public Interfaces[1] of the FLIP.

The strategy for slot.request.max-interval has also been improved. The latest strategy is described in section 2.2.2 Waiting mechanism[2] of the FLIP. For a comparison of the old and new strategies, please refer to Rejected Alternatives[3].

Thanks again to everyone who participated in the discussion. Looking forward to your continued feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives

Best,
Rui

On Thu, Oct 12, 2023 at 9:49 AM Yuepeng Pan wrote:
> Hi, Shammon.
> Thanks for your feedback.
>
> >1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> and `DeclarativeSlotPool`?
>
> As described on the FLIP page, the current design plans to introduce the
> waiting mechanism only in the `SlotPool`, because the existing
> `WaitingForResources` can already achieve this effect.
>
> >Currently the two slot pools are used in different schedulers.
>
> Yes, that's indeed the case.
>
> >I think this will also bring value to `DeclarativeSlotPool`, but
> currently FLIP content seems to be based on `SlotPool`, right?
>
> Yes. your expectations are indeed reasonable. In theory, the
> `DeclarativeSlotPool` could also benefit from a waiting mechanism, as
> discussed.
The purpose of introducing the waiting mechanism is to enable > the `SlotPool` to have a global view to calculate the globally optimal > solution. I've rechecked the relevant logic in the `AdaptiveScheduler`, and > as I understand, the existing mechanisms already fulfill the current > feature requirements. You could find more conclusions on this in FLIP > `3.2.5`. Of course, I'd be appreciated with your confirmation. If there's > any misunderstanding on my part, please correct me. > > >2. ... What should be done when the slot selected by the round-robin > strategy cannot meet the resource requirements? > > Is this referring to the phase of task-to-slot allocation? I'm not quite > sure, would you mind explaining it? Thanks~. > > >3. Is the assignment of tasks to slots balanced based on region or job > level? > > Currently, there is no specific handling based on regions, and there is no > job-level balancing. The target effect of the current feature is to achieve > load balancing based on the number of tasks at the Task Manager (TM) level. > Looking forward to any suggestions regarding the item you mentioned. > > >When multiple TMs fail over, will it cause the balancing strategy to fail > or even worse? > > IIUC, when multiple Task Managers undergo failover, the results after > successful recovery will still be maintained in a relatively balanced state. > > >What is the current processing strategy? > > The Slot-to-TM strategy does not change after a Task Manager undergoes > failover. > > Best, Regards. > Yuepeng Pan > > On 2023/09/28 05:10:13 Shammon FY wrote: > > Thanks Yuepeng for initiating this discussion. > > > > +1 in general too, in fact we have implemented a similar mechanism > > internally to ensure a balanced allocation of tasks to slots, it works > well. > > > > Some comments about the mechanism > > > > 1. This mechanism will be only supported in `SlotPool` or both `SlotPool` > > and `DeclarativeSlotPool`? 
Currently the two slot pools are used in > > different schedulers. I think this will also bring value to > > `DeclarativeSlotPool`, but currently FLIP content seems to be based on > > `SlotPool`, right? > > > > 2. In fine-grained resource management, we can set different resource > > requirements for different nodes, which means that the resources of each > > slot are different. What should be done when the slot selected by the > > round-robin strategy cannot meet the resource requirements? Will this > lead > > to the failure of the balance strategy? > > > > 3. Is the assignment of tasks to slots balanced based on region or job > > level? When multiple TMs fail over, will it cause the balancing strategy > to > > fail or even worse? What is the current processing strategy? > > > > For Zhuzhu and Rui: > > > > IIUC, the overall balance is divided into two parts: slot to TM and task > to > > slot. > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager > > 2. Task to slot is guaranteed by the slot pool in JM > > > > These two are completely independent, what are the benefits of unifying > > these two into one option?
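For readers who want to see the two option keys from Rui's update side by side, here is a minimal sketch that collects them in a plain map and renders them as `key: value` lines. The keys are the ones named in this thread. The value `Tasks` is taken from the discussion quoted elsewhere in the thread, while `20 ms` is purely an assumed placeholder; the accepted values and their format should be checked against the FLIP's Public Interfaces section. The class and method names are hypothetical and no Flink API is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BalancedSchedulingOptions {

    /** Collects the two option keys named in the thread, with illustrative values. */
    static Map<String, String> options() {
        Map<String, String> conf = new LinkedHashMap<>();
        // "Tasks" is mentioned in the thread as one possible mode.
        conf.put("taskmanager.load-balance.mode", "Tasks");
        // Assumed placeholder value; the real type and default are defined by the FLIP.
        conf.put("slot.request.max-interval", "20 ms");
        return conf;
    }

    /** Renders the options as "key: value" lines, in insertion order. */
    static String render(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(options()));
    }
}
```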
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, Shammon.

Thanks for your feedback.

>1. This mechanism will be only supported in `SlotPool` or both `SlotPool` and
>`DeclarativeSlotPool`?

As described on the FLIP page, the current design plans to introduce the waiting mechanism only in the `SlotPool`, because the existing `WaitingForResources` mechanism in the `AdaptiveScheduler` can already achieve this effect.

>Currently the two slot pools are used in different schedulers.

Yes, that's indeed the case.

>I think this will also bring value to `DeclarativeSlotPool`, but currently
>FLIP content seems to be based on `SlotPool`, right?

Yes, your expectation is reasonable. In theory, the `DeclarativeSlotPool` could also benefit from a waiting mechanism, as discussed. The purpose of introducing the waiting mechanism is to give the `SlotPool` a global view so that it can calculate the globally optimal solution. I've rechecked the relevant logic in the `AdaptiveScheduler`, and as I understand it, the existing mechanisms already fulfill the requirements of this feature. You can find more conclusions on this in section `3.2.5` of the FLIP. Of course, I'd appreciate your confirmation. If there's any misunderstanding on my part, please correct me.

>2. ... What should be done when the slot selected by the round-robin strategy
>cannot meet the resource requirements?

Does this refer to the task-to-slot allocation phase? I'm not quite sure; would you mind explaining it? Thanks~.

>3. Is the assignment of tasks to slots balanced based on region or job level?

Currently, there is no specific handling based on regions, and there is no job-level balancing. The goal of the current feature is to balance load at the Task Manager (TM) level, based on the number of tasks. I look forward to any suggestions regarding the item you mentioned.

>When multiple TMs fail over, will it cause the balancing strategy to fail or
>even worse?

IIUC, when multiple Task Managers undergo failover, the results after successful recovery will still be maintained in a relatively balanced state. >What is the current processing strategy? The Slot-to-TM strategy does not change after a Task Manager undergoes failover. Best, Regards. Yuepeng Pan On 2023/09/28 05:10:13 Shammon FY wrote: > Thanks Yuepeng for initiating this discussion. > > +1 in general too, in fact we have implemented a similar mechanism > internally to ensure a balanced allocation of tasks to slots, it works well. > > Some comments about the mechanism > > 1. This mechanism will be only supported in `SlotPool` or both `SlotPool` > and `DeclarativeSlotPool`? Currently the two slot pools are used in > different schedulers. I think this will also bring value to > `DeclarativeSlotPool`, but currently FLIP content seems to be based on > `SlotPool`, right? > > 2. In fine-grained resource management, we can set different resource > requirements for different nodes, which means that the resources of each > slot are different. What should be done when the slot selected by the > round-robin strategy cannot meet the resource requirements? Will this lead > to the failure of the balance strategy? > > 3. Is the assignment of tasks to slots balanced based on region or job > level? When multiple TMs fail over, will it cause the balancing strategy to > fail or even worse? What is the current processing strategy? > > For Zhuzhu and Rui: > > IIUC, the overall balance is divided into two parts: slot to TM and task to > slot. > 1. Slot to TM is guaranteed by SlotManager in ResourceManager > 2. Task to slot is guaranteed by the slot pool in JM > > These two are completely independent, what are the benefits of unifying > these two into one option? Also, do we want to share the same > option between SlotPool in JM and SlotManager in RM? This sounds a bit > strange. 
> > Best, > Shammon FY > > > > On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote: > > > Hi Zhu Zhu, > > > > Thanks for your feedback here! > > > > You are right, user needs to set 2 options: > > - cluster.evenly-spread-out-slots=true > > - slot.sharing-strategy=TASK_BALANCED_PREFERRED > > > > Update it to one option is useful at user side, so > > `taskmanager.load-balance.mode` sounds good to me. > > I want to check some points and behaviors about this option: > > > > 1. The default value is None, right? > > 2. When it's set to Tasks, how to assign slots to TM? > > - Option1: It's just check task number > > - Option2: It''s check the slot number first, then check the > > task number when the slot number is the same. > > > > Giving an example to explain what's the difference between them: > > > > - A session cluster has 2 flink jobs, they are jobA and jobB > > - Each TM has 4 slots. > > - The task number of one slot of jobA is 3 > > - The task number of one slot of jobB is 1 > > - We have 2 TaskManagers: > > - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks > > - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks. > > > > Now, we
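The two slot-to-TM options in Rui's quoted example (Option 1: compare the task number only; Option 2: compare the slot number first, then the task number) can be contrasted with a small sketch. The quoted message is cut off before it states the outcome, so the result here is only what these two hypothetical comparators produce on the numbers given in the example: tm1 runs 3 slots and 3 tasks, tm2 runs 2 slots and 4 tasks, and each TM has 4 slots, so both still have free capacity. `TmLoad`, `BY_TASKS`, and `BY_SLOTS_THEN_TASKS` are names invented for this illustration, not Flink classes.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class TaskManagerSelection {

    /** Hypothetical snapshot of one TaskManager's current load. */
    static final class TmLoad {
        final String name;
        final int usedSlots;
        final int tasks;

        TmLoad(String name, int usedSlots, int tasks) {
            this.name = name;
            this.usedSlots = usedSlots;
            this.tasks = tasks;
        }
    }

    /** Option 1 from the example: order TMs by task count only. */
    static final Comparator<TmLoad> BY_TASKS =
            Comparator.comparingInt((TmLoad tm) -> tm.tasks);

    /** Option 2 from the example: order by used slots, break ties by task count. */
    static final Comparator<TmLoad> BY_SLOTS_THEN_TASKS =
            Comparator.comparingInt((TmLoad tm) -> tm.usedSlots)
                    .thenComparingInt(tm -> tm.tasks);

    /** The two TaskManagers from the example in the thread. */
    static List<TmLoad> example() {
        return List.of(new TmLoad("tm1", 3, 3), new TmLoad("tm2", 2, 4));
    }

    /** Returns the name of the least-loaded TM under the given ordering. */
    static String pick(List<TmLoad> tms, Comparator<TmLoad> order) {
        return Collections.min(tms, order).name;
    }

    public static void main(String[] args) {
        System.out.println("Option 1 picks " + pick(example(), BY_TASKS));
        System.out.println("Option 2 picks " + pick(example(), BY_SLOTS_THEN_TASKS));
    }
}
```

Under these assumptions the two options disagree: ordering by task count alone prefers tm1 (3 tasks versus 4), while ordering by used slots first prefers tm2 (2 slots versus 3).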
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Yuepeng,

Thanks for your feedback. I agree with you: both approaches can achieve the goal. As long as we can easily extend the balancing strategy to consider more than one factor without changing the interface, the solution is OK for me.

Regards,
Xiangyu

Yuepeng Pan wrote on Wed, Oct 11, 2023 at 17:38:

> Hi, xiangyu.
> Thanks for your quick reply.
>
> >interface currently only includes a description of the number of tasks.
> So,
> >IIUC, If there is a need to further expand
> >current interface and its implementations, right?
>
> Yes, that's indeed the case.
>
> >I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
> >currently it only supports comparing the load for one factor. If we want
> to
> >add more loading factors, LoadingWeight might need to add a 'LoadType'
> >field for distinction, WeightLoadable might need to return
> >Set.
>
> Thank you for the clarification, I think I roughly understand your
> description:
> In fact, regarding the specific implementation and extension of this
> LoadingWeight, we can extend it based on this interface and its
> implementation as mentioned above.
> If making frequent changes to the interface and its implementation is
> really tiresome, we can also consider introducing a built-in collapsible
> Map or other type of attribute, like the SlotSharingGroup class in the
> org.apache.flink.api.common.operators package, to describe the specific
> collection of load values and types. This way, these loads are collapsed
> within the LoadingWeight's implementation and can be expanded when needed
> for use. Of course, we can also consider an implementation like the one you
> mentioned, introducing a method in WeightLoadable that returns a collection
> as the return type, so the load values are expanded at the calling site and
> then used. As I understand it, both approaches can achieve the goal.
>
> Of course, I also look forward to hearing others' suggestions. If there
> are any mistakes in my statement, please correct me.
> Looking forward to your reply.
>
> Best regards.
> Yuepeng Pan
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, xiangyu.

Thanks for your quick reply.

>interface currently only includes a description of the number of tasks. So,
>IIUC, If there is a need to further expand
>current interface and its implementations, right?

Yes, that's indeed the case.

>I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
>currently it only supports comparing the load for one factor. If we want to
>add more loading factors, LoadingWeight might need to add a 'LoadType'
>field for distinction, WeightLoadable might need to return
>Set.

Thank you for the clarification. I think I roughly understand your description.

Regarding the specific implementation and extension of LoadingWeight, we can extend it based on this interface and its implementation, as mentioned above. If frequent changes to the interface and its implementation would be too tiresome, we can also consider introducing a built-in collapsible Map (or another type of attribute), as the SlotSharingGroup class in the org.apache.flink.api.common.operators package does, to describe the specific collection of load values and types. This way, the loads are collapsed inside the LoadingWeight implementation and can be expanded when needed. Alternatively, we can consider an implementation like the one you mentioned: introducing a method in WeightLoadable that returns a collection, so that the load values are expanded at the call site and then used. As I understand it, both approaches can achieve the goal.

I also look forward to hearing others' suggestions. If there are any mistakes in my statement, please correct me. Looking forward to your reply.

Best regards,
Yuepeng Pan

On 2023/10/11 08:44:51 xiangyu feng wrote:
> Hi Yuepeng,
>
> Thx for ur reply.
>
> > Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
> interface currently only includes a description of the number of tasks.
So, > IIUC, If there is a need to further expand > > descriptions of other resource loads, we just extend it based on the > current interface and its implementations, right? > > I checked the interface design of LoadingWeight and WeightLoadable, AFAIK > currently it only supports comparing the load for one factor. If we want to > add more loading factors, LoadingWeight might need to add a 'LoadType' > field for distinction, WeightLoadable might need to return > Set. > > I'm not sure I understand this correctly, WDYT? > > Regards, > Xiangyu > > Yuepeng Pan 于2023年10月11日周三 13:53写道: > > > Hi, xiangyu, > > Thanks for your attention as well. > > > > >1, About the waiting mechanism: Will the waiting mechanism happen only in > > >the second level 'assigning slots to TM'? IIUC, the first level 'assigning > > >Tasks to Slots' needs only the asynchronous slot result from slotpool. > > > > As described in the latest FLIP, the introduction of the waiting mechanism > > at the second level is to ensure that, in all deployment modes such as > > application, session, etc., we do not fall into a local greedy state when > > selecting the optimal slot position. This requires obtaining a global > > resource view to get the best result. > > IIUC, The allocation process from Task to Slot is the generation of a > > mapping relationship between two abstract descriptions, and at this point, > > there is no coupling of information between tasks/slots and Task Managers > > (TMs). > > > > > > >2, About the slot LoadingWeight: it is reasonable to use the number of > > >tasks by default in the beginning, but it would be better if this could be > > >easily extended in future to distinguish between CPU-intensive and > > >IO-intensive workloads. In some cases, TMs may have IO bottlenecks but > > >others have CPU bottlenecks. > > > > Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight > > interface currently only includes a description of the number of tasks. 
So, > > IIUC, If there is a need to further expand descriptions of other resource > > loads, we just extend it based on the current interface and its > > implementations, right? > > Please correct me if I have misunderstood. Thanks a lot~ > > > > Best, > > Yuepeng. > > > > On 2023/10/06 10:19:21 xiangyu feng wrote: > > > Thanks Yuepeng and Rui for driving this Discussion. > > > > > > Internally when we try to use Flink 1.17.1 in production, we are also > > > suffering from the unbalanced task distribution problem for jobs with > > high > > > qps and complex dag. So +1 for the overall proposal. > > > > > > Some questions about the details: > > > > > > 1, About the waiting mechanism: Will the waiting mechanism happen only in > > > the second level 'assigning slots to TM'? IIUC, the first level > > 'assigning > > > Tasks to Slots' needs only the asynchronous slot result from slotpool. > > > > > > 2, About the slot LoadingWeight: it is reasonable to use the number of > > > tasks by default in the beginning, but it would be better if this could > > be
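The "collapsible Map" idea from Yuepeng's reply, where several load factors live inside one weight object and are only expanded when a caller needs them, might look roughly like the sketch below. Everything here is hypothetical: `LoadType`, `SketchLoadingWeight`, and the method names are invented for illustration and are not the `LoadingWeight` or `WeightLoadable` interfaces from the FLIP, whose actual signatures are not shown in this thread.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

public class MultiFactorLoad {

    /** Hypothetical load dimensions; the thread only confirms task count as existing. */
    enum LoadType { TASK_COUNT, CPU, IO }

    /** Hypothetical weight that keeps its per-type loads collapsed in a map. */
    static final class SketchLoadingWeight {
        private final EnumMap<LoadType, Double> loads = new EnumMap<>(LoadType.class);

        /** Sets the load for one factor and returns this weight for chaining. */
        SketchLoadingWeight with(LoadType type, double value) {
            loads.put(type, value);
            return this;
        }

        /** Returns the load for one factor, or 0.0 if it was never set. */
        double get(LoadType type) {
            return loads.getOrDefault(type, 0.0);
        }

        /** Adds two weights factor by factor without modifying either input. */
        SketchLoadingWeight merge(SketchLoadingWeight other) {
            SketchLoadingWeight result = new SketchLoadingWeight();
            result.loads.putAll(this.loads);
            other.loads.forEach((type, value) -> result.loads.merge(type, value, Double::sum));
            return result;
        }

        /** Expands the collapsed loads into a read-only copy for callers. */
        Map<LoadType, Double> expand() {
            return Collections.unmodifiableMap(new EnumMap<>(loads));
        }
    }

    public static void main(String[] args) {
        SketchLoadingWeight slotA =
                new SketchLoadingWeight().with(LoadType.TASK_COUNT, 3).with(LoadType.CPU, 0.5);
        SketchLoadingWeight slotB = new SketchLoadingWeight().with(LoadType.TASK_COUNT, 1);
        System.out.println(slotA.merge(slotB).expand());
    }
}
```

The design point this illustrates is the one both participants agree on: new factors can be added by extending the enum, without changing the method signatures that callers depend on.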
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Yuepeng,

Thanks for your reply.

> Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
> interface currently only includes a description of the number of tasks. So,
> IIUC, If there is a need to further expand descriptions of other resource
> loads, we just extend it based on the current interface and its
> implementations, right?

I checked the interface design of LoadingWeight and WeightLoadable. AFAIK, it currently only supports comparing the load for one factor. If we want to add more loading factors, LoadingWeight might need to add a 'LoadType' field for distinction, and WeightLoadable might need to return Set.

I'm not sure I understand this correctly, WDYT?

Regards,
Xiangyu

Yuepeng Pan wrote on Wed, Oct 11, 2023 at 13:53:

> Hi, xiangyu,
> Thanks for your attention as well.
>
> >1, About the waiting mechanism: Will the waiting mechanism happen only in
> >the second level 'assigning slots to TM'? IIUC, the first level 'assigning
> >Tasks to Slots' needs only the asynchronous slot result from slotpool.
>
> As described in the latest FLIP, the introduction of the waiting mechanism
> at the second level is to ensure that, in all deployment modes such as
> application, session, etc., we do not fall into a local greedy state when
> selecting the optimal slot position. This requires obtaining a global
> resource view to get the best result.
> IIUC, The allocation process from Task to Slot is the generation of a
> mapping relationship between two abstract descriptions, and at this point,
> there is no coupling of information between tasks/slots and Task Managers
> (TMs).
>
>
> >2, About the slot LoadingWeight: it is reasonable to use the number of
> >tasks by default in the beginning, but it would be better if this could be
> >easily extended in future to distinguish between CPU-intensive and
> >IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> >others have CPU bottlenecks.
>
> Nice feedback.
In fact, as mentioned in the Google Doc, the LoadingWeight > interface currently only includes a description of the number of tasks. So, > IIUC, If there is a need to further expand descriptions of other resource > loads, we just extend it based on the current interface and its > implementations, right? > Please correct me if I have misunderstood. Thanks a lot~ > > Best, > Yuepeng. > > On 2023/10/06 10:19:21 xiangyu feng wrote: > > Thanks Yuepeng and Rui for driving this Discussion. > > > > Internally when we try to use Flink 1.17.1 in production, we are also > > suffering from the unbalanced task distribution problem for jobs with > high > > qps and complex dag. So +1 for the overall proposal. > > > > Some questions about the details: > > > > 1, About the waiting mechanism: Will the waiting mechanism happen only in > > the second level 'assigning slots to TM'? IIUC, the first level > 'assigning > > Tasks to Slots' needs only the asynchronous slot result from slotpool. > > > > 2, About the slot LoadingWeight: it is reasonable to use the number of > > tasks by default in the beginning, but it would be better if this could > be > > easily extended in future to distinguish between CPU-intensive and > > IO-intensive workloads. In some cases, TMs may have IO bottlenecks but > > others have CPU bottlenecks. > > > > Regards, > > Xiangyu > > > > > > Yuepeng Pan 于2023年10月5日周四 18:34写道: > > > > > Hi, Zhu Zhu, > > > > > > Thanks for your feedback! > > > > > > > I think we can introduce a new config option > > > > `taskmanager.load-balance.mode`, > > > > which accepts "None"/"Slots"/"Tasks". > `cluster.evenly-spread-out-slots` > > > > can be superseded by the "Slots" mode and get deprecated. In the > future > > > > it can support more mode, e.g. "CpuCores", to work better for jobs > with > > > > fine-grained resources. 
The proposed config option > > > > `slot.request.max-interval` > > > > then can be renamed to > > > `taskmanager.load-balance.request-stablizing-timeout` > > > > to show its relation with the feature. The proposed > > > `slot.sharing-strategy` > > > > is not needed, because the configured "Tasks" mode will do the work. > > > > > > The new proposed configuration option sounds good to me. > > > > > > I have a small question, If we set our configuration value to 'Tasks,' > it > > > will initiate two processes: balancing the allocation of task > quantities at > > > the slot level and balancing the number of tasks across TaskManagers > (TMs). > > > Alternatively, if we configure it as 'Slots,' the system will employ > the > > > LocalPreferred allocation policy (which is the default) when assigning > > > tasks to slots, and it will ensure that the number of slots used > across TMs > > > is balanced. > > > Does this configuration essentially combine a balanced selection > strategy > > > across two dimensions into fixed configuration items, right? > > > > > > I would appreciate it if you could correct me if I've made any errors. > > > > > > Best, > > > Yuepeng. > > > > > >
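Xiangyu's point about supporting more than one loading factor can be illustrated with a small sketch. This is a Python stand-in for the Java interfaces under discussion, not the FLIP's actual API: only the names LoadingWeight, WeightLoadable and LoadType come from the thread, while the enum members, the `value` field, the `SlotLoad` class and the `least_loaded` helper are assumptions made up for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import FrozenSet, Iterable


class LoadType(Enum):
    # Hypothetical factors; the thread only mentions task count, CPU and IO.
    TASK_COUNT = "task_count"
    CPU = "cpu"
    IO = "io"


@dataclass(frozen=True)
class LoadingWeight:
    # One load measurement, tagged with the factor it describes.
    load_type: LoadType
    value: float

    def __lt__(self, other: "LoadingWeight") -> bool:
        # Weights are only comparable within the same factor.
        if self.load_type is not other.load_type:
            raise ValueError("cannot compare weights of different load types")
        return self.value < other.value


@dataclass(frozen=True)
class SlotLoad:
    # Hypothetical WeightLoadable: exposes a set with one weight per factor.
    weights: FrozenSet[LoadingWeight]

    def loading_weight(self, load_type: LoadType) -> LoadingWeight:
        for weight in self.weights:
            if weight.load_type is load_type:
                return weight
        raise KeyError(load_type)


def least_loaded(slots: Iterable[SlotLoad], load_type: LoadType) -> SlotLoad:
    # Pick the slot with the smallest weight for the requested factor.
    return min(slots, key=lambda s: s.loading_weight(load_type).value)
```

With such a tag, the same slot can rank differently depending on the factor being balanced, which is the distinction between CPU-intensive and IO-intensive workloads raised in the question above.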
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, xiangyu,

Thanks for your attention as well.

>1, About the waiting mechanism: Will the waiting mechanism happen only in
>the second level 'assigning slots to TM'? IIUC, the first level 'assigning
>Tasks to Slots' needs only the asynchronous slot result from slotpool.

As described in the latest FLIP, the waiting mechanism is introduced at the second level to ensure that, in all deployment modes (application, session, etc.), we do not fall into a locally greedy state when selecting the optimal slot position. This requires a global resource view to get the best result.
IIUC, the allocation from Task to Slot only generates a mapping between two abstract descriptions, and at this point there is no coupling of information between tasks/slots and TaskManagers (TMs).

>2, About the slot LoadingWeight: it is reasonable to use the number of
>tasks by default in the beginning, but it would be better if this could be
>easily extended in future to distinguish between CPU-intensive and
>IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
>others have CPU bottlenecks.

Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight interface currently only describes the number of tasks. So, IIUC, if we need to describe other resource loads, we just extend the current interface and its implementations, right?
Please correct me if I have misunderstood. Thanks a lot~

Best,
Yuepeng.

On 2023/10/06 10:19:21 xiangyu feng wrote:
> Thanks Yuepeng and Rui for driving this Discussion.
>
> Internally when we try to use Flink 1.17.1 in production, we are also
> suffering from the unbalanced task distribution problem for jobs with high
> qps and complex dag. So +1 for the overall proposal.
>
> Some questions about the details:
>
> 1, About the waiting mechanism: Will the waiting mechanism happen only in
> the second level 'assigning slots to TM'?
IIUC, the first level 'assigning > Tasks to Slots' needs only the asynchronous slot result from slotpool. > > 2, About the slot LoadingWeight: it is reasonable to use the number of > tasks by default in the beginning, but it would be better if this could be > easily extended in future to distinguish between CPU-intensive and > IO-intensive workloads. In some cases, TMs may have IO bottlenecks but > others have CPU bottlenecks. > > Regards, > Xiangyu > > > Yuepeng Pan 于2023年10月5日周四 18:34写道: > > > Hi, Zhu Zhu, > > > > Thanks for your feedback! > > > > > I think we can introduce a new config option > > > `taskmanager.load-balance.mode`, > > > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots` > > > can be superseded by the "Slots" mode and get deprecated. In the future > > > it can support more mode, e.g. "CpuCores", to work better for jobs with > > > fine-grained resources. The proposed config option > > > `slot.request.max-interval` > > > then can be renamed to > > `taskmanager.load-balance.request-stablizing-timeout` > > > to show its relation with the feature. The proposed > > `slot.sharing-strategy` > > > is not needed, because the configured "Tasks" mode will do the work. > > > > The new proposed configuration option sounds good to me. > > > > I have a small question, If we set our configuration value to 'Tasks,' it > > will initiate two processes: balancing the allocation of task quantities at > > the slot level and balancing the number of tasks across TaskManagers (TMs). > > Alternatively, if we configure it as 'Slots,' the system will employ the > > LocalPreferred allocation policy (which is the default) when assigning > > tasks to slots, and it will ensure that the number of slots used across TMs > > is balanced. > > Does this configuration essentially combine a balanced selection strategy > > across two dimensions into fixed configuration items, right? > > > > I would appreciate it if you could correct me if I've made any errors. 
> > > > Best, > > Yuepeng. > > >
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, David,

Thank you very much for your attention.

>The problem you're trying to solve only exists in complex graphs with
>different per-vertex parallelism. If the parallelism is set globally
>(assuming the pipeline has roughly even data skew), the algorithm could
>make things slightly worse by eliminating some local exchanges. Is that
>correct?

Your understanding is accurate, and it's undeniable that such use cases exist.

>Where I'm headed with this is that there could be a hybrid strategy that
>provides a reasonable default when the pipeline uses slot-sharing (for
>per-vertex parallelism, use the new strategy; for global parallelism use
>the old one). It's always a shame if improvements like this end up being a
>power-user feature and very few workloads benefit from it. Any thoughts?

The concept of letting the engine determine the scheduling strategy based on a predefined rule is excellent. This approach aims to maximize job performance while minimizing user intervention.

We might not need to rush into implementing this rule right now. What I mean is, we can evaluate and develop a well-founded rule in future work. Nonetheless, we can still consider this rule in advance so that it can be validated after the feature's release.

Additionally, if we decide to implement this rule in the future, it should be introduced as a switch. As you pointed out, we currently don't take into account the impact of data characteristics on task resource allocation in a real environment. Therefore, implementing it as a switch will offer users greater flexibility. Of course, it will add a little complexity to users' understanding of this parameter.

I'm also eager to hear from other contributors on this, and I look forward to your reply.

Best,
Yuepeng.

On 2023/10/02 20:37:12 David Morávek wrote:
> Hello Yuepeng,
>
> The FLIP reads sane; nice work!
To re-phrase my understanding: > > The problem you're trying to solve only exists in complex graphs with > different per-vertex parallelism. If the parallelism is set globally > (assuming the pipeline has roughly even data skew), the algorithm could > make things slightly worse by eliminating some local exchanges. Is that > correct? > > Where I'm headed with this is that there could be a hybrid strategy that > provides a reasonable default when the pipeline uses slot-sharing (for > per-vertex parallelism, use the new strategy; for global parallelism use > the old one). It's always a shame if improvements like this end up being a > power-user feature and very few workloads benefit from it. Any thoughts? > > Best, > D. > > On Sun, Oct 1, 2023 at 1:38 PM Yangze Guo wrote: > > > Hi, Rui, > > > > 1. With the current mechanism, when physical slots are offered from > > TM, the JobMaster will start deploying tasks and synchronizing their > > states. With the addition of the waiting mechanism, IIUC, the > > JobMaster will deploy and synchronize the states of all tasks only > > after all resources are available. The task deployment and state > > synchronization both occupy the JobMaster's RPC main thread. In > > complex jobs with a lot of tasks, this waiting mechanism may increase > > the pressure on the JobMaster and increase the end-to-end job > > deployment time. > > > > 2. From my understanding, if user enable the > > cluster.evenly-spread-out-slots, > > LeastUtilizationResourceMatchingStrategy will be used to determine the > > slot distribution and the slot allocation in the three TM will be > > (taskmanager.numberOfTaskSlots=3): > > TM1: 3 slot > > TM2: 2 slot > > TM3: 2 slot > > > > Best, > > Yangze Guo > > > > On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > > > Hi Shammon, > > > > > > Thanks for your feedback as well! > > > > > > > IIUC, the overall balance is divided into two parts: slot to TM and > > task > > > to slot. > > > > 1. 
Slot to TM is guaranteed by SlotManager in ResourceManager > > > > 2. Task to slot is guaranteed by the slot pool in JM > > > > > > > > These two are completely independent, what are the benefits of unifying > > > > these two into one option? Also, do we want to share the same > > > > option between SlotPool in JM and SlotManager in RM? This sounds a bit > > > > strange. > > > > > > Your understanding is totally right, the balance needs 2 parts: slot to > > TM > > > and task to slot. > > > > > > As I understand, the following are benefits of unifying them into one > > > option: > > > > > > - Flink users don't care about these principles inside of flink, they > > don't > > > know these 2 parts. > > > - If flink provides 2 options, flink users need to set 2 options for > > their > > > job. > > > - If one option is missed, the final result may not be good. (Users may > > > have questions when using) > > > - If flink just provides 1 option, enabling one option is enough. (Reduce > > > the probability of misconfiguration) > > > > > > Also, Flink’s options are user-oriented. Each option
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Zhu, Thanks for your clarification! I misunderstood before, it's clear now. Best, Rui On Tue, Oct 10, 2023 at 6:17 PM Zhu Zhu wrote: > Hi Rui, > > Not sure if I understand your question correctly. The two modes are not > the same: > {taskmanager.load-balance.mode: Slots} = {cluster.evenly-spread-out-slots: > true, slot.sharing-strategy: LOCAL_INPUT_PREFERRED} > {taskmanager.load-balance.mode: Tasks} = {cluster.evenly-spread-out-slots: > true, slot.sharing-strategy: TASK_BALANCED_PREFERRED} > > Thanks, > Zhu > > Rui Fan <1996fan...@gmail.com> 于2023年10月10日周二 10:27写道: > >> Hi Zhu, >> >> Thanks for your feedback! >> >> >> 2. When it's set to Tasks, how to assign slots to TM? >> > It's option2 at the moment. However, I think it's just implementation >> > details and can be changed/refined later. >> > >> > As you mentioned in another comment, 'taskmanager.load-balance.mode' is >> > a user oriented configuration. The goal is to achieve load balance, >> while >> > the load can be defined as allocated slots or assigned tasks. >> > The 'Tasks' mode, just the same as what is proposed in the FLIP, >> currently >> > use the mechanism of 'cluster.evenly-spread-out-slots' to help to >> achieve >> > balanced number of tasks. It's not perfect, but has acceptable >> effectiveness >> > and lower implementation complexity. >> > >> > The 'Slots' mode is needed for compatible reasons. Users that are >> satisfied >> > with the current ability of 'cluster.evenly-spread-out-slots' can >> continue >> > using it after the config 'cluster.evenly-spread-out-slots' is >> deprecated. >> >> IIUC, the 'Slots' mode is needed for compatibility with >> 'cluster.evenly-spread-out-slots'. >> The reason I ask this question is: if the behavior and logic of 'Slots' >> and >> 'Tasks' are exactly the same, it feels a bit strange to define two >> enumerations. >> And it may cause confusion to users. >> >> If they are totally the same, how about combining them to SlotsAndTasks? 
>> It can be compatible with 'cluster.evenly-spread-out-slots', and avoid >> the redundant enum. Of course, if the name(SlotsAndTasks) is ugly, >> we can discuss it. The core idea is combining them. >> >> WDYT? >> >> Best, >> Rui >> >> On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu wrote: >> >>> Thanks for the response, Rui and Yuepeng. >>> >>> >> Rui >>> > 1. The default value is None, right? >>> Exactly. >>> >>> > 2. When it's set to Tasks, how to assign slots to TM? >>> It's option2 at the moment. However, I think it's just implementation >>> details and can be changed/refined later. >>> >>> As you mentioned in another comment, 'taskmanager.load-balance.mode' is >>> a user oriented configuration. The goal is to achieve load balance, >>> while >>> the load can be defined as allocated slots or assigned tasks. >>> The 'Tasks' mode, just the same as what is proposed in the FLIP, >>> currently >>> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve >>> balanced number of tasks. It's not perfect, but has acceptable >>> effectiveness >>> and lower implementation complexity. >>> >>> The 'Slots' mode is needed for compatible reasons. Users that are >>> satisfied >>> with the current ability of 'cluster.evenly-spread-out-slots' can >>> continue >>> using it after the config 'cluster.evenly-spread-out-slots' is >>> deprecated. >>> >>> >>> >> Yuepeng >>> I think what users want is load balance. The combination is >>> implementation >>> details and should be transparent to users. >>> >>> Meanwhile, I think locality does not entirely conflict with load >>> balance. In fact, >>> they should be both considered when assigning tasks. Usually, state >>> locality >>> should have the highest priority, and input locality can also be taken >>> care >>> of when trying to balance tasks to slots and TMs. We can see that the >>> most >>> important input locality, i.e. forward, is always covered in this FLIP >>> when >>> computing slot sharing groups. 
It can be further optimized if we find it >>> problematic. >>> >>> Thanks, >>> Zhu >>> >>> Yangze Guo 于2023年10月8日周日 13:53写道: >>> Thanks for the updates, Rui. It does seem challenging to ensure evenness in slot deployment unless we introduce batch slot requests in SlotPool. However, one possibility is to add a delay of around 50ms during the SlotPool's resource requirement declaration to the ResourceManager, similar to the checkResourceRequirementsWithDelay in the SlotManager. In most cases, this delay would allow the SlotManager to see all resource requirements, then it can allocate the slot more evenly. As a side effect, it could also significantly reduce the number of RPC messages to the ResourceManager, which could become a single-point bottleneck in OLAP scenarios. WDYT? Best, Yangze Guo On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: > > Hi Yangze, > > Thanks for your quick response! > >
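Yangze's suggestion quoted above (delaying the SlotPool's resource requirement declaration by around 50ms so that requirements arriving close together reach the ResourceManager as one message) can be sketched as a small simulation. This is an illustrative model only, not Flink code: the function name, its parameters and the exact windowing rule (the first requirement starts a timer, everything arriving before it fires joins the same batch) are assumptions.

```python
from typing import List, Optional


def batch_declarations(arrival_ms: List[int], delay_ms: int = 50) -> List[List[int]]:
    """Group requirement arrival times (in ms) into declaration batches.

    The first requirement of a batch starts a timer of `delay_ms`; every
    requirement arriving before the timer fires is declared in the same message.
    """
    batches: List[List[int]] = []
    deadline: Optional[int] = None
    for t in sorted(arrival_ms):
        if deadline is None or t >= deadline:
            batches.append([t])        # start a new batch and a new timer
            deadline = t + delay_ms
        else:
            batches[-1].append(t)      # still inside the current window
    return batches
```

In this model, six requirements arriving at 0, 5, 10, 49, 120 and 130 ms result in two declarations instead of six, which is the reduction in RPC messages mentioned as a side effect.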
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Rui,

Not sure if I understand your question correctly. The two modes are not the same:
{taskmanager.load-balance.mode: Slots} = {cluster.evenly-spread-out-slots: true, slot.sharing-strategy: LOCAL_INPUT_PREFERRED}
{taskmanager.load-balance.mode: Tasks} = {cluster.evenly-spread-out-slots: true, slot.sharing-strategy: TASK_BALANCED_PREFERRED}

Thanks,
Zhu

Rui Fan <1996fan...@gmail.com> wrote on Tue, Oct 10, 2023 at 10:27:

> Hi Zhu,
>
> Thanks for your feedback!
>
> >> 2. When it's set to Tasks, how to assign slots to TM?
> > It's option2 at the moment. However, I think it's just implementation
> > details and can be changed/refined later.
> >
> > As you mentioned in another comment, 'taskmanager.load-balance.mode' is
> > a user oriented configuration. The goal is to achieve load balance, while
> > the load can be defined as allocated slots or assigned tasks.
> > The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
> > use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
> > balanced number of tasks. It's not perfect, but has acceptable effectiveness
> > and lower implementation complexity.
> >
> > The 'Slots' mode is needed for compatible reasons. Users that are satisfied
> > with the current ability of 'cluster.evenly-spread-out-slots' can continue
> > using it after the config 'cluster.evenly-spread-out-slots' is deprecated.
>
> IIUC, the 'Slots' mode is needed for compatibility with
> 'cluster.evenly-spread-out-slots'.
> The reason I ask this question is: if the behavior and logic of 'Slots' and
> 'Tasks' are exactly the same, it feels a bit strange to define two
> enumerations.
> And it may cause confusion to users.
>
> If they are totally the same, how about combining them to SlotsAndTasks?
> It can be compatible with 'cluster.evenly-spread-out-slots', and avoid
> the redundant enum. Of course, if the name(SlotsAndTasks) is ugly,
> we can discuss it. The core idea is combining them.
>
> WDYT?
> > Best, > Rui > > On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu wrote: > >> Thanks for the response, Rui and Yuepeng. >> >> >> Rui >> > 1. The default value is None, right? >> Exactly. >> >> > 2. When it's set to Tasks, how to assign slots to TM? >> It's option2 at the moment. However, I think it's just implementation >> details and can be changed/refined later. >> >> As you mentioned in another comment, 'taskmanager.load-balance.mode' is >> a user oriented configuration. The goal is to achieve load balance, while >> the load can be defined as allocated slots or assigned tasks. >> The 'Tasks' mode, just the same as what is proposed in the FLIP, currently >> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve >> balanced number of tasks. It's not perfect, but has acceptable >> effectiveness >> and lower implementation complexity. >> >> The 'Slots' mode is needed for compatible reasons. Users that are >> satisfied >> with the current ability of 'cluster.evenly-spread-out-slots' can continue >> using it after the config 'cluster.evenly-spread-out-slots' is deprecated. >> >> >> >> Yuepeng >> I think what users want is load balance. The combination is implementation >> details and should be transparent to users. >> >> Meanwhile, I think locality does not entirely conflict with load balance. >> In fact, >> they should be both considered when assigning tasks. Usually, state >> locality >> should have the highest priority, and input locality can also be taken >> care >> of when trying to balance tasks to slots and TMs. We can see that the most >> important input locality, i.e. forward, is always covered in this FLIP >> when >> computing slot sharing groups. It can be further optimized if we find it >> problematic. >> >> Thanks, >> Zhu >> >> Yangze Guo 于2023年10月8日周日 13:53写道: >> >>> Thanks for the updates, Rui. >>> >>> It does seem challenging to ensure evenness in slot deployment unless >>> we introduce batch slot requests in SlotPool. 
However, one possibility >>> is to add a delay of around 50ms during the SlotPool's resource >>> requirement declaration to the ResourceManager, similar to the >>> checkResourceRequirementsWithDelay in the SlotManager. In most cases, >>> this delay would allow the SlotManager to see all resource >>> requirements, then it can allocate the slot more evenly. As a side >>> effect, it could also significantly reduce the number of RPC messages >>> to the ResourceManager, which could become a single-point bottleneck >>> in OLAP scenarios. WDYT? >>> >>> Best, >>> Yangze Guo >>> >>> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: >>> > >>> > Hi Yangze, >>> > >>> > Thanks for your quick response! >>> > >>> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found >>> > it isn't clear. The root cause of introducing the waiting mechanism is >>> > that the slot requests are sent from JobMaster to SlotPool is >>> > one by one instead of one whole batch. I have rewritten the 2.2.2 part, >>> > please
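The equivalence Zhu gives at the top of this message can be written down as a lookup table. The sketch below is only an illustration in Python, not Flink code: the option names and the 'Slots' and 'Tasks' rows are taken from the message, while the 'None' row is an assumption inferred from the thread (None is confirmed as the default, and the local-preferred policy is described as the default sharing strategy). The function name is hypothetical.

```python
from typing import Dict, Union

OptionValue = Union[bool, str]

# Legacy option pairs each proposed mode stands for.
LEGACY_EQUIVALENT: Dict[str, Dict[str, OptionValue]] = {
    "Slots": {
        "cluster.evenly-spread-out-slots": True,
        "slot.sharing-strategy": "LOCAL_INPUT_PREFERRED",
    },
    "Tasks": {
        "cluster.evenly-spread-out-slots": True,
        "slot.sharing-strategy": "TASK_BALANCED_PREFERRED",
    },
    # Assumption: not spelled out in the thread, inferred from the defaults.
    "None": {
        "cluster.evenly-spread-out-slots": False,
        "slot.sharing-strategy": "LOCAL_INPUT_PREFERRED",
    },
}


def resolve_load_balance_mode(mode: str = "None") -> Dict[str, OptionValue]:
    """Return the legacy options implied by a `taskmanager.load-balance.mode` value."""
    try:
        return dict(LEGACY_EQUIVALENT[mode])
    except KeyError:
        raise ValueError(f"unknown load-balance mode: {mode!r}") from None
```

Seen this way, 'Slots' and 'Tasks' agree on how slots are spread across TMs and differ only in how tasks are grouped into slots, which is why they are kept as two separate values.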
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Zhu, Thanks for your feedback! >> 2. When it's set to Tasks, how to assign slots to TM? > It's option2 at the moment. However, I think it's just implementation > details and can be changed/refined later. > > As you mentioned in another comment, 'taskmanager.load-balance.mode' is > a user oriented configuration. The goal is to achieve load balance, while > the load can be defined as allocated slots or assigned tasks. > The 'Tasks' mode, just the same as what is proposed in the FLIP, currently > use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve > balanced number of tasks. It's not perfect, but has acceptable effectiveness > and lower implementation complexity. > > The 'Slots' mode is needed for compatible reasons. Users that are satisfied > with the current ability of 'cluster.evenly-spread-out-slots' can continue > using it after the config 'cluster.evenly-spread-out-slots' is deprecated. IIUC, the 'Slots' mode is needed for compatibility with 'cluster.evenly-spread-out-slots'. The reason I ask this question is: if the behavior and logic of 'Slots' and 'Tasks' are exactly the same, it feels a bit strange to define two enumerations. And it may cause confusion to users. If they are totally the same, how about combining them to SlotsAndTasks? It can be compatible with 'cluster.evenly-spread-out-slots', and avoid the redundant enum. Of course, if the name(SlotsAndTasks) is ugly, we can discuss it. The core idea is combining them. WDYT? Best, Rui On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu wrote: > Thanks for the response, Rui and Yuepeng. > > >> Rui > > 1. The default value is None, right? > Exactly. > > > 2. When it's set to Tasks, how to assign slots to TM? > It's option2 at the moment. However, I think it's just implementation > details and can be changed/refined later. > > As you mentioned in another comment, 'taskmanager.load-balance.mode' is > a user oriented configuration. 
The goal is to achieve load balance, while > the load can be defined as allocated slots or assigned tasks. > The 'Tasks' mode, just the same as what is proposed in the FLIP, currently > use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve > balanced number of tasks. It's not perfect, but has acceptable > effectiveness > and lower implementation complexity. > > The 'Slots' mode is needed for compatible reasons. Users that are satisfied > with the current ability of 'cluster.evenly-spread-out-slots' can continue > using it after the config 'cluster.evenly-spread-out-slots' is deprecated. > > > >> Yuepeng > I think what users want is load balance. The combination is implementation > details and should be transparent to users. > > Meanwhile, I think locality does not entirely conflict with load balance. > In fact, > they should be both considered when assigning tasks. Usually, state > locality > should have the highest priority, and input locality can also be taken care > of when trying to balance tasks to slots and TMs. We can see that the most > important input locality, i.e. forward, is always covered in this FLIP when > computing slot sharing groups. It can be further optimized if we find it > problematic. > > Thanks, > Zhu > > Yangze Guo 于2023年10月8日周日 13:53写道: > >> Thanks for the updates, Rui. >> >> It does seem challenging to ensure evenness in slot deployment unless >> we introduce batch slot requests in SlotPool. However, one possibility >> is to add a delay of around 50ms during the SlotPool's resource >> requirement declaration to the ResourceManager, similar to the >> checkResourceRequirementsWithDelay in the SlotManager. In most cases, >> this delay would allow the SlotManager to see all resource >> requirements, then it can allocate the slot more evenly. As a side >> effect, it could also significantly reduce the number of RPC messages >> to the ResourceManager, which could become a single-point bottleneck >> in OLAP scenarios. WDYT? 
>> >> Best, >> Yangze Guo >> >> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: >> > >> > Hi Yangze, >> > >> > Thanks for your quick response! >> > >> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found >> > it isn't clear. The root cause of introducing the waiting mechanism is >> > that the slot requests are sent from JobMaster to SlotPool is >> > one by one instead of one whole batch. I have rewritten the 2.2.2 part, >> > please read it again in your free time. >> > >> > [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism >> > >> > Best, >> > Rui >> > >> > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo wrote: >> >> >> >> Thanks for the clarification, Rui. >> >> >> >> I believe the root cause of this issue is that in the current >> >> DefaultResourceAllocationStrategy, slot allocation begins before the >> >> decision to PendingTaskManagers requesting is made. That can be fixed >> >> within the
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks for the response, Rui and Yuepeng.

>> Rui
> 1. The default value is None, right?
Exactly.

> 2. When it's set to Tasks, how to assign slots to TM?
It's option2 at the moment. However, I think it's just an implementation detail and can be changed/refined later.

As you mentioned in another comment, 'taskmanager.load-balance.mode' is a user-oriented configuration. The goal is to achieve load balance, while the load can be defined as allocated slots or assigned tasks.
The 'Tasks' mode, just the same as what is proposed in the FLIP, currently uses the mechanism of 'cluster.evenly-spread-out-slots' to help achieve a balanced number of tasks. It's not perfect, but has acceptable effectiveness and lower implementation complexity.

The 'Slots' mode is needed for compatibility reasons. Users that are satisfied with the current ability of 'cluster.evenly-spread-out-slots' can continue using it after the config 'cluster.evenly-spread-out-slots' is deprecated.

>> Yuepeng
I think what users want is load balance. The combination is an implementation detail and should be transparent to users.

Meanwhile, I think locality does not entirely conflict with load balance. In fact, both should be considered when assigning tasks. Usually, state locality should have the highest priority, and input locality can also be taken care of when trying to balance tasks to slots and TMs. We can see that the most important input locality, i.e. forward, is always covered in this FLIP when computing slot sharing groups. It can be further optimized if we find it problematic.

Thanks,
Zhu

Yangze Guo wrote on Sun, Oct 8, 2023 at 13:53:

> Thanks for the updates, Rui.
>
> It does seem challenging to ensure evenness in slot deployment unless
> we introduce batch slot requests in SlotPool. However, one possibility
> is to add a delay of around 50ms during the SlotPool's resource
> requirement declaration to the ResourceManager, similar to the
> checkResourceRequirementsWithDelay in the SlotManager.
In most cases, > this delay would allow the SlotManager to see all resource > requirements, then it can allocate the slot more evenly. As a side > effect, it could also significantly reduce the number of RPC messages > to the ResourceManager, which could become a single-point bottleneck > in OLAP scenarios. WDYT? > > Best, > Yangze Guo > > On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > Hi Yangze, > > > > Thanks for your quick response! > > > > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found > > it isn't clear. The root cause of introducing the waiting mechanism is > > that the slot requests are sent from JobMaster to SlotPool is > > one by one instead of one whole batch. I have rewritten the 2.2.2 part, > > please read it again in your free time. > > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism > > > > Best, > > Rui > > > > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo wrote: > >> > >> Thanks for the clarification, Rui. > >> > >> I believe the root cause of this issue is that in the current > >> DefaultResourceAllocationStrategy, slot allocation begins before the > >> decision to PendingTaskManagers requesting is made. That can be fixed > >> within the strategy without introducing another waiting mechanism. I > >> think it would be better to address this issue within the scope of > >> this FLIP. However, I don't have a strong opinion on it, it depends on > >> your bandwidth. > >> > >> > >> Best, > >> Yangze Guo > >> > >> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote: > >> > > >> > Hi Yangze, > >> > > >> > > 2. 
From my understanding, if user enable the > >> > > cluster.evenly-spread-out-slots, > >> > > LeastUtilizationResourceMatchingStrategy will be used to determine > the > >> > > slot distribution and the slot allocation in the three TM will be > >> > > (taskmanager.numberOfTaskSlots=3): > >> > > TM1: 3 slot > >> > > TM2: 2 slot > >> > > TM3: 2 slot > >> > > >> > When all tms are ready in advance, the three TM will be: > >> > TM1: 3 slot > >> > TM2: 2 slot > >> > TM3: 2 slot > >> > > >> > For application mode, the resource manager doesn't apply for > >> > TM in advance, and slots aren't enough before the third TM is ready. > >> > So all slots of the second TM will be used up. The three TM will be: > >> > TM1: 3 slot > >> > TM2: 3 slot > >> > TM3: 1 slot > >> > > >> > That's why the FLIP add some notes: > >> > > >> > All free slots are in the last TM, because ResourceManager doesn’t > have the waiting mechanism, and it just requests 7 slots for this JobMaster. > >> > Why is it acceptable? > >> > > >> > If we just add the waiting mechanism to JobMaster but not in > ResourceManager, all free slots will be in the last TM. All slots of other > TMs are offered to JM. > >> > That is, only one TM may have fewer tasks than the other TMs. The > difference between the
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks for the updates, Rui.

It does seem challenging to ensure evenness in slot deployment unless we introduce batch slot requests in SlotPool. However, one possibility is to add a delay of around 50ms during the SlotPool's resource requirement declaration to the ResourceManager, similar to the checkResourceRequirementsWithDelay in the SlotManager. In most cases, this delay would allow the SlotManager to see all resource requirements, so that it can allocate the slots more evenly. As a side effect, it could also significantly reduce the number of RPC messages to the ResourceManager, which could become a single-point bottleneck in OLAP scenarios. WDYT?

Best,
Yangze Guo

On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Yangze,
>
> Thanks for your quick response!
>
> Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism and found that it isn't clear. The reason for introducing the waiting mechanism is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch. I have rewritten the 2.2.2 part, please read it again in your free time.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
>
> Best,
> Rui
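The delayed-declaration idea in the message above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink code: the class and method names are hypothetical, time is passed in as a logical clock so the behavior is deterministic, and each batch holds only the number of newly requested slots, whereas a real SlotPool would declare its full resource requirements.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: collect slot requests for a short delay, then declare them once. */
final class DelayedDeclarationSketch {
    private final long delayMs;
    private int pendingSlots = 0;
    private long flushAtMs = -1; // -1 means no declaration is currently scheduled
    private final List<Integer> declaredBatches = new ArrayList<>();

    DelayedDeclarationSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Records one slot request; the first request of a batch arms the timer. */
    void requestSlot(long nowMs) {
        pendingSlots++;
        if (flushAtMs < 0) {
            flushAtMs = nowMs + delayMs;
        }
    }

    /** Advances the logical clock; declares the whole batch once the delay has elapsed. */
    void advanceTo(long nowMs) {
        if (flushAtMs >= 0 && nowMs >= flushAtMs) {
            declaredBatches.add(pendingSlots); // stands in for one RPC to the ResourceManager
            pendingSlots = 0;
            flushAtMs = -1;
        }
    }

    List<Integer> declaredBatches() {
        return declaredBatches;
    }

    public static void main(String[] args) {
        DelayedDeclarationSketch pool = new DelayedDeclarationSketch(50);
        for (int t = 0; t < 7; t++) {
            pool.requestSlot(t); // seven requests arrive one by one within a few milliseconds
        }
        pool.advanceTo(50);
        System.out.println(pool.declaredBatches());
    }
}
```

With seven requests arriving one by one inside the 50 ms window, the receiver sees a single declaration of seven slots instead of seven separate ones, which is what would let it spread them evenly and what reduces the RPC count.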
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Yangze,

Thanks for your quick response!

Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism and found that it isn't clear. The reason for introducing the waiting mechanism is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch. I have rewritten the 2.2.2 part, please read it again in your free time.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism

Best,
Rui

On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo wrote:

> Thanks for the clarification, Rui.
>
> I believe the root cause of this issue is that in the current DefaultResourceAllocationStrategy, slot allocation begins before the decision to request PendingTaskManagers is made. That can be fixed within the strategy without introducing another waiting mechanism. I think it would be better to address this issue within the scope of this FLIP. However, I don't have a strong opinion on it; it depends on your bandwidth.
>
> Best,
> Yangze Guo
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks for the clarification, Rui.

I believe the root cause of this issue is that in the current DefaultResourceAllocationStrategy, slot allocation begins before the decision to request PendingTaskManagers is made. That can be fixed within the strategy without introducing another waiting mechanism. I think it would be better to address this issue within the scope of this FLIP. However, I don't have a strong opinion on it; it depends on your bandwidth.

Best,
Yangze Guo

On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Yangze,
>
> > 2. From my understanding, if the user enables cluster.evenly-spread-out-slots, LeastUtilizationResourceMatchingStrategy will be used to determine the slot distribution, and the slot allocation in the three TMs will be (taskmanager.numberOfTaskSlots=3):
> > TM1: 3 slot
> > TM2: 2 slot
> > TM3: 2 slot
>
> When all TMs are ready in advance, the three TMs will be:
> TM1: 3 slots
> TM2: 2 slots
> TM3: 2 slots
>
> For application mode, the resource manager doesn't request TMs in advance, and there aren't enough slots before the third TM is ready. So all slots of the second TM will be used up. The three TMs will be:
> TM1: 3 slots
> TM2: 3 slots
> TM3: 1 slot
>
> That's why the FLIP adds some notes:
>
> - All free slots are in the last TM, because ResourceManager doesn’t have the waiting mechanism, and it just requests 7 slots for this JobMaster.
> - Why is it acceptable?
>   - If we just add the waiting mechanism to JobMaster but not in ResourceManager, all free slots will be in the last TM. All slots of other TMs are offered to JM.
>   - That is, only one TM may have fewer tasks than the other TMs. The difference between the number of tasks of other TMs is at most 1. So when p >> slotsPerTM, the problem can be ignored.
>   - We can also suggest to users that, when p is small, it's better to configure slotsPerTM to 1, or to let p % slotsPerTM == 0.
>
> Please correct me if my understanding is wrong, thanks~
>
> Best,
> Rui
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Yangze,

> 2. From my understanding, if the user enables cluster.evenly-spread-out-slots, LeastUtilizationResourceMatchingStrategy will be used to determine the slot distribution, and the slot allocation in the three TMs will be (taskmanager.numberOfTaskSlots=3):
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot

When all TMs are ready in advance, the three TMs will be:
TM1: 3 slots
TM2: 2 slots
TM3: 2 slots

For application mode, the resource manager doesn't request TMs in advance, and there aren't enough slots before the third TM is ready. So all slots of the second TM will be used up. The three TMs will be:
TM1: 3 slots
TM2: 3 slots
TM3: 1 slot

That's why the FLIP adds some notes:

- All *free* slots are in the last TM, because ResourceManager doesn’t have the waiting mechanism, and it just requests 7 slots for this JobMaster.
- Why is it acceptable?
  - If we just add the waiting mechanism to JobMaster but not in ResourceManager, all *free* slots will be in the last TM. All slots of other TMs are offered to JM.
  - That is, only one TM may have fewer tasks than the other TMs. The difference between the number of tasks of other TMs is at most 1. So when *p* >> *slotsPerTM*, the problem can be ignored.
  - We can also suggest to users that, when p is small, it's better to configure *slotsPerTM* to 1, or to let *p % slotsPerTM* == 0.

Please correct me if my understanding is wrong, thanks~

Best,
Rui

On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo wrote:

> Hi, Rui,
>
> 1. With the current mechanism, when physical slots are offered from TM, the JobMaster will start deploying tasks and synchronizing their states. With the addition of the waiting mechanism, IIUC, the JobMaster will deploy and synchronize the states of all tasks only after all resources are available. The task deployment and state synchronization both occupy the JobMaster's RPC main thread. In complex jobs with a lot of tasks, this waiting mechanism may increase the pressure on the JobMaster and increase the end-to-end job deployment time.
>
> 2. From my understanding, if the user enables cluster.evenly-spread-out-slots, LeastUtilizationResourceMatchingStrategy will be used to determine the slot distribution, and the slot allocation in the three TMs will be (taskmanager.numberOfTaskSlots=3):
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot
>
> Best,
> Yangze Guo
>
> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Shammon,
> >
> > Thanks for your feedback as well!
> >
> > > IIUC, the overall balance is divided into two parts: slot to TM and task to slot.
> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > > 2. Task to slot is guaranteed by the slot pool in JM
> > >
> > > These two are completely independent, what are the benefits of unifying these two into one option? Also, do we want to share the same option between SlotPool in JM and SlotManager in RM? This sounds a bit strange.
> >
> > Your understanding is totally right, the balance needs 2 parts: slot to TM and task to slot.
> >
> > As I understand, the following are the benefits of unifying them into one option:
> >
> > - Flink users don't care about these principles inside of Flink; they don't know these 2 parts.
> > - If Flink provides 2 options, Flink users need to set 2 options for their job.
> > - If one option is missed, the final result may not be good. (Users may have questions when using it.)
> > - If Flink just provides 1 option, enabling one option is enough. (This reduces the probability of misconfiguration.)
> >
> > Also, Flink’s options are user-oriented. Each option represents a switch or parameter of a feature. A feature may be composed of multiple components inside Flink. It might be better to keep only one switch per feature.
> >
> > Actually, the cluster.evenly-spread-out-slots option is already used by both the SlotPool in JM and the SlotManager in RM; the 2 components together ensure this feature works well.
> >
> > Please correct me if my understanding is wrong, and looking forward to your feedback, thanks!
> >
> > Best,
> > Rui
> >
> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Yangze,
> > >
> > > Thanks for your feedback!
> > >
> > > > 1. Is it possible for the SlotPool to get the slot allocation results from the SlotManager in advance instead of waiting for the actual physical slots to be registered, and perform pre-allocation? The benefit of doing this is to make the task deployment process smoother, especially when there are a large number of tasks in the job.
> > >
> > > Could you elaborate on that? I didn't understand what's the benefit and smoother.
> > >
> > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in example 2 of section 2.2.3 can be resolved. Do I understand it correctly?
> > >
> > > The
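The two slot distributions discussed above (3/2/2 when all TMs are available up front, 3/3/1 when TMs arrive one after another) can be reproduced with a little arithmetic. The sketch below is not Flink's LeastUtilizationResourceMatchingStrategy; it is a simplified illustration with hypothetical names, assuming 7 requested slots and taskmanager.numberOfTaskSlots=3 as in the example.

```java
import java.util.Arrays;

/** Hypothetical sketch of the two allocation outcomes from the thread's example. */
final class SlotSpreadSketch {

    /** Number of TMs needed to host the requested slots. */
    static int tmCount(int requestedSlots, int slotsPerTm) {
        return (requestedSlots + slotsPerTm - 1) / slotsPerTm;
    }

    /** TMs become ready one by one, so each TM is filled up before the next one is used. */
    static int[] eagerFill(int requestedSlots, int slotsPerTm) {
        int[] used = new int[tmCount(requestedSlots, slotsPerTm)];
        int remaining = requestedSlots;
        for (int i = 0; i < used.length; i++) {
            used[i] = Math.min(slotsPerTm, remaining);
            remaining -= used[i];
        }
        return used;
    }

    /** All TMs are ready in advance, so slots can be handed out round-robin. */
    static int[] evenSpread(int requestedSlots, int slotsPerTm) {
        int[] used = new int[tmCount(requestedSlots, slotsPerTm)];
        for (int s = 0; s < requestedSlots; s++) {
            used[s % used.length]++;
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(evenSpread(7, 3))); // [3, 2, 2]
        System.out.println(Arrays.toString(eagerFill(7, 3)));  // [3, 3, 1]
    }
}
```

In the eager case only the last TM can end up under-filled, which matches the note that the imbalance is negligible when p is much larger than slotsPerTM and disappears when p % slotsPerTM == 0.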
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Shammon,

IIUC, you want more flexibility in controlling the two-phase strategy, right?

> I want this because we would like to add a new slot to TM strategy such as SLOTS_NUM in the future for OLAP to improve the performance for olap jobs, which will use TASKS strategy for task to slot. cc Guoyangze

Actually, one option can meet your requirement, since it can control both phases. We can add a new enum value for this option that uses the new strategy for slot to TM and the task_balanced strategy for task to slot.

Of course, I think 2 options are more flexible. If there are too many strategies, 2 options are easier for users.

Also, I have a question: What is the SLOTS_NUM strategy? Isn't it slot-balanced at the TM level? I want to check whether it's similar to `cluster.evenly-spread-out-slots`. If they are similar or the same, there aren't too many strategies, and one option may be enough.

Best,
Rui

On Sat, Oct 7, 2023 at 11:29 AM Shammon FY wrote:

> Thanks Rui, I checked the code and you're right.
>
> As you described above, the entire process is actually two independent steps: slot to TM and task to slot. Currently we use the option `cluster.evenly-spread-out-slots` for both of them. Can we provide different options for the two steps, such as ANY/SLOTS for RM and ANY/TASKS for slot pool?
>
> I want this because we would like to add a new slot to TM strategy such as SLOTS_NUM in the future for OLAP to improve the performance for olap jobs, which will use TASKS strategy for task to slot. cc Guoyangze
>
> Best,
> Shammon FY
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks Rui, I checked the code and you're right.

As you described above, the entire process is actually two independent steps: slot to TM and task to slot. Currently we use the option `cluster.evenly-spread-out-slots` for both of them. Can we provide different options for the two steps, such as ANY/SLOTS for RM and ANY/TASKS for slot pool?

I want this because we would like to add a new slot to TM strategy such as SLOTS_NUM in the future for OLAP to improve the performance for olap jobs, which will use TASKS strategy for task to slot. cc Guoyangze

Best,
Shammon FY

On Fri, Oct 6, 2023 at 6:19 PM xiangyu feng wrote:

> Thanks Yuepeng and Rui for driving this discussion.
>
> Internally, when we tried to use Flink 1.17.1 in production, we also suffered from the unbalanced task distribution problem for jobs with high QPS and a complex DAG. So +1 for the overall proposal.
>
> Some questions about the details:
>
> 1. About the waiting mechanism: Will the waiting mechanism happen only in the second level, 'assigning slots to TM'? IIUC, the first level, 'assigning Tasks to Slots', needs only the asynchronous slot result from the slot pool.
>
> 2. About the slot LoadingWeight: it is reasonable to use the number of tasks by default in the beginning, but it would be better if this could be easily extended in the future to distinguish between CPU-intensive and IO-intensive workloads. In some cases, some TMs may have IO bottlenecks while others have CPU bottlenecks.
>
> Regards,
> Xiangyu
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks Yuepeng and Rui for driving this discussion.

Internally, when we tried to use Flink 1.17.1 in production, we also suffered from the unbalanced task distribution problem for jobs with high QPS and a complex DAG. So +1 for the overall proposal.

Some questions about the details:

1. About the waiting mechanism: Will the waiting mechanism happen only in the second level, 'assigning slots to TM'? IIUC, the first level, 'assigning Tasks to Slots', needs only the asynchronous slot result from the slot pool.

2. About the slot LoadingWeight: it is reasonable to use the number of tasks by default in the beginning, but it would be better if this could be easily extended in the future to distinguish between CPU-intensive and IO-intensive workloads. In some cases, some TMs may have IO bottlenecks while others have CPU bottlenecks.

Regards,
Xiangyu

On Thu, Oct 5, 2023 at 6:34 PM Yuepeng Pan wrote:

> Hi, Zhu Zhu,
>
> Thanks for your feedback!
>
> > I think we can introduce a new config option `taskmanager.load-balance.mode`, which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots` can be superseded by the "Slots" mode and get deprecated. In the future it can support more modes, e.g. "CpuCores", to work better for jobs with fine-grained resources. The proposed config option `slot.request.max-interval` then can be renamed to `taskmanager.load-balance.request-stablizing-timeout` to show its relation with the feature. The proposed `slot.sharing-strategy` is not needed, because the configured "Tasks" mode will do the work.
>
> The new proposed configuration option sounds good to me.
>
> I have a small question. If we set the configuration value to 'Tasks', it will initiate two processes: balancing the number of tasks at the slot level and balancing the number of tasks across TaskManagers (TMs). Alternatively, if we configure it as 'Slots', the system will employ the LocalPreferred allocation policy (which is the default) when assigning tasks to slots, and it will ensure that the number of slots used across TMs is balanced. So this configuration essentially combines a balanced selection strategy across two dimensions into fixed configuration items, right?
>
> I would appreciate it if you could correct me if I've made any errors.
>
> Best,
> Yuepeng.
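The extensibility asked for in point 2 above could look roughly like the following. This is a hypothetical sketch only: the interface shape, the class names, and the choice to rate a CPU/IO-aware slot by its bottleneck dimension are assumptions made for illustration, not the LoadingWeight design from the FLIP.

```java
import java.util.List;

/** Hypothetical sketch of a pluggable slot loading weight. */
final class LoadingWeightSketch {

    /** How heavily loaded a slot currently is; larger means more loaded. */
    interface LoadingWeight {
        double loading();
    }

    /** Default weight: simply the number of tasks assigned to the slot. */
    static final class TaskCountWeight implements LoadingWeight {
        private final int tasks;

        TaskCountWeight(int tasks) {
            this.tasks = tasks;
        }

        @Override
        public double loading() {
            return tasks;
        }
    }

    /** Possible extension: rate a slot by its bottleneck dimension (CPU or IO). */
    static final class CpuIoWeight implements LoadingWeight {
        private final double cpu;
        private final double io;

        CpuIoWeight(double cpu, double io) {
            this.cpu = cpu;
            this.io = io;
        }

        @Override
        public double loading() {
            return Math.max(cpu, io);
        }
    }

    /** Returns the index of the least loaded slot (first one on ties), or -1 if the list is empty. */
    static int leastLoaded(List<? extends LoadingWeight> slots) {
        int best = -1;
        for (int i = 0; i < slots.size(); i++) {
            if (best < 0 || slots.get(i).loading() < slots.get(best).loading()) {
                best = i;
            }
        }
        return best;
    }
}
```

Because the selection logic depends only on the interface, starting with a task-count weight would not prevent a later switch to a resource-aware one.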
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, Zhu Zhu,

Thanks for your feedback!

> I think we can introduce a new config option `taskmanager.load-balance.mode`, which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots` can be superseded by the "Slots" mode and get deprecated. In the future it can support more modes, e.g. "CpuCores", to work better for jobs with fine-grained resources. The proposed config option `slot.request.max-interval` then can be renamed to `taskmanager.load-balance.request-stablizing-timeout` to show its relation with the feature. The proposed `slot.sharing-strategy` is not needed, because the configured "Tasks" mode will do the work.

The new proposed configuration option sounds good to me.

I have a small question. If we set the configuration value to 'Tasks', it will initiate two processes: balancing the number of tasks at the slot level and balancing the number of tasks across TaskManagers (TMs). Alternatively, if we configure it as 'Slots', the system will employ the LocalPreferred allocation policy (which is the default) when assigning tasks to slots, and it will ensure that the number of slots used across TMs is balanced. So this configuration essentially combines a balanced selection strategy across two dimensions into fixed configuration items, right?

I would appreciate it if you could correct me if I've made any errors.

Best,
Yuepeng.
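To make the question above concrete, here is a rough sketch of how a single `taskmanager.load-balance.mode` value could fix the strategy for both phases. The class, enum, and constant names are hypothetical and not taken from Flink; the mapping follows the description in this thread ('Slots' keeps the default LocalPreferred task-to-slot policy and spreads slots evenly, 'Tasks' balances tasks per slot and reuses the evenly-spread-out-slots mechanism) and may differ from what the FLIP finally specifies.

```java
import java.util.Locale;

/** Hypothetical sketch: one user-facing mode selects the strategy for both phases. */
final class LoadBalanceModeSketch {

    /** Task-to-slot phase (SlotPool side); names are illustrative. */
    enum TaskToSlot { LOCAL_PREFERRED, TASK_BALANCED }

    /** Slot-to-TM phase (SlotManager side); names are illustrative. */
    enum SlotToTm { DEFAULT, EVENLY_SPREAD_OUT }

    enum Mode {
        NONE(TaskToSlot.LOCAL_PREFERRED, SlotToTm.DEFAULT),
        SLOTS(TaskToSlot.LOCAL_PREFERRED, SlotToTm.EVENLY_SPREAD_OUT),
        TASKS(TaskToSlot.TASK_BALANCED, SlotToTm.EVENLY_SPREAD_OUT);

        final TaskToSlot taskToSlot;
        final SlotToTm slotToTm;

        Mode(TaskToSlot taskToSlot, SlotToTm slotToTm) {
            this.taskToSlot = taskToSlot;
            this.slotToTm = slotToTm;
        }

        /** Parses a configured value such as "None", "Slots" or "Tasks", ignoring case. */
        static Mode parse(String value) {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + mode.taskToSlot + " / " + mode.slotToTm);
        }
    }
}
```

Under this reading, the user only ever picks a mode, and the combination of the two phase strategies stays an internal detail that can be refined later without changing the option.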
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hello Yuepeng, The FLIP reads sane; nice work! To re-phrase my understanding: The problem you're trying to solve only exists in complex graphs with different per-vertex parallelism. If the parallelism is set globally (assuming the pipeline has roughly even data skew), the algorithm could make things slightly worse by eliminating some local exchanges. Is that correct? Where I'm headed with this is that there could be a hybrid strategy that provides a reasonable default when the pipeline uses slot-sharing (for per-vertex parallelism, use the new strategy; for global parallelism use the old one). It's always a shame if improvements like this end up being a power-user feature and very few workloads benefit from it. Any thoughts? Best, D. On Sun, Oct 1, 2023 at 1:38 PM Yangze Guo wrote: > Hi, Rui, > > 1. With the current mechanism, when physical slots are offered from > TM, the JobMaster will start deploying tasks and synchronizing their > states. With the addition of the waiting mechanism, IIUC, the > JobMaster will deploy and synchronize the states of all tasks only > after all resources are available. The task deployment and state > synchronization both occupy the JobMaster's RPC main thread. In > complex jobs with a lot of tasks, this waiting mechanism may increase > the pressure on the JobMaster and increase the end-to-end job > deployment time. > > 2. From my understanding, if user enable the > cluster.evenly-spread-out-slots, > LeastUtilizationResourceMatchingStrategy will be used to determine the > slot distribution and the slot allocation in the three TM will be > (taskmanager.numberOfTaskSlots=3): > TM1: 3 slot > TM2: 2 slot > TM3: 2 slot > > Best, > Yangze Guo > > On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > Hi Shammon, > > > > Thanks for your feedback as well! > > > > > IIUC, the overall balance is divided into two parts: slot to TM and > task > > to slot. > > > 1. 
Slot to TM is guaranteed by SlotManager in ResourceManager > > > 2. Task to slot is guaranteed by the slot pool in JM > > > > > > These two are completely independent, what are the benefits of unifying > > > these two into one option? Also, do we want to share the same > > > option between SlotPool in JM and SlotManager in RM? This sounds a bit > > > strange. > > > > Your understanding is totally right, the balance needs 2 parts: slot to > TM > > and task to slot. > > > > As I understand, the following are benefits of unifying them into one > > option: > > > > - Flink users don't care about these principles inside of flink, they > don't > > know these 2 parts. > > - If flink provides 2 options, flink users need to set 2 options for > their > > job. > > - If one option is missed, the final result may not be good. (Users may > > have questions when using) > > - If flink just provides 1 option, enabling one option is enough. (Reduce > > the probability of misconfiguration) > > > > Also, Flink’s options are user-oriented. Each option represents a switch > or > > parameter of a feature. > > A feature may be composed of multiple components inside Flink. > > It might be better to keep only one switch per feature. > > > > Actually, the cluster.evenly-spread-out-slots option is used between > > SlotPool in JM and SlotManager in RM. 2 components to ensure > > this feature works well. > > > > Please correct me if my understanding is wrong, > > and looking forward to your feedback, thanks! > > > > Best, > > Rui > > > > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > > Hi Yangze, > > > > > > Thanks for your feedback! > > > > > > > 1. Is it possible for the SlotPool to get the slot allocation results > > > > from the SlotManager in advance instead of waiting for the actual > > > > physical slots to be registered, and perform pre-allocation? 
The > > > > benefit of doing this is to make the task deployment process > smoother, > > > > especially when there are a large number of tasks in the job. > > > > > > Could you elaborate on that? I didn't understand what's the benefit and > > > smoother. > > > > > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in > > > > example 2 of section 2.2.3 can be resolved. Do I understand it > > > > correctly? > > > > > > The example assigned result is the final allocation result when flink > > > user enables the cluster.evenly-spread-out-slots. We think the > > > assigned result is expected, so I think your understanding is right. > > > > > > Best, > > > Rui > > > > > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY wrote: > > > > > >> Thanks Yuepeng for initiating this discussion. > > >> > > >> +1 in general too, in fact we have implemented a similar mechanism > > >> internally to ensure a balanced allocation of tasks to slots, it works > > >> well. > > >> > > >> Some comments about the mechanism > > >> > > >> 1. This mechanism will be only supported in `SlotPool` or both > `SlotPool` > > >> and `DeclarativeSlotPool`? Currently the two
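The hybrid default that David floats in this message (new strategy for per-vertex parallelism, old strategy for a single global parallelism) comes down to one check over the vertex parallelisms. The sketch below only illustrates that idea; the class, the `Strategy` names, and the `choose` helper are hypothetical, and the thread does not settle whether such a switch is needed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Hypothetical helper illustrating the hybrid default; not part of the FLIP. */
final class HybridStrategySketch {
    enum Strategy { EXISTING, TASK_BALANCED }

    /**
     * Picks the existing strategy when every vertex has the same parallelism,
     * and the task-balanced strategy when the parallelisms differ.
     */
    static Strategy choose(List<Integer> vertexParallelisms) {
        boolean uniform = new HashSet<>(vertexParallelisms).size() <= 1;
        return uniform ? Strategy.EXISTING : Strategy.TASK_BALANCED;
    }

    public static void main(String[] args) {
        System.out.println(choose(Arrays.asList(4, 4, 4)));
        System.out.println(choose(Arrays.asList(2, 4, 8)));
    }
}
```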
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi, Rui,

1. With the current mechanism, when physical slots are offered from a TM, the JobMaster starts deploying tasks and synchronizing their states. With the waiting mechanism added, IIUC, the JobMaster will deploy and synchronize the states of all tasks only after all resources are available. Task deployment and state synchronization both occupy the JobMaster's RPC main thread. In complex jobs with a lot of tasks, this waiting mechanism may increase the pressure on the JobMaster and increase the end-to-end job deployment time.

2. From my understanding, if the user enables cluster.evenly-spread-out-slots, LeastUtilizationResourceMatchingStrategy will be used to determine the slot distribution, and the slot allocation across the three TMs (taskmanager.numberOfTaskSlots=3) will be:

TM1: 3 slots
TM2: 2 slots
TM3: 2 slots

Best,
Yangze Guo

On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Shammon,
>
> Thanks for your feedback as well!
>
> > IIUC, the overall balance is divided into two parts: slot to TM and task
> > to slot.
> > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > 2. Task to slot is guaranteed by the slot pool in JM
> >
> > These two are completely independent, what are the benefits of unifying
> > these two into one option? Also, do we want to share the same
> > option between SlotPool in JM and SlotManager in RM? This sounds a bit
> > strange.
>
> Your understanding is totally right, the balance needs 2 parts: slot to TM
> and task to slot.
>
> As I understand, the following are benefits of unifying them into one
> option:
>
> - Flink users don't care about these principles inside of flink, they don't
> know these 2 parts.
> - If flink provides 2 options, flink users need to set 2 options for their
> job.
> - If one option is missed, the final result may not be good. (Users may
> have questions when using)
> - If flink just provides 1 option, enabling one option is enough.
(Reduce > the probability of misconfiguration) > > Also, Flink’s options are user-oriented. Each option represents a switch or > parameter of a feature. > A feature may be composed of multiple components inside Flink. > It might be better to keep only one switch per feature. > > Actually, the cluster.evenly-spread-out-slots option is used between > SlotPool in JM and SlotManager in RM. 2 components to ensure > this feature works well. > > Please correct me if my understanding is wrong, > and looking forward to your feedback, thanks! > > Best, > Rui > > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote: > > > Hi Yangze, > > > > Thanks for your feedback! > > > > > 1. Is it possible for the SlotPool to get the slot allocation results > > > from the SlotManager in advance instead of waiting for the actual > > > physical slots to be registered, and perform pre-allocation? The > > > benefit of doing this is to make the task deployment process smoother, > > > especially when there are a large number of tasks in the job. > > > > Could you elaborate on that? I didn't understand what's the benefit and > > smoother. > > > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in > > > example 2 of section 2.2.3 can be resolved. Do I understand it > > > correctly? > > > > The example assigned result is the final allocation result when flink > > user enables the cluster.evenly-spread-out-slots. We think the > > assigned result is expected, so I think your understanding is right. > > > > Best, > > Rui > > > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY wrote: > > > >> Thanks Yuepeng for initiating this discussion. > >> > >> +1 in general too, in fact we have implemented a similar mechanism > >> internally to ensure a balanced allocation of tasks to slots, it works > >> well. > >> > >> Some comments about the mechanism > >> > >> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool` > >> and `DeclarativeSlotPool`? 
Currently the two slot pools are used in > >> different schedulers. I think this will also bring value to > >> `DeclarativeSlotPool`, but currently FLIP content seems to be based on > >> `SlotPool`, right? > >> > >> 2. In fine-grained resource management, we can set different resource > >> requirements for different nodes, which means that the resources of each > >> slot are different. What should be done when the slot selected by the > >> round-robin strategy cannot meet the resource requirements? Will this lead > >> to the failure of the balance strategy? > >> > >> 3. Is the assignment of tasks to slots balanced based on region or job > >> level? When multiple TMs fail over, will it cause the balancing strategy > >> to > >> fail or even worse? What is the current processing strategy? > >> > >> For Zhuzhu and Rui: > >> > >> IIUC, the overall balance is divided into two parts: slot to TM and task > >> to > >> slot. > >> 1. Slot to TM is guaranteed by SlotManager in ResourceManager > >> 2. Task to slot is guaranteed by the slot
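The 3/2/2 distribution in point 2 of Yangze's message can be reproduced with a simple least-utilization loop. This is a minimal sketch, not the code of `LeastUtilizationResourceMatchingStrategy`: the class and method names are hypothetical, the total of 7 requested slots is inferred from 3 + 2 + 2, and ties are assumed to go to the TM with the lowest index.

```java
import java.util.Arrays;

/** Hypothetical illustration of spreading slots evenly over TaskManagers. */
final class EvenSpreadSketch {
    /**
     * Places slots one at a time on the TM with the fewest used slots.
     * With equal capacity per TM this is the same as least utilization.
     */
    static int[] spread(int requestedSlots, int taskManagers, int slotsPerTm) {
        if (requestedSlots > taskManagers * slotsPerTm) {
            throw new IllegalArgumentException("not enough slots in the cluster");
        }
        int[] used = new int[taskManagers];
        for (int s = 0; s < requestedSlots; s++) {
            int best = -1;
            for (int tm = 0; tm < taskManagers; tm++) {
                boolean hasFreeSlot = used[tm] < slotsPerTm;
                if (hasFreeSlot && (best < 0 || used[tm] < used[best])) {
                    best = tm;
                }
            }
            used[best]++;
        }
        return used;
    }

    public static void main(String[] args) {
        // 7 slots over 3 TMs with taskmanager.numberOfTaskSlots=3
        System.out.println(Arrays.toString(spread(7, 3, 3))); // prints [3, 2, 2]
    }
}
```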
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Shammon,

Thanks for your feedback as well!

> IIUC, the overall balance is divided into two parts: slot to TM and task to slot.
> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> 2. Task to slot is guaranteed by the slot pool in JM
>
> These two are completely independent, what are the benefits of unifying
> these two into one option? Also, do we want to share the same
> option between SlotPool in JM and SlotManager in RM? This sounds a bit
> strange.

Your understanding is totally right: the balance needs 2 parts, slot to TM and task to slot.

As I understand it, the benefits of unifying them into one option are:

- Flink users don't care about these internal principles of Flink, and they don't know about these 2 parts.
- If Flink provides 2 options, users need to set 2 options for their job.
- If one option is missed, the final result may not be good. (Users may have questions when using it.)
- If Flink provides just 1 option, enabling that one option is enough. (This reduces the probability of misconfiguration.)

Also, Flink’s options are user-oriented. Each option represents a switch or parameter of a feature. A feature may be composed of multiple components inside Flink, so it might be better to keep only one switch per feature.

Actually, the cluster.evenly-spread-out-slots option is already used by both the SlotPool in JM and the SlotManager in RM; the 2 components work together to ensure this feature works well.

Please correct me if my understanding is wrong. Looking forward to your feedback, thanks!

Best,
Rui

On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Yangze,
>
> Thanks for your feedback!
>
> > 1. Is it possible for the SlotPool to get the slot allocation results
> > from the SlotManager in advance instead of waiting for the actual
> > physical slots to be registered, and perform pre-allocation? The
> > benefit of doing this is to make the task deployment process smoother,
> > especially when there are a large number of tasks in the job.
> > Could you elaborate on that? I didn't understand what's the benefit and > smoother. > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in > > example 2 of section 2.2.3 can be resolved. Do I understand it > > correctly? > > The example assigned result is the final allocation result when flink > user enables the cluster.evenly-spread-out-slots. We think the > assigned result is expected, so I think your understanding is right. > > Best, > Rui > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY wrote: > >> Thanks Yuepeng for initiating this discussion. >> >> +1 in general too, in fact we have implemented a similar mechanism >> internally to ensure a balanced allocation of tasks to slots, it works >> well. >> >> Some comments about the mechanism >> >> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool` >> and `DeclarativeSlotPool`? Currently the two slot pools are used in >> different schedulers. I think this will also bring value to >> `DeclarativeSlotPool`, but currently FLIP content seems to be based on >> `SlotPool`, right? >> >> 2. In fine-grained resource management, we can set different resource >> requirements for different nodes, which means that the resources of each >> slot are different. What should be done when the slot selected by the >> round-robin strategy cannot meet the resource requirements? Will this lead >> to the failure of the balance strategy? >> >> 3. Is the assignment of tasks to slots balanced based on region or job >> level? When multiple TMs fail over, will it cause the balancing strategy >> to >> fail or even worse? What is the current processing strategy? >> >> For Zhuzhu and Rui: >> >> IIUC, the overall balance is divided into two parts: slot to TM and task >> to >> slot. >> 1. Slot to TM is guaranteed by SlotManager in ResourceManager >> 2. Task to slot is guaranteed by the slot pool in JM >> >> These two are completely independent, what are the benefits of unifying >> these two into one option? 
Also, do we want to share the same >> option between SlotPool in JM and SlotManager in RM? This sounds a bit >> strange. >> >> Best, >> Shammon FY >> >> >> >> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote: >> >> > Hi Zhu Zhu, >> > >> > Thanks for your feedback here! >> > >> > You are right, user needs to set 2 options: >> > - cluster.evenly-spread-out-slots=true >> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED >> > >> > Update it to one option is useful at user side, so >> > `taskmanager.load-balance.mode` sounds good to me. >> > I want to check some points and behaviors about this option: >> > >> > 1. The default value is None, right? >> > 2. When it's set to Tasks, how to assign slots to TM? >> > - Option1: It's just check task number >> > - Option2: It''s check the slot number first, then check the >> > task number when the slot number is the same. >> > >> > Giving an example to explain what's the difference between them: >> > >> > - A session cluster has 2 flink
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Yangze,

Thanks for your feedback!

> 1. Is it possible for the SlotPool to get the slot allocation results
> from the SlotManager in advance instead of waiting for the actual
> physical slots to be registered, and perform pre-allocation? The
> benefit of doing this is to make the task deployment process smoother,
> especially when there are a large number of tasks in the job.

Could you elaborate on that? I didn't understand what the benefit is, or what 'smoother' means here.

> 2. If user enable the cluster.evenly-spread-out-slots, the issue in
> example 2 of section 2.2.3 can be resolved. Do I understand it
> correctly?

The assignment shown in the example is the final allocation result when the user enables cluster.evenly-spread-out-slots. We think that result is as expected, so I think your understanding is right.

Best,
Rui

On Thu, Sep 28, 2023 at 1:10 PM Shammon FY wrote:

> Thanks Yuepeng for initiating this discussion.
>
> +1 in general too, in fact we have implemented a similar mechanism
> internally to ensure a balanced allocation of tasks to slots, it works
> well.
>
> Some comments about the mechanism
>
> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> and `DeclarativeSlotPool`? Currently the two slot pools are used in
> different schedulers. I think this will also bring value to
> `DeclarativeSlotPool`, but currently FLIP content seems to be based on
> `SlotPool`, right?
>
> 2. In fine-grained resource management, we can set different resource
> requirements for different nodes, which means that the resources of each
> slot are different. What should be done when the slot selected by the
> round-robin strategy cannot meet the resource requirements? Will this lead
> to the failure of the balance strategy?
>
> 3. Is the assignment of tasks to slots balanced based on region or job
> level? When multiple TMs fail over, will it cause the balancing strategy to
> fail or even worse? What is the current processing strategy?
> > For Zhuzhu and Rui: > > IIUC, the overall balance is divided into two parts: slot to TM and task to > slot. > 1. Slot to TM is guaranteed by SlotManager in ResourceManager > 2. Task to slot is guaranteed by the slot pool in JM > > These two are completely independent, what are the benefits of unifying > these two into one option? Also, do we want to share the same > option between SlotPool in JM and SlotManager in RM? This sounds a bit > strange. > > Best, > Shammon FY > > > > On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote: > > > Hi Zhu Zhu, > > > > Thanks for your feedback here! > > > > You are right, user needs to set 2 options: > > - cluster.evenly-spread-out-slots=true > > - slot.sharing-strategy=TASK_BALANCED_PREFERRED > > > > Update it to one option is useful at user side, so > > `taskmanager.load-balance.mode` sounds good to me. > > I want to check some points and behaviors about this option: > > > > 1. The default value is None, right? > > 2. When it's set to Tasks, how to assign slots to TM? > > - Option1: It's just check task number > > - Option2: It''s check the slot number first, then check the > > task number when the slot number is the same. > > > > Giving an example to explain what's the difference between them: > > > > - A session cluster has 2 flink jobs, they are jobA and jobB > > - Each TM has 4 slots. > > - The task number of one slot of jobA is 3 > > - The task number of one slot of jobB is 1 > > - We have 2 TaskManagers: > > - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks > > - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks. > > > > Now, we need to run a new slot, which tm should offer it? > > - Option1: If we just check the task number, the tm1 is better. > > - Option2: If we check the slot number first, and then check task, the > tm2 > > is better > > > > The original FLIP selected option2, that's why we didn't add the > > third option. 
The option2 didn't break the semantics when > > `cluster.evenly-spread-out-slots` is true, and it just improve the > > behavior without the semantics is changed. > > > > In the other hands, if we choose option2, when user set > > `taskmanager.load-balance.mode` is Tasks. It also can achieve > > the goal when it's Slots. > > > > So I think the `Slots` enum isn't needed if we choose option2. > > Of course, If we choose the option1, the enum is needed. > > > > Looking forward to your feedback, thanks~ > > > > Best, > > Rui > > > > On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu wrote: > > > > > Thanks Yuepeng and Rui for creating this FLIP. > > > > > > +1 in general > > > The idea is straight forward: best-effort gather all the slot requests > > > and offered slots to form an overview before assigning slots, trying to > > > balance the loads of task managers when assigning slots. > > > > > > I have one comment regarding the configuration for ease of use: > > > > > > IIUC, this FLIP uses an existing config > 'cluster.evenly-spread-out-slots' > > > as the main switch of
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks, Yuepeng, for initiating this discussion.

+1 in general too. In fact, we have implemented a similar mechanism internally to ensure a balanced allocation of tasks to slots, and it works well.

Some comments about the mechanism:

1. Will this mechanism be supported only in `SlotPool`, or in both `SlotPool` and `DeclarativeSlotPool`? Currently the two slot pools are used in different schedulers. I think this would also bring value to `DeclarativeSlotPool`, but the current FLIP content seems to be based on `SlotPool`, right?

2. In fine-grained resource management, we can set different resource requirements for different nodes, which means that the resources of each slot are different. What should be done when the slot selected by the round-robin strategy cannot meet the resource requirements? Will this cause the balancing strategy to fail?

3. Is the assignment of tasks to slots balanced at the region level or at the job level? When multiple TMs fail over, will that cause the balancing strategy to fail, or even make things worse? What is the current processing strategy?

For Zhuzhu and Rui:

IIUC, the overall balance is divided into two parts: slot to TM and task to slot.
1. Slot to TM is guaranteed by the SlotManager in the ResourceManager.
2. Task to slot is guaranteed by the slot pool in the JM.

These two are completely independent, so what are the benefits of unifying them into one option? Also, do we want to share the same option between the SlotPool in JM and the SlotManager in RM? This sounds a bit strange.

Best,
Shammon FY

On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Zhu Zhu,
>
> Thanks for your feedback here!
>
> You are right, user needs to set 2 options:
> - cluster.evenly-spread-out-slots=true
> - slot.sharing-strategy=TASK_BALANCED_PREFERRED
>
> Update it to one option is useful at user side, so
> `taskmanager.load-balance.mode` sounds good to me.
> I want to check some points and behaviors about this option:
>
> 1. The default value is None, right?
> 2.
When it's set to Tasks, how to assign slots to TM? > - Option1: It's just check task number > - Option2: It''s check the slot number first, then check the > task number when the slot number is the same. > > Giving an example to explain what's the difference between them: > > - A session cluster has 2 flink jobs, they are jobA and jobB > - Each TM has 4 slots. > - The task number of one slot of jobA is 3 > - The task number of one slot of jobB is 1 > - We have 2 TaskManagers: > - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks > - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks. > > Now, we need to run a new slot, which tm should offer it? > - Option1: If we just check the task number, the tm1 is better. > - Option2: If we check the slot number first, and then check task, the tm2 > is better > > The original FLIP selected option2, that's why we didn't add the > third option. The option2 didn't break the semantics when > `cluster.evenly-spread-out-slots` is true, and it just improve the > behavior without the semantics is changed. > > In the other hands, if we choose option2, when user set > `taskmanager.load-balance.mode` is Tasks. It also can achieve > the goal when it's Slots. > > So I think the `Slots` enum isn't needed if we choose option2. > Of course, If we choose the option1, the enum is needed. > > Looking forward to your feedback, thanks~ > > Best, > Rui > > On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu wrote: > > > Thanks Yuepeng and Rui for creating this FLIP. > > > > +1 in general > > The idea is straight forward: best-effort gather all the slot requests > > and offered slots to form an overview before assigning slots, trying to > > balance the loads of task managers when assigning slots. > > > > I have one comment regarding the configuration for ease of use: > > > > IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots' > > as the main switch of the new feature. 
That is, from user perspective, > > with this improvement, the 'cluster.evenly-spread-out-slots' feature not > > only balances the number of slots on task managers, but also balances the > > number of tasks. This is a behavior change anyway. Besides that, it also > > requires users to set 'slot.sharing-strategy' to > 'TASK_BALANCED_PREFERRED' > > to balance the tasks in each slot. > > > > I think we can introduce a new config option > > `taskmanager.load-balance.mode`, > > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots` > > can be superseded by the "Slots" mode and get deprecated. In the future > > it can support more mode, e.g. "CpuCores", to work better for jobs with > > fine-grained resources. The proposed config option > > `slot.request.max-interval` > > then can be renamed to > > `taskmanager.load-balance.request-stablizing-timeout` > > to show its relation with the feature. The proposed > `slot.sharing-strategy` > > is not needed, because the configured "Tasks" mode will do the work. > > > > WDYT? > > > > Thanks, > > Zhu
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Hi Zhu Zhu,

Thanks for your feedback here!

You are right, the user needs to set 2 options:
- cluster.evenly-spread-out-slots=true
- slot.sharing-strategy=TASK_BALANCED_PREFERRED

Merging them into one option is useful on the user side, so `taskmanager.load-balance.mode` sounds good to me. I want to check some points and behaviors of this option:

1. The default value is None, right?
2. When it's set to Tasks, how do we assign slots to TMs?
   - Option1: Check only the task number.
   - Option2: Check the slot number first, then check the task number when the slot numbers are the same.

Here is an example to explain the difference between them:

- A session cluster has 2 Flink jobs, jobA and jobB.
- Each TM has 4 slots.
- Each slot of jobA runs 3 tasks.
- Each slot of jobB runs 1 task.
- We have 2 TaskManagers:
  - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks.
  - tm2 runs 1 slot of jobA and 1 slot of jobB, so tm2 runs 4 tasks.

Now we need to run a new slot. Which TM should offer it?
- Option1: If we check only the task number, tm1 is better (3 tasks vs. 4).
- Option2: If we check the slot number first and then the task number, tm2 is better (2 slots vs. 3).

The original FLIP selected option2, which is why we didn't add the third option. Option2 doesn't break the semantics of `cluster.evenly-spread-out-slots` being true; it only improves the behavior without changing the semantics.

On the other hand, if we choose option2, setting `taskmanager.load-balance.mode` to Tasks also achieves the goal of Slots.

So I think the `Slots` enum value isn't needed if we choose option2. Of course, if we choose option1, the enum value is needed.

Looking forward to your feedback, thanks~

Best,
Rui

On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu wrote:

> Thanks Yuepeng and Rui for creating this FLIP.
> > +1 in general > The idea is straight forward: best-effort gather all the slot requests > and offered slots to form an overview before assigning slots, trying to > balance the loads of task managers when assigning slots. > > I have one comment regarding the configuration for ease of use: > > IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots' > as the main switch of the new feature. That is, from user perspective, > with this improvement, the 'cluster.evenly-spread-out-slots' feature not > only balances the number of slots on task managers, but also balances the > number of tasks. This is a behavior change anyway. Besides that, it also > requires users to set 'slot.sharing-strategy' to 'TASK_BALANCED_PREFERRED' > to balance the tasks in each slot. > > I think we can introduce a new config option > `taskmanager.load-balance.mode`, > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots` > can be superseded by the "Slots" mode and get deprecated. In the future > it can support more mode, e.g. "CpuCores", to work better for jobs with > fine-grained resources. The proposed config option > `slot.request.max-interval` > then can be renamed to > `taskmanager.load-balance.request-stablizing-timeout` > to show its relation with the feature. The proposed `slot.sharing-strategy` > is not needed, because the configured "Tasks" mode will do the work. > > WDYT? > > Thanks, > Zhu Zhu > > Yuepeng Pan 于2023年9月25日周一 16:26写道: > >> Hi all, >> >> >> I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks >> scheduling. >> >> >> The current strategy of Flink to deploy tasks sometimes leads some >> TMs(TaskManagers) to have more tasks while others have fewer tasks, >> resulting in excessive resource utilization at some TMs that contain more >> tasks and becoming a bottleneck for the entire job processing. Developing >> strategies to achieve task load balancing for TMs and reducing job >> bottlenecks becomes very meaningful. 
>> >> >> The raw design and discussions could be found in the Flink JIRA[2] and >> Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some >> valuable help and suggestions in advance. >> >> >> Please refer to the FLIP[1] document for more details about the proposed >> design and implementation. We welcome any feedback and opinions on this >> proposal. >> >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling >> >> [2] https://issues.apache.org/jira/browse/FLINK-31757 >> >> [3] >> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8 >> >> >> Best, >> >> Yuepeng Pan >> >
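The tm1/tm2 example in Rui's reply above maps directly onto two orderings over TaskManagers. The sketch below is only an illustration of Option1 and Option2 as described in that message; the `Tm` class, the comparator names, and the `pick` helper are hypothetical and do not come from the FLIP.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of the two TM selection orders discussed in the thread. */
final class TmSelectionSketch {
    static final class Tm {
        final String name;
        final int usedSlots;
        final int tasks;

        Tm(String name, int usedSlots, int tasks) {
            this.name = name;
            this.usedSlots = usedSlots;
            this.tasks = tasks;
        }
    }

    /** Option1: prefer the TM running the fewest tasks. */
    static final Comparator<Tm> OPTION_1 = Comparator.comparingInt((Tm tm) -> tm.tasks);

    /** Option2: prefer the fewest used slots, then the fewest tasks on a tie. */
    static final Comparator<Tm> OPTION_2 =
            Comparator.comparingInt((Tm tm) -> tm.usedSlots).thenComparingInt(tm -> tm.tasks);

    /** Returns the name of the TM that should offer the next slot. */
    static String pick(List<Tm> tms, Comparator<Tm> order) {
        return Collections.min(tms, order).name;
    }

    public static void main(String[] args) {
        List<Tm> tms = Arrays.asList(
                new Tm("tm1", 3, 3),  // 3 slots of jobB, 1 task each
                new Tm("tm2", 2, 4)); // 1 slot of jobA (3 tasks) + 1 slot of jobB (1 task)
        System.out.println(pick(tms, OPTION_1)); // prints tm1
        System.out.println(pick(tms, OPTION_2)); // prints tm2
    }
}
```

Because Option2 compares slot counts first, it never contradicts a pure slot-count ordering, which matches Rui's remark that it keeps the semantics of `cluster.evenly-spread-out-slots`.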
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks for driving this FLIP, Yuepeng Pan. +1 for the overall proposal to support balanced scheduling.

Some questions on the waiting mechanism and the allocation strategy for slot to TM:

1. Is it possible for the SlotPool to get the slot allocation results from the SlotManager in advance, instead of waiting for the actual physical slots to be registered, and to perform pre-allocation? The benefit of doing this is a smoother task deployment process, especially when the job has a large number of tasks.

2. If the user enables cluster.evenly-spread-out-slots, the issue in example 2 of section 2.2.3 can be resolved. Do I understand it correctly?

Best,
Yangze Guo

On Wed, Sep 27, 2023 at 9:12 PM Zhu Zhu wrote:
>
> Thanks Yuepeng and Rui for creating this FLIP.
>
> +1 in general
> The idea is straight forward: best-effort gather all the slot requests
> and offered slots to form an overview before assigning slots, trying to
> balance the loads of task managers when assigning slots.
>
> I have one comment regarding the configuration for ease of use:
>
> IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
> as the main switch of the new feature. That is, from user perspective,
> with this improvement, the 'cluster.evenly-spread-out-slots' feature not
> only balances the number of slots on task managers, but also balances the
> number of tasks. This is a behavior change anyway. Besides that, it also
> requires users to set 'slot.sharing-strategy' to 'TASK_BALANCED_PREFERRED'
> to balance the tasks in each slot.
>
> I think we can introduce a new config option
> `taskmanager.load-balance.mode`,
> which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> can be superseded by the "Slots" mode and get deprecated. In the future
> it can support more mode, e.g. "CpuCores", to work better for jobs with
> fine-grained resources.
The proposed config option > `slot.request.max-interval` > then can be renamed to `taskmanager.load-balance.request-stablizing-timeout` > to show its relation with the feature. The proposed `slot.sharing-strategy` > is not needed, because the configured "Tasks" mode will do the work. > > WDYT? > > Thanks, > Zhu Zhu > > Yuepeng Pan 于2023年9月25日周一 16:26写道: > > > Hi all, > > > > > > I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks > > scheduling. > > > > > > The current strategy of Flink to deploy tasks sometimes leads some > > TMs(TaskManagers) to have more tasks while others have fewer tasks, > > resulting in excessive resource utilization at some TMs that contain more > > tasks and becoming a bottleneck for the entire job processing. Developing > > strategies to achieve task load balancing for TMs and reducing job > > bottlenecks becomes very meaningful. > > > > > > The raw design and discussions could be found in the Flink JIRA[2] and > > Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some > > valuable help and suggestions in advance. > > > > > > Please refer to the FLIP[1] document for more details about the proposed > > design and implementation. We welcome any feedback and opinions on this > > proposal. > > > > > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling > > > > [2] https://issues.apache.org/jira/browse/FLINK-31757 > > > > [3] > > https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8 > > > > > > Best, > > > > Yuepeng Pan > >
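The waiting mechanism that Yangze asks about, and that Zhu Zhu's quoted summary describes as gathering slot requests on a best-effort basis before assigning slots, can be pictured as batching by arrival gap. The sketch below assumes that `slot.request.max-interval` means "the batch is considered complete once no new request arrives within this interval"; that reading is an assumption, not something this thread spells out, and the class and method names are hypothetical.

```java
/** Hypothetical illustration of gap-based batching of slot requests. */
final class RequestBatchingSketch {
    /**
     * Given request arrival times in ascending order, returns how many requests
     * fall into the first batch: the batch closes at the first gap between two
     * consecutive requests that exceeds maxIntervalMillis.
     */
    static int firstBatchSize(long[] arrivalMillis, long maxIntervalMillis) {
        if (arrivalMillis.length == 0) {
            return 0;
        }
        int size = 1;
        while (size < arrivalMillis.length
                && arrivalMillis[size] - arrivalMillis[size - 1] <= maxIntervalMillis) {
            size++;
        }
        return size;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 5, 9, 40, 42};
        // The gap between 9 and 40 exceeds 20 ms, so the first batch holds 3 requests.
        System.out.println(firstBatchSize(arrivals, 20)); // prints 3
    }
}
```

A larger interval gives the scheduler a more complete overview before it assigns slots, at the cost of a later deployment start, which is the trade-off behind the deployment-time concern raised elsewhere in the thread.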
Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling
Thanks Yuepeng and Rui for creating this FLIP.

+1 in general
The idea is straightforward: best-effort gather all the slot requests
and offered slots to form an overview before assigning slots, trying to
balance the loads of task managers when assigning slots.

I have one comment regarding the configuration for ease of use:

IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
as the main switch of the new feature. That is, from the user's
perspective, with this improvement, the 'cluster.evenly-spread-out-slots'
feature not only balances the number of slots on task managers, but also
balances the number of tasks. This is a behavior change anyway. Besides
that, it also requires users to set 'slot.sharing-strategy' to
'TASK_BALANCED_PREFERRED' to balance the tasks in each slot.

I think we can introduce a new config option
`taskmanager.load-balance.mode`,
which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
can be superseded by the "Slots" mode and get deprecated. In the future
it can support more modes, e.g. "CpuCores", to work better for jobs with
fine-grained resources.

The proposed config option `slot.request.max-interval` can then be
renamed to `taskmanager.load-balance.request-stablizing-timeout`
to show its relation with the feature. The proposed `slot.sharing-strategy`
is not needed, because the configured "Tasks" mode will do the work.

WDYT?

Thanks,
Zhu Zhu

On Mon, Sep 25, 2023 at 16:26, Yuepeng Pan wrote:

> Hi all,
>
> Fan Rui (CC’ed) and I created FLIP-370[1] to support balanced tasks
> scheduling.
>
> The current strategy of Flink to deploy tasks sometimes leads some
> TMs (TaskManagers) to have more tasks while others have fewer tasks,
> resulting in excessive resource utilization at the TMs that contain more
> tasks, which then become a bottleneck for the entire job. Developing
> strategies to achieve task load balancing across TMs and to reduce job
> bottlenecks is therefore very meaningful.
>
> The raw design and discussions can be found in the Flink JIRA[2] and
> Google doc[3]. We really appreciate Zhu Zhu (CC’ed) for providing some
> valuable help and suggestions in advance.
>
> Please refer to the FLIP[1] document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on this
> proposal.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
>
> [2] https://issues.apache.org/jira/browse/FLINK-31757
>
> [3]
> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
>
> Best,
>
> Yuepeng Pan
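
To illustrate the configuration proposal in this message, the following is a
rough sketch of how a single mode option could supersede the existing
boolean switch. The option keys and the three mode names come from the
message; everything else is an assumption made for illustration: the class
is hypothetical (it does not use Flink's real configuration classes), the
configuration is modelled as a plain string map, and the fallback rule
(legacy switch set to true maps to "Slots" when the new option is absent) is
just one plausible reading of "superseded".

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch; not Flink's actual configuration handling. */
final class LoadBalanceModeSketch {

    /** The three modes named in the proposal. */
    enum LoadBalanceMode { NONE, SLOTS, TASKS }

    static final String MODE_KEY = "taskmanager.load-balance.mode";
    static final String LEGACY_KEY = "cluster.evenly-spread-out-slots";

    /** Resolves the effective mode from a flat key/value configuration. */
    static LoadBalanceMode resolve(Map<String, String> conf) {
        String mode = conf.get(MODE_KEY);
        if (mode != null) {
            // An explicitly configured mode wins over the legacy switch.
            // valueOf throws IllegalArgumentException for unknown values.
            return LoadBalanceMode.valueOf(mode.trim().toUpperCase(Locale.ROOT));
        }
        // Assumed fallback: the deprecated boolean maps onto the "Slots" mode.
        boolean legacyEnabled = Boolean.parseBoolean(conf.get(LEGACY_KEY));
        return legacyEnabled ? LoadBalanceMode.SLOTS : LoadBalanceMode.NONE;
    }
}
```

With a rule like this, a user who only sets
`cluster.evenly-spread-out-slots: true` keeps the slot-balancing behavior,
while task balancing has to be requested explicitly via the "Tasks" mode, so
the existing switch does not silently change meaning.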