Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-19 Thread Yuepeng Pan
Hi, dev.




After reviewing the entire email discussion thread with Rui, I noticed that my
previous, ambiguous understanding led to a few incorrect conclusions.

So I need to revise the corresponding conclusions here. Thanks to Rui for the help.




>For David: 

>The problem you're trying to solve only exists in complex graphs with

>different per-vertex parallelism. If the parallelism is set globally

>(assuming the pipeline has roughly even data skew), the algorithm could

>make things slightly worse by eliminating some local exchanges. Is that

>correct?

I re-checked this: if the parallelism of all nodes is equal, the new strategy
does not disrupt local exchanges; all subtasks connected by a forward shuffle
are still placed in the same slot.

As described in the core logic in section 2.1.1 of FLIP-370[1], if the
parallelism of all nodes is equal, the new strategy traverses all SEVs of each
JV and assigns SEV[subtask_index] to ESSG[subtask_index]. As a result of the new
strategy:

a. SEVs with the same subtask index are assigned to the same ESSG.

b. For forward edges, all subtasks connected by a forward shuffle remain in the
same slot, so their data exchanges stay local (see the sketch below).
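
To make the equal-parallelism case concrete, here is a minimal Java sketch of
the index-based assignment described above. The class and method names are
hypothetical simplifications for illustration only, not the FLIP's or Flink's
actual API.

import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of the FLIP-370 section 2.1.1 idea (hypothetical names). */
public class BalancedGroupAssignmentSketch {

    /** When every vertex has the same parallelism, subtask i of every JV goes to ESSG i. */
    static List<List<String>> assignToGroups(List<String> jobVertices, int parallelism) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            groups.add(new ArrayList<>());
        }
        for (String jobVertex : jobVertices) {
            for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
                // SEV[subtaskIndex] of every JV is assigned to ESSG[subtaskIndex].
                groups.get(subtaskIndex).add(jobVertex + "_" + subtaskIndex);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Source -> Map -> Sink, all with parallelism 3, connected by forward edges.
        List<List<String>> groups = assignToGroups(List.of("Source", "Map", "Sink"), 3);
        // Group i contains Source_i, Map_i and Sink_i, so each forward exchange stays local.
        groups.forEach(System.out::println);
    }
}

Because group i always receives the i-th subtask of every vertex, the subtasks
on both ends of a forward edge share a slot, which is why no local exchange is
lost.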



--




If there are no additional comments about the FLIP, I plan to initiate a vote
on it next Monday.




Best Regards,

Yuepeng




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling




At 2023-09-25 16:25:03, "Yuepeng Pan"  wrote:
>Hi all,
>
>
>
>
>Fan Rui (CC’ed) and I created FLIP-370[1] to support balanced task
>scheduling.
>
>
>
>
>Flink’s current strategy for deploying tasks sometimes leaves some
>TMs (TaskManagers) with more tasks than others, so the heavily loaded TMs
>consume excessive resources and become a bottleneck for the whole job.
>Developing strategies that balance the task load across TMs and reduce job
>bottlenecks is therefore very meaningful.
>
>
>
>
>The raw design and discussions can be found in the Flink JIRA[2] and the
>Google doc[3]. We really appreciate Zhu Zhu (CC’ed) for providing valuable
>help and suggestions in advance.
>
>
>
>
>Please refer to the FLIP[1] document for more details about the proposed 
>design and implementation. We welcome any feedback and opinions on this 
>proposal.
>
>
>
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
>
>[2] https://issues.apache.org/jira/browse/FLINK-31757
>
>[3] 
>https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
>
>
>
>
>Best,
>
>Yuepeng Pan


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-17 Thread Yuepeng Pan
Hi, Rui.

Thank you for the update.
+1 for the updated version of the FLIP page.
And thanks to Zhu Zhu and Yangze Guo for the discussion.

Best Regards.
Yuepeng Pan

On 2023/10/17 03:45:08 Rui Fan wrote:
> Hi all,
> 
> Offline discussed with Zhu Zhu, Yangze Guo, Yuepeng Pan.
> We reached consensus on slot.request.max-interval and
> taskmanager.load-balance.mode. And I have updated the FLIP.
> 
> For a detailed introduction to taskmanager.load-balance.mode,
> please refer to FLIP’s 3.1 Public Interfaces[1].
> 
> And the strategy for slot.request.max-interval has been improved.
> The latest strategy can be found in FLIP’s 2.2.2 Waiting mechanism[2].
> For comparison of old and new strategies, please refer to
> RejectedAlternatives[3].
> 
> Thanks again to everyone who participated in the discussion.
> Looking forward to your continued feedback.
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives
> 
> Best,
> Rui
> 
> On Thu, Oct 12, 2023 at 9:49 AM Yuepeng Pan  wrote:
> 
> > Hi, Shammon.
> > Thanks for your feedback.
> >
> > >1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> > and `DeclarativeSlotPool`?
> >
> > As described on the FLIP page, the current design plans to introduce the
> > waiting mechanism only in the `SlotPool`, because the existing
> > `WaitingForResources` can already achieve this effect.
> >
> > >Currently the two slot pools are used in different schedulers.
> >
> > Yes, that's indeed the case.
> >
> > >I think this will also bring value to `DeclarativeSlotPool`, but
> > currently FLIP content seems to be based on `SlotPool`, right?
> >
> > Yes. your expectations are indeed reasonable. In theory, the
> > `DeclarativeSlotPool` could also benefit from a waiting mechanism, as
> > discussed. The purpose of introducing the waiting mechanism is to enable
> > the `SlotPool` to have a global view to calculate the globally optimal
> > solution. I've rechecked the relevant logic in the `AdaptiveScheduler`, and
> > as I understand, the existing mechanisms already fulfill the current
> > feature requirements. You could find more conclusions on this in FLIP
> > `3.2.5`. Of course, I'd be appreciated with your confirmation. If there's
> > any misunderstanding on my part, please correct me.
> >
> > >2. ... What should be done when the slot selected by the round-robin
> > strategy cannot meet the resource requirements?
> >
> > Is this referring to the phase of task-to-slot allocation? I'm not quite
> > sure, would you mind explaining it? Thanks~.
> >
> > >3. Is the assignment of tasks to slots balanced based on region or job
> > level?
> >
> > Currently, there is no specific handling based on regions, and there is no
> > job-level balancing. The target effect of the current feature is to achieve
> > load balancing based on the number of tasks at the Task Manager (TM) level.
> > Looking forward to any suggestions regarding the item you mentioned.
> >
> > >When multiple TMs fail over, will it cause the balancing strategy to fail
> > or even worse?
> >
> > IIUC, when multiple Task Managers undergo failover, the results after
> > successful recovery will still be maintained in a relatively balanced state.
> >
> > >What is the current processing strategy?
> >
> > The Slot-to-TM strategy does not change after a Task Manager undergoes
> > failover.
> >
> > Best, Regards.
> > Yuepeng Pan
> >
> > On 2023/09/28 05:10:13 Shammon FY wrote:
> > > Thanks Yuepeng for initiating this discussion.
> > >
> > > +1 in general too, in fact we have implemented a similar mechanism
> > > internally to ensure a balanced allocation of tasks to slots, it works
> > well.
> > >
> > > Some comments about the mechanism
> > >
> > > 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> > > and `DeclarativeSlotPool`? Currently the two slot pools are used in
> > > different schedulers. I think this will also bring value to
> > > `DeclarativeSlotPool`, but currently FLIP content seems to be based on
> > > `SlotPool`, right?
> > >
> > > 2. In fine-grained resource management, we can set different resource
> > > requirements for different nodes, which means that the resources of each
> > > slot are different. What should be done when the slot selected by the
> > > round-robin strategy cannot meet the resource requirements? Will this
> > lead
> > > to the failure of the balance strategy?
> > >
> > > 3. Is the assignment of tasks to slots balanced based on region or job
> > > level? When multiple TMs fail over, will it cause the balancing strategy
> > to
> > > 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-16 Thread Yangze Guo
Thanks for the update, Rui. +1 for the latest version of the FLIP.


Best,
Yangze Guo

On Tue, Oct 17, 2023 at 11:45 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi all,
>
> Offline discussed with Zhu Zhu, Yangze Guo, Yuepeng Pan.
> We reached consensus on slot.request.max-interval and
> taskmanager.load-balance.mode. And I have updated the FLIP.
>
> For a detailed introduction to taskmanager.load-balance.mode,
> please refer to FLIP’s 3.1 Public Interfaces[1].
>
> And the strategy for slot.request.max-interval has been improved.
> The latest strategy can be found in FLIP’s 2.2.2 Waiting mechanism[2].
> For comparison of old and new strategies, please refer to 
> RejectedAlternatives[3].
>
> Thanks again to everyone who participated in the discussion.
> Looking forward to your continued feedback.
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> [3] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives
>
> Best,
> Rui
>
> On Thu, Oct 12, 2023 at 9:49 AM Yuepeng Pan  wrote:
>>
>> Hi, Shammon.
>> Thanks for your feedback.
>>
>> >1. This mechanism will be only supported in `SlotPool` or both `SlotPool` 
>> >and `DeclarativeSlotPool`?
>>
>> As described on the FLIP page, the current design plans to introduce the 
>> waiting mechanism only in the `SlotPool`, because the existing 
>> `WaitingForResources` can already achieve this effect.
>>
>> >Currently the two slot pools are used in different schedulers.
>>
>> Yes, that's indeed the case.
>>
>> >I think this will also bring value to `DeclarativeSlotPool`, but currently 
>> >FLIP content seems to be based on `SlotPool`, right?
>>
>> Yes. your expectations are indeed reasonable. In theory, the 
>> `DeclarativeSlotPool` could also benefit from a waiting mechanism, as 
>> discussed. The purpose of introducing the waiting mechanism is to enable the 
>> `SlotPool` to have a global view to calculate the globally optimal solution. 
>> I've rechecked the relevant logic in the `AdaptiveScheduler`, and as I 
>> understand, the existing mechanisms already fulfill the current feature 
>> requirements. You could find more conclusions on this in FLIP `3.2.5`. Of 
>> course, I'd be appreciated with your confirmation. If there's any 
>> misunderstanding on my part, please correct me.
>>
>> >2. ... What should be done when the slot selected by the round-robin 
>> >strategy cannot meet the resource requirements?
>>
>> Is this referring to the phase of task-to-slot allocation? I'm not quite 
>> sure, would you mind explaining it? Thanks~.
>>
>> >3. Is the assignment of tasks to slots balanced based on region or job 
>> >level?
>>
>> Currently, there is no specific handling based on regions, and there is no 
>> job-level balancing. The target effect of the current feature is to achieve 
>> load balancing based on the number of tasks at the Task Manager (TM) level.
>> Looking forward to any suggestions regarding the item you mentioned.
>>
>> >When multiple TMs fail over, will it cause the balancing strategy to fail 
>> >or even worse?
>>
>> IIUC, when multiple Task Managers undergo failover, the results after 
>> successful recovery will still be maintained in a relatively balanced state.
>>
>> >What is the current processing strategy?
>>
>> The Slot-to-TM strategy does not change after a Task Manager undergoes 
>> failover.
>>
>> Best, Regards.
>> Yuepeng Pan
>>
>> On 2023/09/28 05:10:13 Shammon FY wrote:
>> > Thanks Yuepeng for initiating this discussion.
>> >
>> > +1 in general too, in fact we have implemented a similar mechanism
>> > internally to ensure a balanced allocation of tasks to slots, it works 
>> > well.
>> >
>> > Some comments about the mechanism
>> >
>> > 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
>> > and `DeclarativeSlotPool`? Currently the two slot pools are used in
>> > different schedulers. I think this will also bring value to
>> > `DeclarativeSlotPool`, but currently FLIP content seems to be based on
>> > `SlotPool`, right?
>> >
>> > 2. In fine-grained resource management, we can set different resource
>> > requirements for different nodes, which means that the resources of each
>> > slot are different. What should be done when the slot selected by the
>> > round-robin strategy cannot meet the resource requirements? Will this lead
>> > to the failure of the balance strategy?
>> >
>> > 3. Is the assignment of tasks to slots balanced based on region or job
>> > level? When multiple TMs fail over, will it cause the balancing strategy to
>> > fail or even worse? What is the current processing strategy?
>> >
>> > For Zhuzhu and Rui:
>> >
>> > 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-16 Thread Rui Fan
Hi all,

Offline discussed with Zhu Zhu, Yangze Guo, Yuepeng Pan.
We reached consensus on slot.request.max-interval and
taskmanager.load-balance.mode. And I have updated the FLIP.

For a detailed introduction to taskmanager.load-balance.mode,
please refer to FLIP’s 3.1 Public Interfaces[1].

And the strategy for slot.request.max-interval has been improved.
The latest strategy can be found in FLIP’s 2.2.2 Waiting mechanism[2].
For comparison of old and new strategies, please refer to
RejectedAlternatives[3].

Thanks again to everyone who participated in the discussion.
Looking forward to your continued feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-3.1PublicInterfaces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-RejectedAlternatives

Best,
Rui

On Thu, Oct 12, 2023 at 9:49 AM Yuepeng Pan  wrote:

> Hi, Shammon.
> Thanks for your feedback.
>
> >1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> and `DeclarativeSlotPool`?
>
> As described on the FLIP page, the current design plans to introduce the
> waiting mechanism only in the `SlotPool`, because the existing
> `WaitingForResources` can already achieve this effect.
>
> >Currently the two slot pools are used in different schedulers.
>
> Yes, that's indeed the case.
>
> >I think this will also bring value to `DeclarativeSlotPool`, but
> currently FLIP content seems to be based on `SlotPool`, right?
>
> Yes. your expectations are indeed reasonable. In theory, the
> `DeclarativeSlotPool` could also benefit from a waiting mechanism, as
> discussed. The purpose of introducing the waiting mechanism is to enable
> the `SlotPool` to have a global view to calculate the globally optimal
> solution. I've rechecked the relevant logic in the `AdaptiveScheduler`, and
> as I understand, the existing mechanisms already fulfill the current
> feature requirements. You could find more conclusions on this in FLIP
> `3.2.5`. Of course, I'd be appreciated with your confirmation. If there's
> any misunderstanding on my part, please correct me.
>
> >2. ... What should be done when the slot selected by the round-robin
> strategy cannot meet the resource requirements?
>
> Is this referring to the phase of task-to-slot allocation? I'm not quite
> sure, would you mind explaining it? Thanks~.
>
> >3. Is the assignment of tasks to slots balanced based on region or job
> level?
>
> Currently, there is no specific handling based on regions, and there is no
> job-level balancing. The target effect of the current feature is to achieve
> load balancing based on the number of tasks at the Task Manager (TM) level.
> Looking forward to any suggestions regarding the item you mentioned.
>
> >When multiple TMs fail over, will it cause the balancing strategy to fail
> or even worse?
>
> IIUC, when multiple Task Managers undergo failover, the results after
> successful recovery will still be maintained in a relatively balanced state.
>
> >What is the current processing strategy?
>
> The Slot-to-TM strategy does not change after a Task Manager undergoes
> failover.
>
> Best, Regards.
> Yuepeng Pan
>
> On 2023/09/28 05:10:13 Shammon FY wrote:
> > Thanks Yuepeng for initiating this discussion.
> >
> > +1 in general too, in fact we have implemented a similar mechanism
> > internally to ensure a balanced allocation of tasks to slots, it works
> well.
> >
> > Some comments about the mechanism
> >
> > 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> > and `DeclarativeSlotPool`? Currently the two slot pools are used in
> > different schedulers. I think this will also bring value to
> > `DeclarativeSlotPool`, but currently FLIP content seems to be based on
> > `SlotPool`, right?
> >
> > 2. In fine-grained resource management, we can set different resource
> > requirements for different nodes, which means that the resources of each
> > slot are different. What should be done when the slot selected by the
> > round-robin strategy cannot meet the resource requirements? Will this
> lead
> > to the failure of the balance strategy?
> >
> > 3. Is the assignment of tasks to slots balanced based on region or job
> > level? When multiple TMs fail over, will it cause the balancing strategy
> to
> > fail or even worse? What is the current processing strategy?
> >
> > For Zhuzhu and Rui:
> >
> > IIUC, the overall balance is divided into two parts: slot to TM and task
> to
> > slot.
> > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > 2. Task to slot is guaranteed by the slot pool in JM
> >
> > These two are completely independent, what are the benefits of unifying
> > these two into one option? 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-11 Thread Yuepeng Pan
Hi, Shammon.
Thanks for your feedback.

>1. This mechanism will be only supported in `SlotPool` or both `SlotPool` and 
>`DeclarativeSlotPool`? 

As described on the FLIP page, the current design plans to introduce the 
waiting mechanism only in the `SlotPool`, because the existing 
`WaitingForResources` can already achieve this effect.

>Currently the two slot pools are used in different schedulers. 

Yes, that's indeed the case.

>I think this will also bring value to `DeclarativeSlotPool`, but currently 
>FLIP content seems to be based on `SlotPool`, right?

Yes, your expectation is reasonable. In theory, the `DeclarativeSlotPool` could
also benefit from a waiting mechanism, as discussed. The purpose of introducing
the waiting mechanism is to give the `SlotPool` a global view so that it can
calculate the globally optimal solution. I've rechecked the relevant logic in
the `AdaptiveScheduler`, and as I understand it, the existing mechanisms already
fulfill the current feature requirements. You can find more conclusions on this
in FLIP section `3.2.5`. Of course, I'd appreciate your confirmation; if there's
any misunderstanding on my part, please correct me.

>2. ... What should be done when the slot selected by the round-robin strategy 
>cannot meet the resource requirements?

Is this referring to the task-to-slot allocation phase? I'm not quite sure;
would you mind elaborating? Thanks~

>3. Is the assignment of tasks to slots balanced based on region or job level? 

Currently, there is no specific handling based on regions, and there is no 
job-level balancing. The target effect of the current feature is to achieve 
load balancing based on the number of tasks at the Task Manager (TM) level.
Looking forward to any suggestions regarding the item you mentioned.

>When multiple TMs fail over, will it cause the balancing strategy to fail or 
>even worse? 

IIUC, when multiple Task Managers undergo failover, the results after 
successful recovery will still be maintained in a relatively balanced state.

>What is the current processing strategy?

The Slot-to-TM strategy does not change after a Task Manager undergoes failover.

Best Regards.
Yuepeng Pan

On 2023/09/28 05:10:13 Shammon FY wrote:
> Thanks Yuepeng for initiating this discussion.
> 
> +1 in general too, in fact we have implemented a similar mechanism
> internally to ensure a balanced allocation of tasks to slots, it works well.
> 
> Some comments about the mechanism
> 
> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> and `DeclarativeSlotPool`? Currently the two slot pools are used in
> different schedulers. I think this will also bring value to
> `DeclarativeSlotPool`, but currently FLIP content seems to be based on
> `SlotPool`, right?
> 
> 2. In fine-grained resource management, we can set different resource
> requirements for different nodes, which means that the resources of each
> slot are different. What should be done when the slot selected by the
> round-robin strategy cannot meet the resource requirements? Will this lead
> to the failure of the balance strategy?
> 
> 3. Is the assignment of tasks to slots balanced based on region or job
> level? When multiple TMs fail over, will it cause the balancing strategy to
> fail or even worse? What is the current processing strategy?
> 
> For Zhuzhu and Rui:
> 
> IIUC, the overall balance is divided into two parts: slot to TM and task to
> slot.
> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> 2. Task to slot is guaranteed by the slot pool in JM
> 
> These two are completely independent, what are the benefits of unifying
> these two into one option? Also, do we want to share the same
> option between SlotPool in JM and SlotManager in RM? This sounds a bit
> strange.
> 
> Best,
> Shammon FY
> 
> 
> 
> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:
> 
> > Hi Zhu Zhu,
> >
> > Thanks for your feedback here!
> >
> > You are right, user needs to set 2 options:
> > - cluster.evenly-spread-out-slots=true
> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED
> >
> > Update it to one option is useful at user side, so
> > `taskmanager.load-balance.mode` sounds good to me.
> > I want to check some points and behaviors about this option:
> >
> > 1. The default value is None, right?
> > 2. When it's set to Tasks, how to assign slots to TM?
> > - Option1: It's just check task number
> > - Option2: It''s check the slot number first, then check the
> > task number when the slot number is the same.
> >
> > Giving an example to explain what's the difference between them:
> >
> > - A session cluster has 2 flink jobs, they are jobA and jobB
> > - Each TM has 4 slots.
> > - The task number of one slot of jobA is 3
> > - The task number of one slot of jobB is 1
> > - We have 2 TaskManagers:
> >   - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
> >   - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks.
> >
> > Now, we 
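
To illustrate the difference between the two options quoted above, here is a
hedged Java sketch applied to this example; the Tm record and the selection
rules are hypothetical stand-ins for illustration, not the proposal's actual
code.

import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of the two slot-to-TM selection rules discussed above. */
public class TmSelectionSketch {

    record Tm(String name, int allocatedSlots, int runningTasks) {}

    public static void main(String[] args) {
        // From the example: tm1 runs 3 slots of jobB (3 tasks),
        // tm2 runs 1 slot of jobA and 1 slot of jobB (4 tasks).
        List<Tm> tms = List.of(new Tm("tm1", 3, 3), new Tm("tm2", 2, 4));

        // Option 1: only check the task number -> tm1 wins (3 tasks < 4 tasks).
        Tm option1 = tms.stream()
                .min(Comparator.comparingInt(Tm::runningTasks))
                .get();

        // Option 2: check the slot number first, then the task number
        // -> tm2 wins (2 slots < 3 slots).
        Tm option2 = tms.stream()
                .min(Comparator.comparingInt(Tm::allocatedSlots)
                        .thenComparingInt(Tm::runningTasks))
                .get();

        System.out.println("Option 1 picks " + option1.name());
        System.out.println("Option 2 picks " + option2.name());
    }
}

So the two options would pick different TaskManagers for the next slot in this
example, which is exactly the behavioral question being raised here.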

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-11 Thread xiangyu feng
Hi Yuepeng,

Thanks for your feedback. I agree with you: both approaches can achieve the
goal.
As long as we can easily extend the balancing strategy to consider more than
one factor without changing the interface, the solution is OK for me.

Regards,
Xiangyu

On Wed, Oct 11, 2023 at 5:38 PM Yuepeng Pan  wrote:

> Hi, xiangyu.
> Thanks for your quick reply.
>
> >interface currently only includes a description of the number of tasks.
> So,
> >IIUC, If there is a need to further expand
> >current interface and its implementations, right?
>
> Yes, that's indeed the case.
>
> >I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
> >currently it only supports comparing the load for one factor. If we want
> to
> >add more loading factors, LoadingWeight might need to add a 'LoadType'
> >field for distinction, WeightLoadable might need to return
> >Set.
>
> Thank you for the clarification, I think I roughly understand your
> description:
> In fact, regarding the specific implementation and extension of this
> LoadingWeight, we can extend it based on this interface and its
> implementation as mentioned above.
> If making frequent changes to the interface and its implementation is
> really tiresome, we can also consider introducing a built-in collapsible
> Map or other type of attribute, like the SlotSharingGroup class in the
> org.apache.flink.api.common.operators package, to describe the specific
> collection of load values and types. This way, these loads are collapsed
> within the LoadingWeight's implementation and can be expanded when needed
> for use. Of course, we can also consider an implementation like the one you
> mentioned, introducing a method in WeightLoadable that returns a collection
> as the return type, so the load values are expanded at the calling site and
> then used. As I understand it, both approaches can achieve the goal.
>
> Of course, I also look forward to hearing others' suggestions. If there
> are any mistakes in my statement, please correct me.
> Looking forward to your reply.
>
> Best regards.
> Yuepeng Pan
>
> On 2023/10/11 08:44:51 xiangyu feng wrote:
> > Hi Yuepeng,
> >
> > Thx for ur reply.
> >
> > > Nice feedback. In fact, as mentioned in the Google Doc, the
> LoadingWeight
> > interface currently only includes a description of the number of tasks.
> So,
> > IIUC, If there is a need to further expand
> > > descriptions of other resource loads, we just extend it based on the
> > current interface and its implementations, right?
> >
> > I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
> > currently it only supports comparing the load for one factor. If we want
> to
> > add more loading factors, LoadingWeight might need to add a 'LoadType'
> > field for distinction, WeightLoadable might need to return
> > Set.
> >
> > I'm not sure I understand this correctly, WDYT?
> >
> > Regards,
> > Xiangyu
> >
> > On Wed, Oct 11, 2023 at 1:53 PM Yuepeng Pan  wrote:
> >
> > > Hi, xiangyu,
> > > Thanks for your attention as well.
> > >
> > > >1, About the waiting mechanism: Will the waiting mechanism happen
> only in
> > > >the second level 'assigning slots to TM'? IIUC, the first level
> 'assigning
> > > >Tasks to Slots' needs only the asynchronous slot result from slotpool.
> > >
> > > As described in the latest FLIP, the introduction of the waiting
> mechanism
> > > at the second level is to ensure that, in all deployment modes such as
> > > application, session, etc., we do not fall into a local greedy state
> when
> > > selecting the optimal slot position. This requires obtaining a global
> > > resource view to get the best result.
> > > IIUC, The allocation process from Task to Slot is the generation of a
> > > mapping relationship between two abstract descriptions, and at this
> point,
> > > there is no coupling of information between tasks/slots and Task
> Managers
> > > (TMs).
> > >
> > >
> > > >2, About the slot LoadingWeight: it is reasonable to use the number of
> > > >tasks by default in the beginning, but it would be better if this
> could be
> > > >easily extended in future to distinguish between CPU-intensive and
> > > >IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> > > >others have CPU bottlenecks.
> > >
> > > Nice feedback. In fact, as mentioned in the Google Doc, the
> LoadingWeight
> > > interface currently only includes a description of the number of
> tasks. So,
> > > IIUC, If there is a need to further expand descriptions of other
> resource
> > > loads, we just extend it based on the current interface and its
> > > implementations, right?
> > > Please correct me if I have misunderstood. Thanks a lot~
> > >
> > > Best,
> > > Yuepeng.
> > >
> > > On 2023/10/06 10:19:21 xiangyu feng wrote:
> > > > Thanks Yuepeng and Rui for driving this Discussion.
> > > >
> > > > Internally when we try to use Flink 1.17.1 in production, we are also
> > > > suffering from the unbalanced task distribution problem for jobs with
> > > high
> > > > qps and 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-11 Thread Yuepeng Pan
Hi, xiangyu.
Thanks for your quick reply.

>interface currently only includes a description of the number of tasks. So,
>IIUC, If there is a need to further expand
>current interface and its implementations, right?

Yes, that's indeed the case.

>I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
>currently it only supports comparing the load for one factor. If we want to
>add more loading factors, LoadingWeight might need to add a 'LoadType'
>field for distinction, WeightLoadable might need to return
>Set.

Thank you for the clarification; I think I roughly understand your description.
Regarding the concrete implementation and extension of LoadingWeight, we can
extend it based on the current interface and its implementations, as mentioned
above.
If making frequent changes to the interface and its implementations turns out
to be tiresome, we could instead introduce a built-in map-like attribute,
similar to the SlotSharingGroup class in the
org.apache.flink.api.common.operators package, to describe the collection of
load values and their types. With that approach, the loads are folded inside
the LoadingWeight implementation and expanded only when needed. Alternatively,
we could follow the implementation you mentioned and add a method to
WeightLoadable that returns a collection, so the load values are expanded at
the call site and then used. As I understand it, both approaches can achieve
the goal.
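
To make the "folded map" idea above more tangible, here is a hedged Java
sketch. The names LoadingWeight and WeightLoadable come from the FLIP draft,
but the map-based shape below is purely hypothetical and only meant to show how
extra load factors could be added without changing the interface again.

import java.util.Map;

/** Hypothetical sketch: one LoadingWeight object folds several load factors into a map. */
interface LoadingWeight {
    /** e.g. {"tasks" -> 4.0}; other factors such as "cpu" would be future extensions. */
    Map<String, Double> loads();

    /** Compare two weights on a single factor; a real design might combine factors instead. */
    default int compareOn(String factor, LoadingWeight other) {
        return Double.compare(
                loads().getOrDefault(factor, 0.0),
                other.loads().getOrDefault(factor, 0.0));
    }
}

/** Hypothetical counterpart of WeightLoadable: anything (a slot, a TM) exposing its load. */
interface WeightLoadable {
    LoadingWeight loading();
}

/** Default weight that only tracks the number of tasks, as in the current FLIP draft. */
record TaskCountWeight(int taskCount) implements LoadingWeight {
    @Override
    public Map<String, Double> loads() {
        return Map.of("tasks", (double) taskCount);
    }
}

public class LoadingWeightSketch {
    public static void main(String[] args) {
        LoadingWeight a = new TaskCountWeight(3);
        LoadingWeight b = new TaskCountWeight(5);
        System.out.println(a.compareOn("tasks", b) < 0); // true: a is the lighter load
    }
}

With this shape, adding an IO- or CPU-oriented factor only means putting
another entry into the map, so neither interface signature has to change.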

Of course, I also look forward to hearing others' suggestions. If there are any 
mistakes in my statement, please correct me. 
Looking forward to your reply.

Best regards.
Yuepeng Pan

On 2023/10/11 08:44:51 xiangyu feng wrote:
> Hi Yuepeng,
> 
> Thx for ur reply.
> 
> > Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
> interface currently only includes a description of the number of tasks. So,
> IIUC, If there is a need to further expand
> > descriptions of other resource loads, we just extend it based on the
> current interface and its implementations, right?
> 
> I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
> currently it only supports comparing the load for one factor. If we want to
> add more loading factors, LoadingWeight might need to add a 'LoadType'
> field for distinction, WeightLoadable might need to return
> Set.
> 
> I'm not sure I understand this correctly, WDYT?
> 
> Regards,
> Xiangyu
> 
> On Wed, Oct 11, 2023 at 1:53 PM Yuepeng Pan  wrote:
> 
> > Hi, xiangyu,
> > Thanks for your attention as well.
> >
> > >1, About the waiting mechanism: Will the waiting mechanism happen only in
> > >the second level 'assigning slots to TM'? IIUC, the first level 'assigning
> > >Tasks to Slots' needs only the asynchronous slot result from slotpool.
> >
> > As described in the latest FLIP, the introduction of the waiting mechanism
> > at the second level is to ensure that, in all deployment modes such as
> > application, session, etc., we do not fall into a local greedy state when
> > selecting the optimal slot position. This requires obtaining a global
> > resource view to get the best result.
> > IIUC, The allocation process from Task to Slot is the generation of a
> > mapping relationship between two abstract descriptions, and at this point,
> > there is no coupling of information between tasks/slots and Task Managers
> > (TMs).
> >
> >
> > >2, About the slot LoadingWeight: it is reasonable to use the number of
> > >tasks by default in the beginning, but it would be better if this could be
> > >easily extended in future to distinguish between CPU-intensive and
> > >IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> > >others have CPU bottlenecks.
> >
> > Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
> > interface currently only includes a description of the number of tasks. So,
> > IIUC, If there is a need to further expand descriptions of other resource
> > loads, we just extend it based on the current interface and its
> > implementations, right?
> > Please correct me if I have misunderstood. Thanks a lot~
> >
> > Best,
> > Yuepeng.
> >
> > On 2023/10/06 10:19:21 xiangyu feng wrote:
> > > Thanks Yuepeng and Rui for driving this Discussion.
> > >
> > > Internally when we try to use Flink 1.17.1 in production, we are also
> > > suffering from the unbalanced task distribution problem for jobs with
> > high
> > > qps and complex dag. So +1 for the overall proposal.
> > >
> > > Some questions about the details:
> > >
> > > 1, About the waiting mechanism: Will the waiting mechanism happen only in
> > > the second level 'assigning slots to TM'?  IIUC, the first level
> > 'assigning
> > > Tasks to Slots' needs only the asynchronous slot result from slotpool.
> > >
> > > 2, About the slot LoadingWeight: it is reasonable to use the number of
> > > tasks by default in the beginning, but it would be better if this could
> > be

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-11 Thread xiangyu feng
Hi Yuepeng,

Thx for ur reply.

> Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
interface currently only includes a description of the number of tasks. So,
IIUC, If there is a need to further expand
> descriptions of other resource loads, we just extend it based on the
current interface and its implementations, right?

I checked the interface design of LoadingWeight and WeightLoadable, AFAIK
currently it only supports comparing the load for one factor. If we want to
add more loading factors, LoadingWeight might need to add a 'LoadType'
field for distinction, WeightLoadable might need to return
Set.

I'm not sure I understand this correctly, WDYT?

Regards,
Xiangyu

On Wed, Oct 11, 2023 at 1:53 PM Yuepeng Pan  wrote:

> Hi, xiangyu,
> Thanks for your attention as well.
>
> >1, About the waiting mechanism: Will the waiting mechanism happen only in
> >the second level 'assigning slots to TM'? IIUC, the first level 'assigning
> >Tasks to Slots' needs only the asynchronous slot result from slotpool.
>
> As described in the latest FLIP, the introduction of the waiting mechanism
> at the second level is to ensure that, in all deployment modes such as
> application, session, etc., we do not fall into a local greedy state when
> selecting the optimal slot position. This requires obtaining a global
> resource view to get the best result.
> IIUC, The allocation process from Task to Slot is the generation of a
> mapping relationship between two abstract descriptions, and at this point,
> there is no coupling of information between tasks/slots and Task Managers
> (TMs).
>
>
> >2, About the slot LoadingWeight: it is reasonable to use the number of
> >tasks by default in the beginning, but it would be better if this could be
> >easily extended in future to distinguish between CPU-intensive and
> >IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> >others have CPU bottlenecks.
>
> Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
> interface currently only includes a description of the number of tasks. So,
> IIUC, If there is a need to further expand descriptions of other resource
> loads, we just extend it based on the current interface and its
> implementations, right?
> Please correct me if I have misunderstood. Thanks a lot~
>
> Best,
> Yuepeng.
>
> On 2023/10/06 10:19:21 xiangyu feng wrote:
> > Thanks Yuepeng and Rui for driving this Discussion.
> >
> > Internally when we try to use Flink 1.17.1 in production, we are also
> > suffering from the unbalanced task distribution problem for jobs with
> high
> > qps and complex dag. So +1 for the overall proposal.
> >
> > Some questions about the details:
> >
> > 1, About the waiting mechanism: Will the waiting mechanism happen only in
> > the second level 'assigning slots to TM'?  IIUC, the first level
> 'assigning
> > Tasks to Slots' needs only the asynchronous slot result from slotpool.
> >
> > 2, About the slot LoadingWeight: it is reasonable to use the number of
> > tasks by default in the beginning, but it would be better if this could
> be
> > easily extended in future to distinguish between CPU-intensive and
> > IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> > others have CPU bottlenecks.
> >
> > Regards,
> > Xiangyu
> >
> >
> > On Thu, Oct 5, 2023 at 6:34 PM Yuepeng Pan  wrote:
> >
> > > Hi, Zhu Zhu,
> > >
> > > Thanks for your feedback!
> > >
> > > > I think we can introduce a new config option
> > > > `taskmanager.load-balance.mode`,
> > > > which accepts "None"/"Slots"/"Tasks".
> `cluster.evenly-spread-out-slots`
> > > > can be superseded by the "Slots" mode and get deprecated. In the
> future
> > > > it can support more mode, e.g. "CpuCores", to work better for jobs
> with
> > > > fine-grained resources. The proposed config option
> > > > `slot.request.max-interval`
> > > > then can be renamed to
> > > `taskmanager.load-balance.request-stablizing-timeout`
> > > > to show its relation with the feature. The proposed
> > > `slot.sharing-strategy`
> > > > is not needed, because the configured "Tasks" mode will do the work.
> > >
> > > The new proposed configuration option sounds good to me.
> > >
> > > I have a small question, If we set our configuration value to 'Tasks,'
> it
> > > will initiate two processes: balancing the allocation of task
> quantities at
> > > the slot level and balancing the number of tasks across TaskManagers
> (TMs).
> > > Alternatively, if we configure it as 'Slots,' the system will employ
> the
> > > LocalPreferred allocation policy (which is the default) when assigning
> > > tasks to slots, and it will ensure that the number of slots used
> across TMs
> > > is balanced.
> > > Does  this configuration essentially combine a balanced selection
> strategy
> > > across two dimensions into fixed configuration items, right?
> > >
> > > I would appreciate it if you could correct me if I've made any errors.
> > >
> > > Best,
> > > Yuepeng.
> > >
> >
>


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-10 Thread Yuepeng Pan
Hi, xiangyu,
Thanks for your attention as well.

>1, About the waiting mechanism: Will the waiting mechanism happen only in
>the second level 'assigning slots to TM'? IIUC, the first level 'assigning
>Tasks to Slots' needs only the asynchronous slot result from slotpool.

As described in the latest FLIP, the waiting mechanism is introduced at the
second level to ensure that, in all deployment modes (application, session,
etc.), we do not fall into a locally greedy choice when selecting the optimal
slot position; this requires a global resource view to obtain the best result
(see the sketch below).
IIUC, the allocation from Task to Slot only produces a mapping between two
abstract descriptions, and at that point there is no coupling between
tasks/slots and TaskManagers (TMs).
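
As a purely illustrative sketch of this waiting idea (generic Java, not Flink's
SlotPool code): incoming slot requests are buffered, and the whole batch is
only handed to the assignment step once no new request has arrived within a
configured quiet period, which plays the role of the proposed
slot.request.max-interval.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical debouncer: collect requests and flush them as one batch after a quiet period. */
public class SlotRequestBatcher<T> {
    private final long maxIntervalMs;                // plays the role of slot.request.max-interval
    private final Consumer<List<T>> assignBatch;     // the assignment step that needs a global view
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final List<T> pending = new ArrayList<>();
    private ScheduledFuture<?> flush;

    public SlotRequestBatcher(long maxIntervalMs, Consumer<List<T>> assignBatch) {
        this.maxIntervalMs = maxIntervalMs;
        this.assignBatch = assignBatch;
    }

    /** Each new request restarts the quiet-period timer instead of being assigned immediately. */
    public synchronized void request(T slotRequest) {
        pending.add(slotRequest);
        if (flush != null) {
            flush.cancel(false);
        }
        flush = timer.schedule(this::flushNow, maxIntervalMs, TimeUnit.MILLISECONDS);
    }

    private synchronized void flushNow() {
        assignBatch.accept(new ArrayList<>(pending));
        pending.clear();
    }
}

Used as new SlotRequestBatcher<>(50, batch -> assignWithGlobalView(batch)), all
requests arriving within one quiet window are decided together rather than one
by one; assignWithGlobalView is, of course, a placeholder.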


>2, About the slot LoadingWeight: it is reasonable to use the number of
>tasks by default in the beginning, but it would be better if this could be
>easily extended in future to distinguish between CPU-intensive and
>IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
>others have CPU bottlenecks.

Nice feedback. In fact, as mentioned in the Google Doc, the LoadingWeight
interface currently only describes the number of tasks. So, IIUC, if there is a
need to further describe other resource loads, we can just extend it based on
the current interface and its implementations, right?
Please correct me if I have misunderstood. Thanks a lot~

Best,
Yuepeng.

On 2023/10/06 10:19:21 xiangyu feng wrote:
> Thanks Yuepeng and Rui for driving this Discussion.
> 
> Internally when we try to use Flink 1.17.1 in production, we are also
> suffering from the unbalanced task distribution problem for jobs with high
> qps and complex dag. So +1 for the overall proposal.
> 
> Some questions about the details:
> 
> 1, About the waiting mechanism: Will the waiting mechanism happen only in
> the second level 'assigning slots to TM'?  IIUC, the first level 'assigning
> Tasks to Slots' needs only the asynchronous slot result from slotpool.
> 
> 2, About the slot LoadingWeight: it is reasonable to use the number of
> tasks by default in the beginning, but it would be better if this could be
> easily extended in future to distinguish between CPU-intensive and
> IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> others have CPU bottlenecks.
> 
> Regards,
> Xiangyu
> 
> 
> On Thu, Oct 5, 2023 at 6:34 PM Yuepeng Pan  wrote:
> 
> > Hi, Zhu Zhu,
> >
> > Thanks for your feedback!
> >
> > > I think we can introduce a new config option
> > > `taskmanager.load-balance.mode`,
> > > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> > > can be superseded by the "Slots" mode and get deprecated. In the future
> > > it can support more mode, e.g. "CpuCores", to work better for jobs with
> > > fine-grained resources. The proposed config option
> > > `slot.request.max-interval`
> > > then can be renamed to
> > `taskmanager.load-balance.request-stablizing-timeout`
> > > to show its relation with the feature. The proposed
> > `slot.sharing-strategy`
> > > is not needed, because the configured "Tasks" mode will do the work.
> >
> > The new proposed configuration option sounds good to me.
> >
> > I have a small question, If we set our configuration value to 'Tasks,' it
> > will initiate two processes: balancing the allocation of task quantities at
> > the slot level and balancing the number of tasks across TaskManagers (TMs).
> > Alternatively, if we configure it as 'Slots,' the system will employ the
> > LocalPreferred allocation policy (which is the default) when assigning
> > tasks to slots, and it will ensure that the number of slots used across TMs
> > is balanced.
> > Does  this configuration essentially combine a balanced selection strategy
> > across two dimensions into fixed configuration items, right?
> >
> > I would appreciate it if you could correct me if I've made any errors.
> >
> > Best,
> > Yuepeng.
> >
> 


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-10 Thread Yuepeng Pan
Hi, David, 
Thank you very much for your attention.

>The problem you're trying to solve only exists in complex graphs with
>different per-vertex parallelism. If the parallelism is set globally
>(assuming the pipeline has roughly even data skew), the algorithm could
>make things slightly worse by eliminating some local exchanges. Is that
>correct? 

Your understanding is accurate, and it's undeniable that such use case 
scenarios exist.

>Where I'm headed with this is that there could be a hybrid strategy that
>provides a reasonable default when the pipeline uses slot-sharing (for
>per-vertex parallelism, use the new strategy; for global parallelism use
>the old one). It's always a shame if improvements like this end up being a
>power-user feature and very few workloads benefit from it. Any thoughts? 

The concept of letting the engine choose the scheduling strategy based on a
predefined rule is excellent; it aims to maximize job performance while
minimizing user intervention.
We don't need to rush into implementing such a rule right now. What I mean is
that we can evaluate and develop a well-founded rule in future work, while
still considering it in advance so that it can be validated after the feature's
release.
Additionally, if we decide to implement this rule in the future, it should be
introduced behind a switch. As you pointed out, we currently don't take the
impact of data characteristics on task resource allocation in the actual
environment into account, so a switch gives users greater flexibility, at the
cost of a little extra complexity in understanding this parameter.

I'm also eager to hear from other contributors about this, and I look forward
to your reply.

Best,
Yuepeng.

On 2023/10/02 20:37:12 David Morávek wrote:
> Hello Yuepeng,
> 
> The FLIP reads sane; nice work! To re-phrase my understanding:
> 
> The problem you're trying to solve only exists in complex graphs with
> different per-vertex parallelism. If the parallelism is set globally
> (assuming the pipeline has roughly even data skew), the algorithm could
> make things slightly worse by eliminating some local exchanges. Is that
> correct?
> 
> Where I'm headed with this is that there could be a hybrid strategy that
> provides a reasonable default when the pipeline uses slot-sharing (for
> per-vertex parallelism, use the new strategy; for global parallelism use
> the old one). It's always a shame if improvements like this end up being a
> power-user feature and very few workloads benefit from it. Any thoughts?
> 
> Best,
> D.
> 
> On Sun, Oct 1, 2023 at 1:38 PM Yangze Guo  wrote:
> 
> > Hi, Rui,
> >
> > 1. With the current mechanism, when physical slots are offered from
> > TM, the JobMaster will start deploying tasks and synchronizing their
> > states. With the addition of the waiting mechanism, IIUC, the
> > JobMaster will deploy and synchronize the states of all tasks only
> > after all resources are available. The task deployment and state
> > synchronization both occupy the JobMaster's RPC main thread. In
> > complex jobs with a lot of tasks, this waiting mechanism may increase
> > the pressure on the JobMaster and increase the end-to-end job
> > deployment time.
> >
> > 2. From my understanding, if user enable the
> > cluster.evenly-spread-out-slots,
> > LeastUtilizationResourceMatchingStrategy will be used to determine the
> > slot distribution and the slot allocation in the three TM will be
> > (taskmanager.numberOfTaskSlots=3):
> > TM1: 3 slot
> > TM2: 2 slot
> > TM3: 2 slot
> >
> > Best,
> > Yangze Guo
> >
> > On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > Hi Shammon,
> > >
> > > Thanks for your feedback as well!
> > >
> > > > IIUC, the overall balance is divided into two parts: slot to TM and
> > task
> > > to slot.
> > > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > > > 2. Task to slot is guaranteed by the slot pool in JM
> > > >
> > > > These two are completely independent, what are the benefits of unifying
> > > > these two into one option? Also, do we want to share the same
> > > > option between SlotPool in JM and SlotManager in RM? This sounds a bit
> > > > strange.
> > >
> > > Your understanding is totally right, the balance needs 2 parts: slot to
> > TM
> > > and task to slot.
> > >
> > > As I understand, the following are benefits of unifying them into one
> > > option:
> > >
> > > - Flink users don't care about these principles inside of flink, they
> > don't
> > > know these 2 parts.
> > > - If flink provides 2 options, flink users need to set 2 options for
> > their
> > > job.
> > > - If one option is missed, the final result may not be good. (Users may
> > > have questions when using)
> > > - If flink just provides 1 option, enabling one option is enough. (Reduce
> > > the probability of misconfiguration)
> > >
> > > Also, Flink’s options are user-oriented. Each option 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-10 Thread Rui Fan
Hi Zhu,

Thanks for your clarification!

I misunderstood before; it's clear now.

Best,
Rui

On Tue, Oct 10, 2023 at 6:17 PM Zhu Zhu  wrote:

> Hi Rui,
>
> Not sure if I understand your question correctly. The two modes are not
> the same:
> {taskmanager.load-balance.mode: Slots} = {cluster.evenly-spread-out-slots:
> true, slot.sharing-strategy: LOCAL_INPUT_PREFERRED}
> {taskmanager.load-balance.mode: Tasks} = {cluster.evenly-spread-out-slots:
> true, slot.sharing-strategy: TASK_BALANCED_PREFERRED}
>
> Thanks,
> Zhu
>
> On Tue, Oct 10, 2023 at 10:27 AM Rui Fan <1996fan...@gmail.com> wrote:
>
>> Hi Zhu,
>>
>> Thanks for your feedback!
>>
>> >> 2. When it's set to Tasks, how to assign slots to TM?
>> > It's option2 at the moment. However, I think it's just implementation
>> > details and can be changed/refined later.
>> >
>> > As you mentioned in another comment, 'taskmanager.load-balance.mode' is
>> > a user oriented configuration. The goal is to achieve load balance,
>> while
>> > the load can be defined as allocated slots or assigned tasks.
>> > The 'Tasks' mode, just the same as what is proposed in the FLIP,
>> currently
>> > use the mechanism of 'cluster.evenly-spread-out-slots' to help to
>> achieve
>> > balanced number of tasks. It's not perfect, but has acceptable
>> effectiveness
>> > and lower implementation complexity.
>> >
>> > The 'Slots' mode is needed for compatible reasons. Users that are
>> satisfied
>> > with the current ability of 'cluster.evenly-spread-out-slots' can
>> continue
>> > using it after the config 'cluster.evenly-spread-out-slots' is
>> deprecated.
>>
>> IIUC, the 'Slots' mode is needed for compatibility with
>> 'cluster.evenly-spread-out-slots'.
>> The reason I ask this question is: if the behavior and logic of 'Slots'
>> and
>> 'Tasks' are exactly the same, it feels a bit strange to define two
>> enumerations.
>> And it may cause confusion to users.
>>
>> If they are totally the same, how about combining them to SlotsAndTasks?
>> It can be compatible with 'cluster.evenly-spread-out-slots', and avoid
>> the redundant enum. Of course, if the name(SlotsAndTasks) is ugly,
>> we can discuss it. The core idea is combining them.
>>
>> WDYT?
>>
>> Best,
>> Rui
>>
>> On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu  wrote:
>>
>>> Thanks for the response, Rui and Yuepeng.
>>>
>>> >> Rui
>>> > 1. The default value is None, right?
>>> Exactly.
>>>
>>> > 2. When it's set to Tasks, how to assign slots to TM?
>>> It's option2 at the moment. However, I think it's just implementation
>>> details and can be changed/refined later.
>>>
>>> As you mentioned in another comment, 'taskmanager.load-balance.mode' is
>>> a user oriented configuration. The goal is to achieve load balance,
>>> while
>>> the load can be defined as allocated slots or assigned tasks.
>>> The 'Tasks' mode, just the same as what is proposed in the FLIP,
>>> currently
>>> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
>>> balanced number of tasks. It's not perfect, but has acceptable
>>> effectiveness
>>> and lower implementation complexity.
>>>
>>> The 'Slots' mode is needed for compatible reasons. Users that are
>>> satisfied
>>> with the current ability of 'cluster.evenly-spread-out-slots' can
>>> continue
>>> using it after the config 'cluster.evenly-spread-out-slots' is
>>> deprecated.
>>>
>>>
>>> >> Yuepeng
>>> I think what users want is load balance. The combination is
>>> implementation
>>> details and should be transparent to users.
>>>
>>> Meanwhile, I think locality does not entirely conflict with load
>>> balance. In fact,
>>> they should be both considered when assigning tasks. Usually, state
>>> locality
>>> should have the highest priority, and input locality can also be taken
>>> care
>>> of when trying to balance tasks to slots and TMs. We can see that the
>>> most
>>> important input locality, i.e. forward, is always covered in this FLIP
>>> when
>>> computing slot sharing groups. It can be further optimized if we find it
>>> problematic.
>>>
>>> Thanks,
>>> Zhu
>>>
>>> On Sun, Oct 8, 2023 at 1:53 PM Yangze Guo  wrote:
>>>
 Thanks for the updates, Rui.

 It does seem challenging to ensure evenness in slot deployment unless
 we introduce batch slot requests in SlotPool. However, one possibility
 is to add a delay of around 50ms during the SlotPool's resource
 requirement declaration to the ResourceManager, similar to the
 checkResourceRequirementsWithDelay in the SlotManager. In most cases,
 this delay would allow the SlotManager to see all resource
 requirements, then it can allocate the slot more evenly. As a side
 effect, it could also significantly reduce the number of RPC messages
 to the ResourceManager, which could become a single-point bottleneck
 in OLAP scenarios. WDYT?

 Best,
 Yangze Guo

 On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
 >
 > Hi Yangze,
 >
 > Thanks for your quick response!
 >
 > 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-10 Thread Zhu Zhu
Hi Rui,

Not sure if I understand your question correctly. The two modes are not the
same:
{taskmanager.load-balance.mode: Slots} = {cluster.evenly-spread-out-slots:
true, slot.sharing-strategy: LOCAL_INPUT_PREFERRED}
{taskmanager.load-balance.mode: Tasks} = {cluster.evenly-spread-out-slots:
true, slot.sharing-strategy: TASK_BALANCED_PREFERRED}
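
For clarity, the same equivalence in code form, as a minimal sketch assuming
flink-core is on the classpath; the key taskmanager.load-balance.mode, its
value Tasks, and slot.sharing-strategy are proposals from this thread and may
still change before the FLIP is finalized.

import org.apache.flink.configuration.Configuration;

public class LoadBalanceModeExample {
    public static void main(String[] args) {
        // Proposed single user-facing switch.
        Configuration tasksMode = new Configuration();
        tasksMode.setString("taskmanager.load-balance.mode", "Tasks");

        // The legacy combination it corresponds to (to be deprecated).
        Configuration legacy = new Configuration();
        legacy.setString("cluster.evenly-spread-out-slots", "true");
        legacy.setString("slot.sharing-strategy", "TASK_BALANCED_PREFERRED");

        // The Slots mode likewise maps to cluster.evenly-spread-out-slots: true
        // with the default LOCAL_INPUT_PREFERRED sharing strategy.
    }
}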

Thanks,
Zhu

On Tue, Oct 10, 2023 at 10:27 AM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Zhu,
>
> Thanks for your feedback!
>
> >> 2. When it's set to Tasks, how to assign slots to TM?
> > It's option2 at the moment. However, I think it's just implementation
> > details and can be changed/refined later.
> >
> > As you mentioned in another comment, 'taskmanager.load-balance.mode' is
> > a user oriented configuration. The goal is to achieve load balance,
> while
> > the load can be defined as allocated slots or assigned tasks.
> > The 'Tasks' mode, just the same as what is proposed in the FLIP,
> currently
> > use the mechanism of 'cluster.evenly-spread-out-slots' to help to
> achieve
> > balanced number of tasks. It's not perfect, but has acceptable
> effectiveness
> > and lower implementation complexity.
> >
> > The 'Slots' mode is needed for compatible reasons. Users that are
> satisfied
> > with the current ability of 'cluster.evenly-spread-out-slots' can
> continue
> > using it after the config 'cluster.evenly-spread-out-slots' is
> deprecated.
>
> IIUC, the 'Slots' mode is needed for compatibility with
> 'cluster.evenly-spread-out-slots'.
> The reason I ask this question is: if the behavior and logic of 'Slots'
> and
> 'Tasks' are exactly the same, it feels a bit strange to define two
> enumerations.
> And it may cause confusion to users.
>
> If they are totally the same, how about combining them to SlotsAndTasks?
> It can be compatible with 'cluster.evenly-spread-out-slots', and avoid
> the redundant enum. Of course, if the name(SlotsAndTasks) is ugly,
> we can discuss it. The core idea is combining them.
>
> WDYT?
>
> Best,
> Rui
>
> On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu  wrote:
>
>> Thanks for the response, Rui and Yuepeng.
>>
>> >> Rui
>> > 1. The default value is None, right?
>> Exactly.
>>
>> > 2. When it's set to Tasks, how to assign slots to TM?
>> It's option2 at the moment. However, I think it's just implementation
>> details and can be changed/refined later.
>>
>> As you mentioned in another comment, 'taskmanager.load-balance.mode' is
>> a user oriented configuration. The goal is to achieve load balance, while
>> the load can be defined as allocated slots or assigned tasks.
>> The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
>> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
>> balanced number of tasks. It's not perfect, but has acceptable
>> effectiveness
>> and lower implementation complexity.
>>
>> The 'Slots' mode is needed for compatible reasons. Users that are
>> satisfied
>> with the current ability of 'cluster.evenly-spread-out-slots' can continue
>> using it after the config 'cluster.evenly-spread-out-slots' is deprecated.
>>
>>
>> >> Yuepeng
>> I think what users want is load balance. The combination is implementation
>> details and should be transparent to users.
>>
>> Meanwhile, I think locality does not entirely conflict with load balance.
>> In fact,
>> they should be both considered when assigning tasks. Usually, state
>> locality
>> should have the highest priority, and input locality can also be taken
>> care
>> of when trying to balance tasks to slots and TMs. We can see that the most
>> important input locality, i.e. forward, is always covered in this FLIP
>> when
>> computing slot sharing groups. It can be further optimized if we find it
>> problematic.
>>
>> Thanks,
>> Zhu
>>
>> On Sun, Oct 8, 2023 at 1:53 PM Yangze Guo  wrote:
>>
>>> Thanks for the updates, Rui.
>>>
>>> It does seem challenging to ensure evenness in slot deployment unless
>>> we introduce batch slot requests in SlotPool. However, one possibility
>>> is to add a delay of around 50ms during the SlotPool's resource
>>> requirement declaration to the ResourceManager, similar to the
>>> checkResourceRequirementsWithDelay in the SlotManager. In most cases,
>>> this delay would allow the SlotManager to see all resource
>>> requirements, then it can allocate the slot more evenly. As a side
>>> effect, it could also significantly reduce the number of RPC messages
>>> to the ResourceManager, which could become a single-point bottleneck
>>> in OLAP scenarios. WDYT?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >
>>> > Hi Yangze,
>>> >
>>> > Thanks for your quick response!
>>> >
>>> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found
>>> > it isn't clear. The root cause of introducing the waiting mechanism is
>>> > that the slot requests are sent from JobMaster to SlotPool is
>>> > one by one instead of one whole batch. I have rewritten the 2.2.2 part,
>>> > please 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-09 Thread Rui Fan
Hi Zhu,

Thanks for your feedback!

>> 2. When it's set to Tasks, how to assign slots to TM?
> It's option2 at the moment. However, I think it's just implementation
> details and can be changed/refined later.
>
> As you mentioned in another comment, 'taskmanager.load-balance.mode' is
> a user oriented configuration. The goal is to achieve load balance, while
> the load can be defined as allocated slots or assigned tasks.
> The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
> balanced number of tasks. It's not perfect, but has acceptable
effectiveness
> and lower implementation complexity.
>
> The 'Slots' mode is needed for compatible reasons. Users that are
satisfied
> with the current ability of 'cluster.evenly-spread-out-slots' can continue
> using it after the config 'cluster.evenly-spread-out-slots' is deprecated.

IIUC, the 'Slots' mode is needed for compatibility with
'cluster.evenly-spread-out-slots'.
The reason I ask this question is: if the behavior and logic of 'Slots' and
'Tasks' are exactly the same, it feels a bit strange to define two
enumerations.
And it may cause confusion to users.

If they are totally the same, how about combining them into SlotsAndTasks?
It can be compatible with 'cluster.evenly-spread-out-slots' and avoids
the redundant enum. Of course, if the name (SlotsAndTasks) is ugly,
we can discuss it. The core idea is combining them.

WDYT?

Best,
Rui

On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu  wrote:

> Thanks for the response, Rui and Yuepeng.
>
> >> Rui
> > 1. The default value is None, right?
> Exactly.
>
> > 2. When it's set to Tasks, how to assign slots to TM?
> It's option2 at the moment. However, I think it's just implementation
> details and can be changed/refined later.
>
> As you mentioned in another comment, 'taskmanager.load-balance.mode' is
> a user oriented configuration. The goal is to achieve load balance, while
> the load can be defined as allocated slots or assigned tasks.
> The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
> use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
> balanced number of tasks. It's not perfect, but has acceptable
> effectiveness
> and lower implementation complexity.
>
> The 'Slots' mode is needed for compatible reasons. Users that are satisfied
> with the current ability of 'cluster.evenly-spread-out-slots' can continue
> using it after the config 'cluster.evenly-spread-out-slots' is deprecated.
>
>
> >> Yuepeng
> I think what users want is load balance. The combination is implementation
> details and should be transparent to users.
>
> Meanwhile, I think locality does not entirely conflict with load balance.
> In fact,
> they should be both considered when assigning tasks. Usually, state
> locality
> should have the highest priority, and input locality can also be taken care
> of when trying to balance tasks to slots and TMs. We can see that the most
> important input locality, i.e. forward, is always covered in this FLIP when
> computing slot sharing groups. It can be further optimized if we find it
> problematic.
>
> Thanks,
> Zhu
>
> Yangze Guo wrote on Sun, Oct 8, 2023 at 13:53:
>
>> Thanks for the updates, Rui.
>>
>> It does seem challenging to ensure evenness in slot deployment unless
>> we introduce batch slot requests in SlotPool. However, one possibility
>> is to add a delay of around 50ms during the SlotPool's resource
>> requirement declaration to the ResourceManager, similar to the
>> checkResourceRequirementsWithDelay in the SlotManager. In most cases,
>> this delay would allow the SlotManager to see all resource
>> requirements, then it can allocate the slot more evenly. As a side
>> effect, it could also significantly reduce the number of RPC messages
>> to the ResourceManager, which could become a single-point bottleneck
>> in OLAP scenarios. WDYT?
>>
>> Best,
>> Yangze Guo
>>
>> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Hi Yangze,
>> >
>> > Thanks for your quick response!
>> >
>> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found
>> > it isn't clear. The root cause of introducing the waiting mechanism is
>> > that the slot requests are sent from JobMaster to SlotPool is
>> > one by one instead of one whole batch. I have rewritten the 2.2.2 part,
>> > please read it again in your free time.
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
>> >
>> > Best,
>> > Rui
>> >
>> > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo  wrote:
>> >>
>> >> Thanks for the clarification, Rui.
>> >>
>> >> I believe the root cause of this issue is that in the current
>> >> DefaultResourceAllocationStrategy, slot allocation begins before the
>> >> decision to PendingTaskManagers requesting is made. That can be fixed
>> >> within the 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-09 Thread Zhu Zhu
Thanks for the response, Rui and Yuepeng.

>> Rui
> 1. The default value is None, right?
Exactly.

> 2. When it's set to Tasks, how to assign slots to TM?
It's option2 at the moment. However, I think it's just implementation
details and can be changed/refined later.

As you mentioned in another comment, 'taskmanager.load-balance.mode' is
a user oriented configuration. The goal is to achieve load balance, while
the load can be defined as allocated slots or assigned tasks.
The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
uses the mechanism of 'cluster.evenly-spread-out-slots' to help achieve a
balanced number of tasks. It's not perfect, but it has acceptable effectiveness
and lower implementation complexity.

The 'Slots' mode is needed for compatible reasons. Users that are satisfied
with the current ability of 'cluster.evenly-spread-out-slots' can continue
using it after the config 'cluster.evenly-spread-out-slots' is deprecated.
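
For readers following the naming discussion, here is a minimal sketch of how the
single user-facing option could look, with the proposed values (None / Slots /
Tasks) written as Java enum constants. The class and helper method names are
illustrative only and are not the FLIP's actual implementation:

    // Illustrative sketch only; not Flink code.
    public enum TaskManagerLoadBalanceMode {
        NONE,   // current default behaviour, no spreading
        SLOTS,  // supersedes cluster.evenly-spread-out-slots: balance slot count per TM
        TASKS;  // additionally balance the task count, both task-to-slot and slot-to-TM

        // Hypothetical helpers showing how one option can drive both phases.
        public boolean balanceSlotsAcrossTaskManagers() {
            return this != NONE;
        }

        public boolean balanceTasksAcrossSlots() {
            return this == TASKS;
        }

        public static void main(String[] args) {
            for (TaskManagerLoadBalanceMode mode : values()) {
                System.out.println(mode
                        + ": spread slots = " + mode.balanceSlotsAcrossTaskManagers()
                        + ", balance tasks = " + mode.balanceTasksAcrossSlots());
            }
        }
    }

In such a sketch, 'Slots' is effectively a subset of 'Tasks': both spread slots
across TMs, and only 'Tasks' additionally balances tasks across slots.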


>> Yuepeng
I think what users want is load balance. The combination is implementation
details and should be transparent to users.

Meanwhile, I think locality does not entirely conflict with load balance.
In fact,
they should be both considered when assigning tasks. Usually, state locality
should have the highest priority, and input locality can also be taken care
of when trying to balance tasks to slots and TMs. We can see that the most
important input locality, i.e. forward, is always covered in this FLIP when
computing slot sharing groups. It can be further optimized if we find it
problematic.

Thanks,
Zhu

Yangze Guo wrote on Sun, Oct 8, 2023 at 13:53:

> Thanks for the updates, Rui.
>
> It does seem challenging to ensure evenness in slot deployment unless
> we introduce batch slot requests in SlotPool. However, one possibility
> is to add a delay of around 50ms during the SlotPool's resource
> requirement declaration to the ResourceManager, similar to the
> checkResourceRequirementsWithDelay in the SlotManager. In most cases,
> this delay would allow the SlotManager to see all resource
> requirements, then it can allocate the slot more evenly. As a side
> effect, it could also significantly reduce the number of RPC messages
> to the ResourceManager, which could become a single-point bottleneck
> in OLAP scenarios. WDYT?
>
> Best,
> Yangze Guo
>
> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Yangze,
> >
> > Thanks for your quick response!
> >
> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found
> > it isn't clear. The root cause of introducing the waiting mechanism is
> > that the slot requests are sent from JobMaster to SlotPool is
> > one by one instead of one whole batch. I have rewritten the 2.2.2 part,
> > please read it again in your free time.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> >
> > Best,
> > Rui
> >
> > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo  wrote:
> >>
> >> Thanks for the clarification, Rui.
> >>
> >> I believe the root cause of this issue is that in the current
> >> DefaultResourceAllocationStrategy, slot allocation begins before the
> >> decision to PendingTaskManagers requesting is made. That can be fixed
> >> within the strategy without introducing another waiting mechanism. I
> >> think it would be better to address this issue within the scope of
> >> this FLIP. However, I don't have a strong opinion on it, it depends on
> >> your bandwidth.
> >>
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > Hi Yangze,
> >> >
> >> > > 2. From my understanding, if user enable the
> >> > > cluster.evenly-spread-out-slots,
> >> > > LeastUtilizationResourceMatchingStrategy will be used to determine
> the
> >> > > slot distribution and the slot allocation in the three TM will be
> >> > > (taskmanager.numberOfTaskSlots=3):
> >> > > TM1: 3 slot
> >> > > TM2: 2 slot
> >> > > TM3: 2 slot
> >> >
> >> > When all tms are ready in advance, the three TM will be:
> >> > TM1: 3 slot
> >> > TM2: 2 slot
> >> > TM3: 2 slot
> >> >
> >> > For application mode, the resource manager doesn't apply for
> >> > TM in advance, and slots aren't enough before the third TM is ready.
> >> > So all slots of the second TM will be used up. The three TM will be:
> >> > TM1: 3 slot
> >> > TM2: 3 slot
> >> > TM3: 1 slot
> >> >
> >> > That's why the FLIP add some notes:
> >> >
> >> > All free slots are in the last TM, because ResourceManager doesn’t
> have the waiting mechanism, and it just requests 7 slots for this JobMaster.
> >> > Why is it acceptable?
> >> >
> >> > If we just add the waiting mechanism to JobMaster but not in
> ResourceManager, all free slots will be in the last TM. All slots of other
> TMs are offered to JM.
> >> > That is, only one TM may have fewer tasks than the other TMs. The
> difference between the 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-07 Thread Yangze Guo
Thanks for the updates, Rui.

It does seem challenging to ensure evenness in slot deployment unless
we introduce batch slot requests in SlotPool. However, one possibility
is to add a delay of around 50ms during the SlotPool's resource
requirement declaration to the ResourceManager, similar to the
checkResourceRequirementsWithDelay in the SlotManager. In most cases,
this delay would allow the SlotManager to see all resource
requirements, then it can allocate the slot more evenly. As a side
effect, it could also significantly reduce the number of RPC messages
to the ResourceManager, which could become a single-point bottleneck
in OLAP scenarios. WDYT?
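
To make the delay idea concrete, here is a minimal, self-contained sketch of a
debounced requirement declaration, written against plain JDK classes. None of
the class or method names below belong to Flink's actual
SlotPool/SlotManager/ResourceManager API, and the cancel-and-reschedule policy
is just one possible choice:

    import java.time.Duration;
    import java.util.concurrent.*;
    import java.util.function.Consumer;

    public class DebouncedRequirementDeclaration {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private final Duration delay;
        private final Consumer<Integer> declareToResourceManager; // hypothetical sink
        private int pendingSlots;                                 // accumulated requirement
        private ScheduledFuture<?> pendingFlush;

        public DebouncedRequirementDeclaration(
                Duration delay, Consumer<Integer> declareToResourceManager) {
            this.delay = delay;
            this.declareToResourceManager = declareToResourceManager;
        }

        // Called once per logical slot request; only the last scheduled flush
        // within the window actually fires.
        public synchronized void requireSlot() {
            pendingSlots++;
            if (pendingFlush != null) {
                pendingFlush.cancel(false);
            }
            pendingFlush = scheduler.schedule(
                    this::flush, delay.toMillis(), TimeUnit.MILLISECONDS);
        }

        private synchronized void flush() {
            declareToResourceManager.accept(pendingSlots);
        }

        public static void main(String[] args) throws InterruptedException {
            DebouncedRequirementDeclaration decl = new DebouncedRequirementDeclaration(
                    Duration.ofMillis(50),
                    total -> System.out.println("declare " + total + " slots"));
            for (int i = 0; i < 7; i++) {
                decl.requireSlot();   // 7 individual requests arrive quickly...
            }
            Thread.sleep(200);        // ...but only one declaration of 7 slots is sent
            decl.scheduler.shutdown();
        }
    }

The point is only that the ResourceManager would then see all 7 requirements at
once and could place them evenly, instead of reacting to them one at a time.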

Best,
Yangze Guo

On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Yangze,
>
> Thanks for your quick response!
>
> Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found
> it isn't clear. The root cause of introducing the waiting mechanism is
> that the slot requests are sent from JobMaster to SlotPool is
> one by one instead of one whole batch. I have rewritten the 2.2.2 part,
> please read it again in your free time.
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
>
> Best,
> Rui
>
> On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo  wrote:
>>
>> Thanks for the clarification, Rui.
>>
>> I believe the root cause of this issue is that in the current
>> DefaultResourceAllocationStrategy, slot allocation begins before the
>> decision to PendingTaskManagers requesting is made. That can be fixed
>> within the strategy without introducing another waiting mechanism. I
>> think it would be better to address this issue within the scope of
>> this FLIP. However, I don't have a strong opinion on it, it depends on
>> your bandwidth.
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Hi Yangze,
>> >
>> > > 2. From my understanding, if user enable the
>> > > cluster.evenly-spread-out-slots,
>> > > LeastUtilizationResourceMatchingStrategy will be used to determine the
>> > > slot distribution and the slot allocation in the three TM will be
>> > > (taskmanager.numberOfTaskSlots=3):
>> > > TM1: 3 slot
>> > > TM2: 2 slot
>> > > TM3: 2 slot
>> >
>> > When all tms are ready in advance, the three TM will be:
>> > TM1: 3 slot
>> > TM2: 2 slot
>> > TM3: 2 slot
>> >
>> > For application mode, the resource manager doesn't apply for
>> > TM in advance, and slots aren't enough before the third TM is ready.
>> > So all slots of the second TM will be used up. The three TM will be:
>> > TM1: 3 slot
>> > TM2: 3 slot
>> > TM3: 1 slot
>> >
>> > That's why the FLIP add some notes:
>> >
>> > All free slots are in the last TM, because ResourceManager doesn’t have 
>> > the waiting mechanism, and it just requests 7 slots for this JobMaster.
>> > Why is it acceptable?
>> >
>> > If we just add the waiting mechanism to JobMaster but not in 
>> > ResourceManager, all free slots will be in the last TM. All slots of other 
>> > TMs are offered to JM.
>> > That is, only one TM may have fewer tasks than the other TMs. The 
>> > difference between the number of tasks of other TMs is at most 1.So When p 
>> > >> slotsPerTM, the problem can be ignored.
>> > We can also suggest users, in cases that p is small, it's better to 
>> > configure slotsPerTM to 1, or let p % slotsPerTM == 0.
>> >
>> > Please correct me if my understanding is wrong, thanks~
>> >
>> > Best,
>> > Rui
>> >
>> > On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo  wrote:
>> >>
>> >> Hi, Rui,
>> >>
>> >> 1. With the current mechanism, when physical slots are offered from
>> >> TM, the JobMaster will start deploying tasks and synchronizing their
>> >> states. With the addition of the waiting mechanism, IIUC, the
>> >> JobMaster will deploy and synchronize the states of all tasks only
>> >> after all resources are available. The task deployment and state
>> >> synchronization both occupy the JobMaster's RPC main thread. In
>> >> complex jobs with a lot of tasks, this waiting mechanism may increase
>> >> the pressure on the JobMaster and increase the end-to-end job
>> >> deployment time.
>> >>
>> >> 2. From my understanding, if user enable the
>> >> cluster.evenly-spread-out-slots,
>> >> LeastUtilizationResourceMatchingStrategy will be used to determine the
>> >> slot distribution and the slot allocation in the three TM will be
>> >> (taskmanager.numberOfTaskSlots=3):
>> >> TM1: 3 slot
>> >> TM2: 2 slot
>> >> TM3: 2 slot
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >> >
>> >> > Hi Shammon,
>> >> >
>> >> > Thanks for your feedback as well!
>> >> >
>> >> > > IIUC, the overall balance is divided into two parts: slot to TM and 
>> >> > > task
>> >> > to slot.
>> >> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
>> >> > > 2. Task to slot is 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-07 Thread Rui Fan
Hi Yangze,

Thanks for your quick response!

Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism and found that
it isn't clear. The root cause of introducing the waiting mechanism is
that the slot requests are sent from the JobMaster to the SlotPool
one by one instead of in one whole batch. I have rewritten the 2.2.2 part,
please read it again in your free time.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism

Best,
Rui

On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo  wrote:

> Thanks for the clarification, Rui.
>
> I believe the root cause of this issue is that in the current
> DefaultResourceAllocationStrategy, slot allocation begins before the
> decision to PendingTaskManagers requesting is made. That can be fixed
> within the strategy without introducing another waiting mechanism. I
> think it would be better to address this issue within the scope of
> this FLIP. However, I don't have a strong opinion on it, it depends on
> your bandwidth.
>
>
> Best,
> Yangze Guo
>
> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Yangze,
> >
> > > 2. From my understanding, if user enable the
> > > cluster.evenly-spread-out-slots,
> > > LeastUtilizationResourceMatchingStrategy will be used to determine the
> > > slot distribution and the slot allocation in the three TM will be
> > > (taskmanager.numberOfTaskSlots=3):
> > > TM1: 3 slot
> > > TM2: 2 slot
> > > TM3: 2 slot
> >
> > When all tms are ready in advance, the three TM will be:
> > TM1: 3 slot
> > TM2: 2 slot
> > TM3: 2 slot
> >
> > For application mode, the resource manager doesn't apply for
> > TM in advance, and slots aren't enough before the third TM is ready.
> > So all slots of the second TM will be used up. The three TM will be:
> > TM1: 3 slot
> > TM2: 3 slot
> > TM3: 1 slot
> >
> > That's why the FLIP add some notes:
> >
> > All free slots are in the last TM, because ResourceManager doesn’t have
> the waiting mechanism, and it just requests 7 slots for this JobMaster.
> > Why is it acceptable?
> >
> > If we just add the waiting mechanism to JobMaster but not in
> ResourceManager, all free slots will be in the last TM. All slots of other
> TMs are offered to JM.
> > That is, only one TM may have fewer tasks than the other TMs. The
> difference between the number of tasks of other TMs is at most 1.So When p
> >> slotsPerTM, the problem can be ignored.
> > We can also suggest users, in cases that p is small, it's better to
> configure slotsPerTM to 1, or let p % slotsPerTM == 0.
> >
> > Please correct me if my understanding is wrong, thanks~
> >
> > Best,
> > Rui
> >
> > On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo  wrote:
> >>
> >> Hi, Rui,
> >>
> >> 1. With the current mechanism, when physical slots are offered from
> >> TM, the JobMaster will start deploying tasks and synchronizing their
> >> states. With the addition of the waiting mechanism, IIUC, the
> >> JobMaster will deploy and synchronize the states of all tasks only
> >> after all resources are available. The task deployment and state
> >> synchronization both occupy the JobMaster's RPC main thread. In
> >> complex jobs with a lot of tasks, this waiting mechanism may increase
> >> the pressure on the JobMaster and increase the end-to-end job
> >> deployment time.
> >>
> >> 2. From my understanding, if user enable the
> >> cluster.evenly-spread-out-slots,
> >> LeastUtilizationResourceMatchingStrategy will be used to determine the
> >> slot distribution and the slot allocation in the three TM will be
> >> (taskmanager.numberOfTaskSlots=3):
> >> TM1: 3 slot
> >> TM2: 2 slot
> >> TM3: 2 slot
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > Hi Shammon,
> >> >
> >> > Thanks for your feedback as well!
> >> >
> >> > > IIUC, the overall balance is divided into two parts: slot to TM and
> task
> >> > to slot.
> >> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> >> > > 2. Task to slot is guaranteed by the slot pool in JM
> >> > >
> >> > > These two are completely independent, what are the benefits of
> unifying
> >> > > these two into one option? Also, do we want to share the same
> >> > > option between SlotPool in JM and SlotManager in RM? This sounds a
> bit
> >> > > strange.
> >> >
> >> > Your understanding is totally right, the balance needs 2 parts: slot
> to TM
> >> > and task to slot.
> >> >
> >> > As I understand, the following are benefits of unifying them into one
> >> > option:
> >> >
> >> > - Flink users don't care about these principles inside of flink, they
> don't
> >> > know these 2 parts.
> >> > - If flink provides 2 options, flink users need to set 2 options for
> their
> >> > job.
> >> > - If one option is missed, the final result may not be good. (Users
> may
> >> > have questions when using)
> >> > - If flink just provides 1 option, enabling one option is 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-07 Thread Yangze Guo
Thanks for the clarification, Rui.

I believe the root cause of this issue is that in the current
DefaultResourceAllocationStrategy, slot allocation begins before the
decision to request PendingTaskManagers is made. That can be fixed
within the strategy without introducing another waiting mechanism. I
think it would be better to address this issue within the scope of
this FLIP. However, I don't have a strong opinion on it; it depends on
your bandwidth.


Best,
Yangze Guo

On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Yangze,
>
> > 2. From my understanding, if user enable the
> > cluster.evenly-spread-out-slots,
> > LeastUtilizationResourceMatchingStrategy will be used to determine the
> > slot distribution and the slot allocation in the three TM will be
> > (taskmanager.numberOfTaskSlots=3):
> > TM1: 3 slot
> > TM2: 2 slot
> > TM3: 2 slot
>
> When all tms are ready in advance, the three TM will be:
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot
>
> For application mode, the resource manager doesn't apply for
> TM in advance, and slots aren't enough before the third TM is ready.
> So all slots of the second TM will be used up. The three TM will be:
> TM1: 3 slot
> TM2: 3 slot
> TM3: 1 slot
>
> That's why the FLIP add some notes:
>
> All free slots are in the last TM, because ResourceManager doesn’t have the 
> waiting mechanism, and it just requests 7 slots for this JobMaster.
> Why is it acceptable?
>
> If we just add the waiting mechanism to JobMaster but not in ResourceManager, 
> all free slots will be in the last TM. All slots of other TMs are offered to 
> JM.
> That is, only one TM may have fewer tasks than the other TMs. The difference 
> between the number of tasks of other TMs is at most 1.So When p >> 
> slotsPerTM, the problem can be ignored.
> We can also suggest users, in cases that p is small, it's better to configure 
> slotsPerTM to 1, or let p % slotsPerTM == 0.
>
> Please correct me if my understanding is wrong, thanks~
>
> Best,
> Rui
>
> On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo  wrote:
>>
>> Hi, Rui,
>>
>> 1. With the current mechanism, when physical slots are offered from
>> TM, the JobMaster will start deploying tasks and synchronizing their
>> states. With the addition of the waiting mechanism, IIUC, the
>> JobMaster will deploy and synchronize the states of all tasks only
>> after all resources are available. The task deployment and state
>> synchronization both occupy the JobMaster's RPC main thread. In
>> complex jobs with a lot of tasks, this waiting mechanism may increase
>> the pressure on the JobMaster and increase the end-to-end job
>> deployment time.
>>
>> 2. From my understanding, if user enable the
>> cluster.evenly-spread-out-slots,
>> LeastUtilizationResourceMatchingStrategy will be used to determine the
>> slot distribution and the slot allocation in the three TM will be
>> (taskmanager.numberOfTaskSlots=3):
>> TM1: 3 slot
>> TM2: 2 slot
>> TM3: 2 slot
>>
>> Best,
>> Yangze Guo
>>
>> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Hi Shammon,
>> >
>> > Thanks for your feedback as well!
>> >
>> > > IIUC, the overall balance is divided into two parts: slot to TM and task
>> > to slot.
>> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
>> > > 2. Task to slot is guaranteed by the slot pool in JM
>> > >
>> > > These two are completely independent, what are the benefits of unifying
>> > > these two into one option? Also, do we want to share the same
>> > > option between SlotPool in JM and SlotManager in RM? This sounds a bit
>> > > strange.
>> >
>> > Your understanding is totally right, the balance needs 2 parts: slot to TM
>> > and task to slot.
>> >
>> > As I understand, the following are benefits of unifying them into one
>> > option:
>> >
>> > - Flink users don't care about these principles inside of flink, they don't
>> > know these 2 parts.
>> > - If flink provides 2 options, flink users need to set 2 options for their
>> > job.
>> > - If one option is missed, the final result may not be good. (Users may
>> > have questions when using)
>> > - If flink just provides 1 option, enabling one option is enough. (Reduce
>> > the probability of misconfiguration)
>> >
>> > Also, Flink’s options are user-oriented. Each option represents a switch or
>> > parameter of a feature.
>> > A feature may be composed of multiple components inside Flink.
>> > It might be better to keep only one switch per feature.
>> >
>> > Actually, the cluster.evenly-spread-out-slots option is used between
>> > SlotPool in JM and SlotManager in RM. 2 components to ensure
>> > this feature works well.
>> >
>> > Please correct me if my understanding is wrong,
>> > and looking forward to your feedback, thanks!
>> >
>> > Best,
>> > Rui
>> >
>> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > > Hi Yangze,
>> > >
>> > > Thanks for your feedback!
>> > >
>> > > > 1. Is it possible for the SlotPool to 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-07 Thread Rui Fan
Hi Yangze,

> 2. From my understanding, if user enable the
> cluster.evenly-spread-out-slots,
> LeastUtilizationResourceMatchingStrategy will be used to determine the
> slot distribution and the slot allocation in the three TM will be
> (taskmanager.numberOfTaskSlots=3):
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot

When all TMs are ready in advance, the three TMs will be:
TM1: 3 slot
TM2: 2 slot
TM3: 2 slot

For application mode, the resource manager doesn't apply for
TMs in advance, and there aren't enough slots before the third TM is ready.
So all slots of the second TM will be used up. The three TMs will be:
TM1: 3 slot
TM2: 3 slot
TM3: 1 slot

That's why the FLIP adds some notes:

   - All *free* slots are in the last TM, because the ResourceManager doesn’t
   have the waiting mechanism, and it just requests 7 slots for this JobMaster.
   - Why is it acceptable?
     - If we just add the waiting mechanism to the JobMaster but not to the
     ResourceManager, all *free* slots will be in the last TM. All slots
     of other TMs are offered to the JM.
     - That is, only one TM may have fewer tasks than the other TMs. The
     difference between the numbers of tasks of the other TMs is at most 1. So
     when *p* >> *slotsPerTM*, the problem can be ignored.
     - We can also suggest to users that, in cases where p is small, it's better
     to configure *slotsPerTM* to 1, or to let *p % slotsPerTM* == 0.
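
To illustrate the 3/3/1 vs. 3/2/2 outcome above, here is a small, self-contained
simulation in plain Java. It is not Flink code; the two methods only mimic the
behaviours described in this thread (filling TMs one by one vs. spreading
requests over TMs that are all known up front):

    import java.util.*;

    public class SlotSpreadDemo {
        public static void main(String[] args) {
            System.out.println("eager   = " + eager(7, 3, 3));    // [3, 3, 1]
            System.out.println("batched = " + batched(7, 3, 3));  // [3, 2, 2]
        }

        // TMs become available one after another; every request goes to the
        // first TM that still has a free slot.
        static List<Integer> eager(int requests, int tms, int slotsPerTm) {
            int[] used = new int[tms];
            for (int r = 0; r < requests; r++) {
                for (int tm = 0; tm < tms; tm++) {
                    if (used[tm] < slotsPerTm) { used[tm]++; break; }
                }
            }
            return toList(used);
        }

        // All TMs are known up front; every request goes to the currently
        // least-utilized TM that still has capacity (ties broken by index).
        static List<Integer> batched(int requests, int tms, int slotsPerTm) {
            int[] used = new int[tms];
            for (int r = 0; r < requests; r++) {
                int best = -1;
                for (int tm = 0; tm < tms; tm++) {
                    if (used[tm] < slotsPerTm && (best == -1 || used[tm] < used[best])) {
                        best = tm;
                    }
                }
                used[best]++;
            }
            return toList(used);
        }

        static List<Integer> toList(int[] a) {
            List<Integer> l = new ArrayList<>();
            for (int v : a) l.add(v);
            return l;
        }
    }

Running it prints eager = [3, 3, 1] and batched = [3, 2, 2], matching the two
allocations above.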

Please correct me if my understanding is wrong, thanks~

Best,
Rui

On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo  wrote:

> Hi, Rui,
>
> 1. With the current mechanism, when physical slots are offered from
> TM, the JobMaster will start deploying tasks and synchronizing their
> states. With the addition of the waiting mechanism, IIUC, the
> JobMaster will deploy and synchronize the states of all tasks only
> after all resources are available. The task deployment and state
> synchronization both occupy the JobMaster's RPC main thread. In
> complex jobs with a lot of tasks, this waiting mechanism may increase
> the pressure on the JobMaster and increase the end-to-end job
> deployment time.
>
> 2. From my understanding, if user enable the
> cluster.evenly-spread-out-slots,
> LeastUtilizationResourceMatchingStrategy will be used to determine the
> slot distribution and the slot allocation in the three TM will be
> (taskmanager.numberOfTaskSlots=3):
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot
>
> Best,
> Yangze Guo
>
> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Shammon,
> >
> > Thanks for your feedback as well!
> >
> > > IIUC, the overall balance is divided into two parts: slot to TM and
> task
> > to slot.
> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > > 2. Task to slot is guaranteed by the slot pool in JM
> > >
> > > These two are completely independent, what are the benefits of unifying
> > > these two into one option? Also, do we want to share the same
> > > option between SlotPool in JM and SlotManager in RM? This sounds a bit
> > > strange.
> >
> > Your understanding is totally right, the balance needs 2 parts: slot to
> TM
> > and task to slot.
> >
> > As I understand, the following are benefits of unifying them into one
> > option:
> >
> > - Flink users don't care about these principles inside of flink, they
> don't
> > know these 2 parts.
> > - If flink provides 2 options, flink users need to set 2 options for
> their
> > job.
> > - If one option is missed, the final result may not be good. (Users may
> > have questions when using)
> > - If flink just provides 1 option, enabling one option is enough. (Reduce
> > the probability of misconfiguration)
> >
> > Also, Flink’s options are user-oriented. Each option represents a switch
> or
> > parameter of a feature.
> > A feature may be composed of multiple components inside Flink.
> > It might be better to keep only one switch per feature.
> >
> > Actually, the cluster.evenly-spread-out-slots option is used between
> > SlotPool in JM and SlotManager in RM. 2 components to ensure
> > this feature works well.
> >
> > Please correct me if my understanding is wrong,
> > and looking forward to your feedback, thanks!
> >
> > Best,
> > Rui
> >
> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Yangze,
> > >
> > > Thanks for your feedback!
> > >
> > > > 1. Is it possible for the SlotPool to get the slot allocation results
> > > > from the SlotManager in advance instead of waiting for the actual
> > > > physical slots to be registered, and perform pre-allocation? The
> > > > benefit of doing this is to make the task deployment process
> smoother,
> > > > especially when there are a large number of tasks in the job.
> > >
> > > Could you elaborate on that? I didn't understand what's the benefit and
> > > smoother.
> > >
> > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in
> > > > example 2 of section 2.2.3 can be resolved. Do I understand it
> > > > correctly?
> > >
> > > The 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-07 Thread Rui Fan
Hi Shammon,

IIUC, you want more flexibility in controlling the two-phase strategy,
right?

> I want this because we would like to add a new slot to TM strategy such
as SLOTS_NUM in the future for OLAP to improve the performance for olap
jobs, which will use TASKS strategy for task to slot. cc Guoyangze

Actually, one option can achieve your requirement; it can control both phases.
We can add a new enum value for this option: it would use the new strategy for
slot to TM, and the task-balanced strategy for task to slot.

Of course, I think 2 options are more flexible. If there are too many strategies,
2 options are easier for users.

Also, I have a question: what is the SLOTS_NUM strategy? Isn't it slot balancing
at the TM level?
I want to check whether it's similar to `cluster.evenly-spread-out-slots`.
If they are similar or the same, there aren't too many strategies, and one option
may be enough.

Best,
Rui

On Sat, Oct 7, 2023 at 11:29 AM Shammon FY  wrote:

> Thanks Rui, I check the codes and you're right.
>
> As you described above, the entire process is actually two independent
> steps from slot to TM and task to slot. Currently we use option
> `cluster.evenly-spread-out-slots` for both of them. Can we provide
> different options for the two steps, such as ANY/SLOTS for RM and ANY/TASKS
> for slot pool?
>
> I want this because we would like to add a new slot to TM strategy such as
> SLOTS_NUM in the future for OLAP to improve the performance for olap jobs,
> which will use TASKS strategy for task to slot. cc Guoyangze
>
> Best,
> Shammon FY
>
> On Fri, Oct 6, 2023 at 6:19 PM xiangyu feng  wrote:
>
>> Thanks Yuepeng and Rui for driving this Discussion.
>>
>> Internally when we try to use Flink 1.17.1 in production, we are also
>> suffering from the unbalanced task distribution problem for jobs with high
>> qps and complex dag. So +1 for the overall proposal.
>>
>> Some questions about the details:
>>
>> 1, About the waiting mechanism: Will the waiting mechanism happen only in
>> the second level 'assigning slots to TM'?  IIUC, the first level
>> 'assigning
>> Tasks to Slots' needs only the asynchronous slot result from slotpool.
>>
>> 2, About the slot LoadingWeight: it is reasonable to use the number of
>> tasks by default in the beginning, but it would be better if this could be
>> easily extended in future to distinguish between CPU-intensive and
>> IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
>> others have CPU bottlenecks.
>>
>> Regards,
>> Xiangyu
>>
>>
>> Yuepeng Pan wrote on Thu, Oct 5, 2023 at 18:34:
>>
>> > Hi, Zhu Zhu,
>> >
>> > Thanks for your feedback!
>> >
>> > > I think we can introduce a new config option
>> > > `taskmanager.load-balance.mode`,
>> > > which accepts "None"/"Slots"/"Tasks".
>> `cluster.evenly-spread-out-slots`
>> > > can be superseded by the "Slots" mode and get deprecated. In the
>> future
>> > > it can support more mode, e.g. "CpuCores", to work better for jobs
>> with
>> > > fine-grained resources. The proposed config option
>> > > `slot.request.max-interval`
>> > > then can be renamed to
>> > `taskmanager.load-balance.request-stablizing-timeout`
>> > > to show its relation with the feature. The proposed
>> > `slot.sharing-strategy`
>> > > is not needed, because the configured "Tasks" mode will do the work.
>> >
>> > The new proposed configuration option sounds good to me.
>> >
>> > I have a small question, If we set our configuration value to 'Tasks,'
>> it
>> > will initiate two processes: balancing the allocation of task
>> quantities at
>> > the slot level and balancing the number of tasks across TaskManagers
>> (TMs).
>> > Alternatively, if we configure it as 'Slots,' the system will employ the
>> > LocalPreferred allocation policy (which is the default) when assigning
>> > tasks to slots, and it will ensure that the number of slots used across
>> TMs
>> > is balanced.
>> > Does  this configuration essentially combine a balanced selection
>> strategy
>> > across two dimensions into fixed configuration items, right?
>> >
>> > I would appreciate it if you could correct me if I've made any errors.
>> >
>> > Best,
>> > Yuepeng.
>> >
>>
>


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-06 Thread Shammon FY
Thanks Rui, I checked the code and you're right.

As you described above, the entire process is actually two independent
steps: slot to TM and task to slot. Currently we use the option
`cluster.evenly-spread-out-slots` for both of them. Can we provide
different options for the two steps, such as ANY/SLOTS for the RM and ANY/TASKS
for the slot pool?

I want this because we would like to add a new slot-to-TM strategy such as
SLOTS_NUM in the future for OLAP to improve the performance of OLAP jobs,
which would still use the TASKS strategy for task to slot. cc Guoyangze

Best,
Shammon FY

On Fri, Oct 6, 2023 at 6:19 PM xiangyu feng  wrote:

> Thanks Yuepeng and Rui for driving this Discussion.
>
> Internally when we try to use Flink 1.17.1 in production, we are also
> suffering from the unbalanced task distribution problem for jobs with high
> qps and complex dag. So +1 for the overall proposal.
>
> Some questions about the details:
>
> 1, About the waiting mechanism: Will the waiting mechanism happen only in
> the second level 'assigning slots to TM'?  IIUC, the first level 'assigning
> Tasks to Slots' needs only the asynchronous slot result from slotpool.
>
> 2, About the slot LoadingWeight: it is reasonable to use the number of
> tasks by default in the beginning, but it would be better if this could be
> easily extended in future to distinguish between CPU-intensive and
> IO-intensive workloads. In some cases, TMs may have IO bottlenecks but
> others have CPU bottlenecks.
>
> Regards,
> Xiangyu
>
>
> Yuepeng Pan wrote on Thu, Oct 5, 2023 at 18:34:
>
> > Hi, Zhu Zhu,
> >
> > Thanks for your feedback!
> >
> > > I think we can introduce a new config option
> > > `taskmanager.load-balance.mode`,
> > > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> > > can be superseded by the "Slots" mode and get deprecated. In the future
> > > it can support more mode, e.g. "CpuCores", to work better for jobs with
> > > fine-grained resources. The proposed config option
> > > `slot.request.max-interval`
> > > then can be renamed to
> > `taskmanager.load-balance.request-stablizing-timeout`
> > > to show its relation with the feature. The proposed
> > `slot.sharing-strategy`
> > > is not needed, because the configured "Tasks" mode will do the work.
> >
> > The new proposed configuration option sounds good to me.
> >
> > I have a small question, If we set our configuration value to 'Tasks,' it
> > will initiate two processes: balancing the allocation of task quantities
> at
> > the slot level and balancing the number of tasks across TaskManagers
> (TMs).
> > Alternatively, if we configure it as 'Slots,' the system will employ the
> > LocalPreferred allocation policy (which is the default) when assigning
> > tasks to slots, and it will ensure that the number of slots used across
> TMs
> > is balanced.
> > Does  this configuration essentially combine a balanced selection
> strategy
> > across two dimensions into fixed configuration items, right?
> >
> > I would appreciate it if you could correct me if I've made any errors.
> >
> > Best,
> > Yuepeng.
> >
>


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-06 Thread xiangyu feng
Thanks Yuepeng and Rui for driving this Discussion.

Internally, when we tried to use Flink 1.17.1 in production, we also
suffered from the unbalanced task distribution problem for jobs with high
QPS and complex DAGs. So +1 for the overall proposal.

Some questions about the details:

1. About the waiting mechanism: will the waiting mechanism happen only in
the second level, 'assigning slots to TM'? IIUC, the first level, 'assigning
Tasks to Slots', needs only the asynchronous slot result from the SlotPool.

2. About the slot LoadingWeight: it is reasonable to use the number of
tasks by default in the beginning, but it would be better if this could be
easily extended in the future to distinguish between CPU-intensive and
IO-intensive workloads. In some cases, TMs may have IO bottlenecks while
others have CPU bottlenecks.
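
As an illustration of that extension point, here is a rough sketch of a
pluggable loading weight. This is not the interface proposed in FLIP-370, just
one possible shape; all names and factors below are made up:

    import java.util.List;

    // Illustrative sketch only; not the FLIP-370 interface.
    interface LoadingWeight {
        double weight();

        // Default: the load of a slot is simply its number of tasks.
        static LoadingWeight byTaskCount(int taskCount) {
            return () -> taskCount;
        }

        // Possible future extension: combine CPU and IO demands with tunable factors.
        static LoadingWeight byResources(double cpuCores, double ioUnits,
                                         double cpuFactor, double ioFactor) {
            return () -> cpuCores * cpuFactor + ioUnits * ioFactor;
        }

        static double total(List<LoadingWeight> slotWeights) {
            return slotWeights.stream().mapToDouble(LoadingWeight::weight).sum();
        }

        static void main(String[] args) {
            List<LoadingWeight> slots =
                    List.of(byTaskCount(4), byResources(1.5, 2.0, 1.0, 0.5));
            System.out.println("total TM weight = " + total(slots)); // 6.5
        }
    }

The TM- and slot-selection logic would then only compare weights, so swapping
the default task-count weight for a resource-aware one stays a local change.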

Regards,
Xiangyu


Yuepeng Pan wrote on Thu, Oct 5, 2023 at 18:34:

> Hi, Zhu Zhu,
>
> Thanks for your feedback!
>
> > I think we can introduce a new config option
> > `taskmanager.load-balance.mode`,
> > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> > can be superseded by the "Slots" mode and get deprecated. In the future
> > it can support more mode, e.g. "CpuCores", to work better for jobs with
> > fine-grained resources. The proposed config option
> > `slot.request.max-interval`
> > then can be renamed to
> `taskmanager.load-balance.request-stablizing-timeout`
> > to show its relation with the feature. The proposed
> `slot.sharing-strategy`
> > is not needed, because the configured "Tasks" mode will do the work.
>
> The new proposed configuration option sounds good to me.
>
> I have a small question, If we set our configuration value to 'Tasks,' it
> will initiate two processes: balancing the allocation of task quantities at
> the slot level and balancing the number of tasks across TaskManagers (TMs).
> Alternatively, if we configure it as 'Slots,' the system will employ the
> LocalPreferred allocation policy (which is the default) when assigning
> tasks to slots, and it will ensure that the number of slots used across TMs
> is balanced.
> Does  this configuration essentially combine a balanced selection strategy
> across two dimensions into fixed configuration items, right?
>
> I would appreciate it if you could correct me if I've made any errors.
>
> Best,
> Yuepeng.
>


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-05 Thread Yuepeng Pan
Hi, Zhu Zhu,

Thanks for your feedback!

> I think we can introduce a new config option
> `taskmanager.load-balance.mode`,
> which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> can be superseded by the "Slots" mode and get deprecated. In the future
> it can support more mode, e.g. "CpuCores", to work better for jobs with
> fine-grained resources. The proposed config option
> `slot.request.max-interval`
> then can be renamed to `taskmanager.load-balance.request-stablizing-timeout`
> to show its relation with the feature. The proposed `slot.sharing-strategy`
> is not needed, because the configured "Tasks" mode will do the work.

The newly proposed configuration option sounds good to me.

I have a small question. If we set the configuration value to 'Tasks,' it will
initiate two processes: balancing the number of tasks at the slot level and
balancing the number of tasks across TaskManagers (TMs).
Alternatively, if we configure it as 'Slots,' the system will employ the
LocalPreferred allocation policy (which is the default) when assigning tasks to
slots, and it will ensure that the number of slots used across TMs is balanced.
So this configuration essentially combines a balanced selection strategy
across two dimensions into fixed configuration items, right?

I would appreciate it if you could correct me if I've made any errors.

Best,
Yuepeng.


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-02 Thread David Morávek
Hello Yuepeng,

The FLIP reads sane; nice work! To re-phrase my understanding:

The problem you're trying to solve only exists in complex graphs with
different per-vertex parallelism. If the parallelism is set globally
(assuming the pipeline has roughly even data skew), the algorithm could
make things slightly worse by eliminating some local exchanges. Is that
correct?

Where I'm headed with this is that there could be a hybrid strategy that
provides a reasonable default when the pipeline uses slot-sharing (for
per-vertex parallelism, use the new strategy; for global parallelism use
the old one). It's always a shame if improvements like this end up being a
power-user feature and very few workloads benefit from it. Any thoughts?
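
A tiny sketch of what such a hybrid default could boil down to, assuming the
choice can be made from the vertex parallelisms alone. The class is illustrative
and not part of the FLIP; only the strategy names LOCAL_PREFERRED and
TASK_BALANCED_PREFERRED are taken from this thread:

    import java.util.Map;

    public class HybridStrategyChooser {
        // vertexName -> parallelism; uniform parallelism keeps the old default.
        static String choose(Map<String, Integer> vertexParallelism) {
            boolean uniform =
                    vertexParallelism.values().stream().distinct().count() <= 1;
            return uniform ? "LOCAL_PREFERRED" : "TASK_BALANCED_PREFERRED";
        }

        public static void main(String[] args) {
            System.out.println(choose(Map.of("source", 4, "map", 4, "sink", 4)));
            // LOCAL_PREFERRED
            System.out.println(choose(Map.of("source", 4, "map", 8, "sink", 2)));
            // TASK_BALANCED_PREFERRED
        }
    }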

Best,
D.

On Sun, Oct 1, 2023 at 1:38 PM Yangze Guo  wrote:

> Hi, Rui,
>
> 1. With the current mechanism, when physical slots are offered from
> TM, the JobMaster will start deploying tasks and synchronizing their
> states. With the addition of the waiting mechanism, IIUC, the
> JobMaster will deploy and synchronize the states of all tasks only
> after all resources are available. The task deployment and state
> synchronization both occupy the JobMaster's RPC main thread. In
> complex jobs with a lot of tasks, this waiting mechanism may increase
> the pressure on the JobMaster and increase the end-to-end job
> deployment time.
>
> 2. From my understanding, if user enable the
> cluster.evenly-spread-out-slots,
> LeastUtilizationResourceMatchingStrategy will be used to determine the
> slot distribution and the slot allocation in the three TM will be
> (taskmanager.numberOfTaskSlots=3):
> TM1: 3 slot
> TM2: 2 slot
> TM3: 2 slot
>
> Best,
> Yangze Guo
>
> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Shammon,
> >
> > Thanks for your feedback as well!
> >
> > > IIUC, the overall balance is divided into two parts: slot to TM and
> task
> > to slot.
> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > > 2. Task to slot is guaranteed by the slot pool in JM
> > >
> > > These two are completely independent, what are the benefits of unifying
> > > these two into one option? Also, do we want to share the same
> > > option between SlotPool in JM and SlotManager in RM? This sounds a bit
> > > strange.
> >
> > Your understanding is totally right, the balance needs 2 parts: slot to
> TM
> > and task to slot.
> >
> > As I understand, the following are benefits of unifying them into one
> > option:
> >
> > - Flink users don't care about these principles inside of flink, they
> don't
> > know these 2 parts.
> > - If flink provides 2 options, flink users need to set 2 options for
> their
> > job.
> > - If one option is missed, the final result may not be good. (Users may
> > have questions when using)
> > - If flink just provides 1 option, enabling one option is enough. (Reduce
> > the probability of misconfiguration)
> >
> > Also, Flink’s options are user-oriented. Each option represents a switch
> or
> > parameter of a feature.
> > A feature may be composed of multiple components inside Flink.
> > It might be better to keep only one switch per feature.
> >
> > Actually, the cluster.evenly-spread-out-slots option is used between
> > SlotPool in JM and SlotManager in RM. 2 components to ensure
> > this feature works well.
> >
> > Please correct me if my understanding is wrong,
> > and looking forward to your feedback, thanks!
> >
> > Best,
> > Rui
> >
> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Yangze,
> > >
> > > Thanks for your feedback!
> > >
> > > > 1. Is it possible for the SlotPool to get the slot allocation results
> > > > from the SlotManager in advance instead of waiting for the actual
> > > > physical slots to be registered, and perform pre-allocation? The
> > > > benefit of doing this is to make the task deployment process
> smoother,
> > > > especially when there are a large number of tasks in the job.
> > >
> > > Could you elaborate on that? I didn't understand what's the benefit and
> > > smoother.
> > >
> > > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in
> > > > example 2 of section 2.2.3 can be resolved. Do I understand it
> > > > correctly?
> > >
> > > The example assigned result is the final allocation result when flink
> > > user enables the cluster.evenly-spread-out-slots. We think the
> > > assigned result is expected, so I think your understanding is right.
> > >
> > > Best,
> > > Rui
> > >
> > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY  wrote:
> > >
> > >> Thanks Yuepeng for initiating this discussion.
> > >>
> > >> +1 in general too, in fact we have implemented a similar mechanism
> > >> internally to ensure a balanced allocation of tasks to slots, it works
> > >> well.
> > >>
> > >> Some comments about the mechanism
> > >>
> > >> 1. This mechanism will be only supported in `SlotPool` or both
> `SlotPool`
> > >> and `DeclarativeSlotPool`? Currently the two 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-01 Thread Yangze Guo
Hi, Rui,

1. With the current mechanism, when physical slots are offered from
TM, the JobMaster will start deploying tasks and synchronizing their
states. With the addition of the waiting mechanism, IIUC, the
JobMaster will deploy and synchronize the states of all tasks only
after all resources are available. The task deployment and state
synchronization both occupy the JobMaster's RPC main thread. In
complex jobs with a lot of tasks, this waiting mechanism may increase
the pressure on the JobMaster and increase the end-to-end job
deployment time.

2. From my understanding, if user enable the
cluster.evenly-spread-out-slots,
LeastUtilizationResourceMatchingStrategy will be used to determine the
slot distribution and the slot allocation in the three TM will be
(taskmanager.numberOfTaskSlots=3):
TM1: 3 slot
TM2: 2 slot
TM3: 2 slot

Best,
Yangze Guo

On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Shammon,
>
> Thanks for your feedback as well!
>
> > IIUC, the overall balance is divided into two parts: slot to TM and task
> to slot.
> > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> > 2. Task to slot is guaranteed by the slot pool in JM
> >
> > These two are completely independent, what are the benefits of unifying
> > these two into one option? Also, do we want to share the same
> > option between SlotPool in JM and SlotManager in RM? This sounds a bit
> > strange.
>
> Your understanding is totally right, the balance needs 2 parts: slot to TM
> and task to slot.
>
> As I understand, the following are benefits of unifying them into one
> option:
>
> - Flink users don't care about these principles inside of flink, they don't
> know these 2 parts.
> - If flink provides 2 options, flink users need to set 2 options for their
> job.
> - If one option is missed, the final result may not be good. (Users may
> have questions when using)
> - If flink just provides 1 option, enabling one option is enough. (Reduce
> the probability of misconfiguration)
>
> Also, Flink’s options are user-oriented. Each option represents a switch or
> parameter of a feature.
> A feature may be composed of multiple components inside Flink.
> It might be better to keep only one switch per feature.
>
> Actually, the cluster.evenly-spread-out-slots option is used between
> SlotPool in JM and SlotManager in RM. 2 components to ensure
> this feature works well.
>
> Please correct me if my understanding is wrong,
> and looking forward to your feedback, thanks!
>
> Best,
> Rui
>
> On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Yangze,
> >
> > Thanks for your feedback!
> >
> > > 1. Is it possible for the SlotPool to get the slot allocation results
> > > from the SlotManager in advance instead of waiting for the actual
> > > physical slots to be registered, and perform pre-allocation? The
> > > benefit of doing this is to make the task deployment process smoother,
> > > especially when there are a large number of tasks in the job.
> >
> > Could you elaborate on that? I didn't understand what's the benefit and
> > smoother.
> >
> > > 2. If user enable the cluster.evenly-spread-out-slots, the issue in
> > > example 2 of section 2.2.3 can be resolved. Do I understand it
> > > correctly?
> >
> > The example assigned result is the final allocation result when flink
> > user enables the cluster.evenly-spread-out-slots. We think the
> > assigned result is expected, so I think your understanding is right.
> >
> > Best,
> > Rui
> >
> > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY  wrote:
> >
> >> Thanks Yuepeng for initiating this discussion.
> >>
> >> +1 in general too, in fact we have implemented a similar mechanism
> >> internally to ensure a balanced allocation of tasks to slots, it works
> >> well.
> >>
> >> Some comments about the mechanism
> >>
> >> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
> >> and `DeclarativeSlotPool`? Currently the two slot pools are used in
> >> different schedulers. I think this will also bring value to
> >> `DeclarativeSlotPool`, but currently FLIP content seems to be based on
> >> `SlotPool`, right?
> >>
> >> 2. In fine-grained resource management, we can set different resource
> >> requirements for different nodes, which means that the resources of each
> >> slot are different. What should be done when the slot selected by the
> >> round-robin strategy cannot meet the resource requirements? Will this lead
> >> to the failure of the balance strategy?
> >>
> >> 3. Is the assignment of tasks to slots balanced based on region or job
> >> level? When multiple TMs fail over, will it cause the balancing strategy
> >> to
> >> fail or even worse? What is the current processing strategy?
> >>
> >> For Zhuzhu and Rui:
> >>
> >> IIUC, the overall balance is divided into two parts: slot to TM and task
> >> to
> >> slot.
> >> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> >> 2. Task to slot is guaranteed by the slot 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-01 Thread Rui Fan
Hi Shammon,

Thanks for your feedback as well!

> IIUC, the overall balance is divided into two parts: slot to TM and task
to slot.
> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> 2. Task to slot is guaranteed by the slot pool in JM
>
> These two are completely independent, what are the benefits of unifying
> these two into one option? Also, do we want to share the same
> option between SlotPool in JM and SlotManager in RM? This sounds a bit
> strange.

Your understanding is totally right: the balance needs 2 parts, slot to TM
and task to slot.

As I understand, the following are benefits of unifying them into one
option:

- Flink users don't care about these principles inside of Flink; they don't
know about these 2 parts.
- If Flink provides 2 options, Flink users need to set 2 options for their
job.
- If one option is missed, the final result may not be good. (Users may
have questions when using it.)
- If Flink just provides 1 option, enabling that one option is enough. (This
reduces the probability of misconfiguration.)

Also, Flink’s options are user-oriented. Each option represents a switch or
parameter of a feature.
A feature may be composed of multiple components inside Flink.
It might be better to keep only one switch per feature.

Actually, the cluster.evenly-spread-out-slots option is used by both the
SlotPool in the JM and the SlotManager in the RM: 2 components ensure
this feature works well.

Please correct me if my understanding is wrong,
and looking forward to your feedback, thanks!

Best,
Rui

On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Yangze,
>
> Thanks for your feedback!
>
> > 1. Is it possible for the SlotPool to get the slot allocation results
> > from the SlotManager in advance instead of waiting for the actual
> > physical slots to be registered, and perform pre-allocation? The
> > benefit of doing this is to make the task deployment process smoother,
> > especially when there are a large number of tasks in the job.
>
> Could you elaborate on that? I didn't understand what's the benefit and
> smoother.
>
> > 2. If user enable the cluster.evenly-spread-out-slots, the issue in
> > example 2 of section 2.2.3 can be resolved. Do I understand it
> > correctly?
>
> The example assigned result is the final allocation result when flink
> user enables the cluster.evenly-spread-out-slots. We think the
> assigned result is expected, so I think your understanding is right.
>
> Best,
> Rui
>
> On Thu, Sep 28, 2023 at 1:10 PM Shammon FY  wrote:
>
>> Thanks Yuepeng for initiating this discussion.
>>
>> +1 in general too, in fact we have implemented a similar mechanism
>> internally to ensure a balanced allocation of tasks to slots, it works
>> well.
>>
>> Some comments about the mechanism
>>
>> 1. This mechanism will be only supported in `SlotPool` or both `SlotPool`
>> and `DeclarativeSlotPool`? Currently the two slot pools are used in
>> different schedulers. I think this will also bring value to
>> `DeclarativeSlotPool`, but currently FLIP content seems to be based on
>> `SlotPool`, right?
>>
>> 2. In fine-grained resource management, we can set different resource
>> requirements for different nodes, which means that the resources of each
>> slot are different. What should be done when the slot selected by the
>> round-robin strategy cannot meet the resource requirements? Will this lead
>> to the failure of the balance strategy?
>>
>> 3. Is the assignment of tasks to slots balanced based on region or job
>> level? When multiple TMs fail over, will it cause the balancing strategy
>> to
>> fail or even worse? What is the current processing strategy?
>>
>> For Zhuzhu and Rui:
>>
>> IIUC, the overall balance is divided into two parts: slot to TM and task
>> to
>> slot.
>> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
>> 2. Task to slot is guaranteed by the slot pool in JM
>>
>> These two are completely independent; what are the benefits of unifying
>> them into one option? Also, do we want to share the same option between the
>> SlotPool in JM and the SlotManager in RM? This sounds a bit strange.
>>
>> Best,
>> Shammon FY
>>
>>
>>
>> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:
>>
>> > Hi Zhu Zhu,
>> >
>> > Thanks for your feedback here!
>> >
>> > You are right, users need to set 2 options:
>> > - cluster.evenly-spread-out-slots=true
>> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED
>> >
>> > Merging them into one option is useful on the user side, so
>> > `taskmanager.load-balance.mode` sounds good to me.
>> > I want to check some points and behaviors about this option:
>> >
>> > 1. The default value is None, right?
>> > 2. When it's set to Tasks, how do we assign slots to a TM?
>> > - Option 1: just check the task number.
>> > - Option 2: check the slot number first, then check the task number when
>> > the slot numbers are the same.
>> >
>> > Here is an example to explain the difference between the two options:
>> >
>> > - A session cluster has 2 flink 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-10-01 Thread Rui Fan
Hi Yangze,

Thanks for your feedback!

> 1. Is it possible for the SlotPool to get the slot allocation results
> from the SlotManager in advance instead of waiting for the actual
> physical slots to be registered, and perform pre-allocation? The
> benefit of doing this is to make the task deployment process smoother,
> especially when there are a large number of tasks in the job.

Could you elaborate on that? I don't understand what the benefit is or how
it makes the deployment smoother.

> 2. If users enable the cluster.evenly-spread-out-slots, the issue in
> example 2 of section 2.2.3 can be resolved. Do I understand it
> correctly?

The assigned result in the example is the final allocation result when users
enable the cluster.evenly-spread-out-slots. We think this assigned result is
expected, so I think your understanding is right.

Best,
Rui

On Thu, Sep 28, 2023 at 1:10 PM Shammon FY  wrote:

> Thanks Yuepeng for initiating this discussion.
>
> +1 in general too, in fact we have implemented a similar mechanism
> internally to ensure a balanced allocation of tasks to slots, it works
> well.
>
> Some comments about the mechanism
>
> 1. Will this mechanism be supported only in `SlotPool`, or in both `SlotPool`
> and `DeclarativeSlotPool`? Currently the two slot pools are used in
> different schedulers. I think this will also bring value to
> `DeclarativeSlotPool`, but currently the FLIP content seems to be based on
> `SlotPool`, right?
>
> 2. In fine-grained resource management, we can set different resource
> requirements for different nodes, which means that the resources of each
> slot are different. What should be done when the slot selected by the
> round-robin strategy cannot meet the resource requirements? Will this lead
> to the failure of the balance strategy?
>
> 3. Is the assignment of tasks to slots balanced at the region level or the
> job level? When multiple TMs fail over, will it cause the balancing strategy
> to fail, or even make things worse? What is the current processing strategy?
>
> For Zhuzhu and Rui:
>
> IIUC, the overall balance is divided into two parts: slot to TM and task to
> slot.
> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> 2. Task to slot is guaranteed by the slot pool in JM
>
> These two are completely independent; what are the benefits of unifying
> them into one option? Also, do we want to share the same option between the
> SlotPool in JM and the SlotManager in RM? This sounds a bit strange.
>
> Best,
> Shammon FY
>
>
>
> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Zhu Zhu,
> >
> > Thanks for your feedback here!
> >
> > You are right, users need to set 2 options:
> > - cluster.evenly-spread-out-slots=true
> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED
> >
> > Merging them into one option is useful on the user side, so
> > `taskmanager.load-balance.mode` sounds good to me.
> > I want to check some points and behaviors about this option:
> >
> > 1. The default value is None, right?
> > 2. When it's set to Tasks, how do we assign slots to a TM?
> > - Option 1: just check the task number.
> > - Option 2: check the slot number first, then check the task number when
> > the slot numbers are the same.
> >
> > Here is an example to explain the difference between the two options:
> >
> > - A session cluster has 2 flink jobs, they are jobA and jobB
> > - Each TM has 4 slots.
> > - The task number of one slot of jobA is 3
> > - The task number of one slot of jobB is 1
> > - We have 2 TaskManagers:
> >   - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
> >   - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks.
> >
> > Now, we need to allocate a new slot; which TM should offer it?
> > - Option 1: if we just check the task number, tm1 is better.
> > - Option 2: if we check the slot number first and then the task number,
> > tm2 is better.
> >
> > The original FLIP selected Option 2, which is why we didn't add a
> > third option. Option 2 doesn't break the semantics when
> > `cluster.evenly-spread-out-slots` is true; it just improves the
> > behavior without changing the semantics.
> >
> > On the other hand, if we choose Option 2, then when users set
> > `taskmanager.load-balance.mode` to Tasks, it can also achieve
> > the goal of the Slots mode.
> >
> > So I think the `Slots` enum isn't needed if we choose Option 2.
> > Of course, if we choose Option 1, the enum is needed.
> >
> > Looking forward to your feedback, thanks~
> >
> > Best,
> > Rui
> >
> > On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu  wrote:
> >
> > > Thanks Yuepeng and Rui for creating this FLIP.
> > >
> > > +1 in general
> > > The idea is straightforward: best-effort gather all the slot requests
> > > and offered slots to form an overview before assigning slots, trying to
> > > balance the loads of task managers when assigning slots.
> > >
> > > I have one comment regarding the configuration for ease of use:
> > >
> > > IIUC, this FLIP uses an existing config
> 'cluster.evenly-spread-out-slots'
> > > as the main switch of 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-09-27 Thread Shammon FY
Thanks Yuepeng for initiating this discussion.

+1 in general too. In fact, we have implemented a similar mechanism
internally to ensure a balanced allocation of tasks to slots, and it works well.

Some comments about the mechanism

1. Will this mechanism be supported only in `SlotPool`, or in both `SlotPool`
and `DeclarativeSlotPool`? Currently the two slot pools are used in
different schedulers. I think this will also bring value to
`DeclarativeSlotPool`, but currently the FLIP content seems to be based on
`SlotPool`, right?

2. In fine-grained resource management, we can set different resource
requirements for different nodes, which means that the resources of each
slot are different. What should be done when the slot selected by the
round-robin strategy cannot meet the resource requirements (see the sketch
after these questions)? Will this cause the balancing strategy to fail?

3. Is the assignment of tasks to slots balanced at the region level or the
job level? When multiple TMs fail over, will it cause the balancing strategy
to fail, or even make things worse? What is the current processing strategy?
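
A minimal, purely hypothetical Java sketch of the concern behind question 2
(the class, method and parameters are made up for illustration and are not
Flink code): with fine-grained resources, the slot picked by round-robin is
only usable if its free resources cover the request.

    final class FineGrainedFitCheck {
        // True if a slot with the given free resources can host the request.
        static boolean fits(double slotCpu, int slotMemMb,
                            double requestCpu, int requestMemMb) {
            return slotCpu >= requestCpu && slotMemMb >= requestMemMb;
        }

        public static void main(String[] args) {
            // Round-robin candidate offers 1.0 CPU / 1024 MB,
            // but the request needs 2.0 CPU / 512 MB -> it does not fit.
            System.out.println(fits(1.0, 1024, 2.0, 512)); // false
        }
    }

When fits(...) is false for the round-robin candidate, the strategy has to skip
that slot or fall back to another one, which is exactly where the balance
guarantee could be weakened.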

For Zhuzhu and Rui:

IIUC, the overall balance is divided into two parts: slot to TM and task to
slot.
1. Slot to TM is guaranteed by SlotManager in ResourceManager
2. Task to slot is guaranteed by the slot pool in JM

These two are completely independent; what are the benefits of unifying
them into one option? Also, do we want to share the same option between the
SlotPool in JM and the SlotManager in RM? This sounds a bit strange.

Best,
Shammon FY



On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Zhu Zhu,
>
> Thanks for your feedback here!
>
> You are right, users need to set 2 options:
> - cluster.evenly-spread-out-slots=true
> - slot.sharing-strategy=TASK_BALANCED_PREFERRED
>
> Merging them into one option is useful on the user side, so
> `taskmanager.load-balance.mode` sounds good to me.
> I want to check some points and behaviors about this option:
>
> 1. The default value is None, right?
> 2. When it's set to Tasks, how do we assign slots to a TM?
> - Option 1: just check the task number.
> - Option 2: check the slot number first, then check the task number when
> the slot numbers are the same.
>
> Here is an example to explain the difference between the two options:
>
> - A session cluster has 2 flink jobs, they are jobA and jobB
> - Each TM has 4 slots.
> - The task number of one slot of jobA is 3
> - The task number of one slot of jobB is 1
> - We have 2 TaskManagers:
>   - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
>   - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks.
>
> Now, we need to allocate a new slot; which TM should offer it?
> - Option 1: if we just check the task number, tm1 is better.
> - Option 2: if we check the slot number first and then the task number, tm2
> is better.
>
> The original FLIP selected Option 2, which is why we didn't add a
> third option. Option 2 doesn't break the semantics when
> `cluster.evenly-spread-out-slots` is true; it just improves the
> behavior without changing the semantics.
>
> On the other hand, if we choose Option 2, then when users set
> `taskmanager.load-balance.mode` to Tasks, it can also achieve
> the goal of the Slots mode.
>
> So I think the `Slots` enum isn't needed if we choose Option 2.
> Of course, if we choose Option 1, the enum is needed.
>
> Looking forward to your feedback, thanks~
>
> Best,
> Rui
>
> On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu  wrote:
>
> > Thanks Yuepeng and Rui for creating this FLIP.
> >
> > +1 in general
> > The idea is straightforward: best-effort gather all the slot requests
> > and offered slots to form an overview before assigning slots, trying to
> > balance the loads of task managers when assigning slots.
> >
> > I have one comment regarding the configuration for ease of use:
> >
> > IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
> > as the main switch of the new feature. That is, from user perspective,
> > with this improvement, the 'cluster.evenly-spread-out-slots' feature not
> > only balances the number of slots on task managers, but also balances the
> > number of tasks. This is a behavior change anyway. Besides that, it also
> > requires users to set 'slot.sharing-strategy' to
> 'TASK_BALANCED_PREFERRED'
> > to balance the tasks in each slot.
> >
> > I think we can introduce a new config option
> > `taskmanager.load-balance.mode`,
> > which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> > can be superseded by the "Slots" mode and get deprecated. In the future
> > it can support more modes, e.g. "CpuCores", to work better for jobs with
> > fine-grained resources. The proposed config option
> > `slot.request.max-interval`
> > then can be renamed to
> > `taskmanager.load-balance.request-stablizing-timeout`
> > to show its relation with the feature. The proposed
> `slot.sharing-strategy`
> > is not needed, because the configured "Tasks" mode will do the work.
> >
> > WDYT?
> >
> > Thanks,
> > Zhu 

Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-09-27 Thread Rui Fan
Hi Zhu Zhu,

Thanks for your feedback here!

You are right, users need to set 2 options:
- cluster.evenly-spread-out-slots=true
- slot.sharing-strategy=TASK_BALANCED_PREFERRED

Merging them into one option is useful on the user side, so
`taskmanager.load-balance.mode` sounds good to me.
I want to check some points and behaviors about this option:

1. The default value is None, right?
2. When it's set to Tasks, how do we assign slots to a TM?
- Option 1: just check the task number.
- Option 2: check the slot number first, then check the task number when
the slot numbers are the same.

Here is an example to explain the difference between the two options:

- A session cluster has 2 Flink jobs: jobA and jobB.
- Each TM has 4 slots.
- Each slot of jobA contains 3 tasks.
- Each slot of jobB contains 1 task.
- We have 2 TaskManagers:
  - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
  - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs 4 tasks.

Now, we need to allocate a new slot; which TM should offer it?
- Option 1: if we just check the task number, tm1 is better.
- Option 2: if we check the slot number first and then the task number, tm2
is better (see the sketch below).
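
To make the tie-break concrete, here is a rough Java sketch of the Option 2
comparison for the example above (only an illustration with made-up helper
names, not the actual Flink implementation):

    final class Option2TieBreak {
        // Returns true if the first TM should be preferred for the new slot.
        static boolean preferFirst(int slots1, int tasks1, int slots2, int tasks2) {
            if (slots1 != slots2) {
                return slots1 < slots2; // fewer occupied slots wins first
            }
            return tasks1 < tasks2;     // task count only breaks ties
        }

        public static void main(String[] args) {
            // tm1: 3 slots / 3 tasks, tm2: 2 slots / 4 tasks.
            // Option 2 prefers tm2 (2 < 3 slots), while a pure task-count rule
            // (Option 1) would prefer tm1 (3 < 4 tasks).
            System.out.println(preferFirst(3, 3, 2, 4)); // false -> tm2 is chosen
        }
    }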

The original FLIP selected Option 2, which is why we didn't add a
third option. Option 2 doesn't break the semantics when
`cluster.evenly-spread-out-slots` is true; it just improves the
behavior without changing the semantics.

On the other hand, if we choose Option 2, then when users set
`taskmanager.load-balance.mode` to Tasks, it can also achieve
the goal of the Slots mode.

So I think the `Slots` enum isn't needed if we choose Option 2.
Of course, if we choose Option 1, the enum is needed.

Looking forward to your feedback, thanks~

Best,
Rui

On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu  wrote:

> Thanks Yuepeng and Rui for creating this FLIP.
>
> +1 in general
> The idea is straightforward: best-effort gather all the slot requests
> and offered slots to form an overview before assigning slots, trying to
> balance the loads of task managers when assigning slots.
>
> I have one comment regarding the configuration for ease of use:
>
> IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
> as the main switch of the new feature. That is, from user perspective,
> with this improvement, the 'cluster.evenly-spread-out-slots' feature not
> only balances the number of slots on task managers, but also balances the
> number of tasks. This is a behavior change anyway. Besides that, it also
> requires users to set 'slot.sharing-strategy' to 'TASK_BALANCED_PREFERRED'
> to balance the tasks in each slot.
>
> I think we can introduce a new config option
> `taskmanager.load-balance.mode`,
> which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> can be superseded by the "Slots" mode and get deprecated. In the future
> it can support more modes, e.g. "CpuCores", to work better for jobs with
> fine-grained resources. The proposed config option
> `slot.request.max-interval`
> then can be renamed to
> `taskmanager.load-balance.request-stablizing-timeout`
> to show its relation with the feature. The proposed `slot.sharing-strategy`
> is not needed, because the configured "Tasks" mode will do the work.
>
> WDYT?
>
> Thanks,
> Zhu Zhu
>
Yuepeng Pan wrote on Mon, Sep 25, 2023 at 16:26:
>
>> Hi all,
>>
>>
>> I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks
>> scheduling.
>>
>>
>> The current strategy of Flink to deploy tasks sometimes leads some
>> TMs(TaskManagers) to have more tasks while others have fewer tasks,
>> resulting in excessive resource utilization at some TMs that contain more
>> tasks and becoming a bottleneck for the entire job processing. Developing
>> strategies to achieve task load balancing for TMs and reducing job
>> bottlenecks becomes very meaningful.
>>
>>
>> The raw design and discussions could be found in the Flink JIRA[2] and
>> Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some
>> valuable help and suggestions in advance.
>>
>>
>> Please refer to the FLIP[1] document for more details about the proposed
>> design and implementation. We welcome any feedback and opinions on this
>> proposal.
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
>>
>> [2] https://issues.apache.org/jira/browse/FLINK-31757
>>
>> [3]
>> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
>>
>>
>> Best,
>>
>> Yuepeng Pan
>>
>


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-09-27 Thread Yangze Guo
Thanks for driving this FLIP, Yuepeng Pan. +1 for the overall proposal
to support balanced scheduling.

Some questions on the Waiting mechanism and Allocation strategy for slot to TM:

1. Is it possible for the SlotPool to get the slot allocation results
from the SlotManager in advance instead of waiting for the actual
physical slots to be registered, and perform pre-allocation? The
benefit of doing this is to make the task deployment process smoother,
especially when there are a large number of tasks in the job.
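
To illustrate what I mean by pre-allocation, here is a purely hypothetical
sketch (none of these names exist in Flink; they only show the shape of the
idea): the SlotPool would record the planned slot-to-TM mapping announced by
the SlotManager, assign tasks to those pending slots, and deploy them once the
physical slots register.

    // Hypothetical illustration only -- not Flink code.
    final class PendingSlot {
        final String slotId;          // planned slot
        final String taskManagerId;   // TM that will eventually provide it
        boolean physicallyRegistered; // flips to true on physical registration

        PendingSlot(String slotId, String taskManagerId) {
            this.slotId = slotId;
            this.taskManagerId = taskManagerId;
        }
    }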

2. If users enable the cluster.evenly-spread-out-slots, the issue in
example 2 of section 2.2.3 can be resolved. Do I understand it
correctly?




Best,
Yangze Guo

On Wed, Sep 27, 2023 at 9:12 PM Zhu Zhu  wrote:
>
> Thanks Yuepeng and Rui for creating this FLIP.
>
> +1 in general
> The idea is straightforward: best-effort gather all the slot requests
> and offered slots to form an overview before assigning slots, trying to
> balance the loads of task managers when assigning slots.
>
> I have one comment regarding the configuration for ease of use:
>
> IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
> as the main switch of the new feature. That is, from user perspective,
> with this improvement, the 'cluster.evenly-spread-out-slots' feature not
> only balances the number of slots on task managers, but also balances the
> number of tasks. This is a behavior change anyway. Besides that, it also
> requires users to set 'slot.sharing-strategy' to 'TASK_BALANCED_PREFERRED'
> to balance the tasks in each slot.
>
> I think we can introduce a new config option
> `taskmanager.load-balance.mode`,
> which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
> can be superseded by the "Slots" mode and get deprecated. In the future
> it can support more modes, e.g. "CpuCores", to work better for jobs with
> fine-grained resources. The proposed config option
> `slot.request.max-interval`
> then can be renamed to `taskmanager.load-balance.request-stablizing-timeout`
> to show its relation with the feature. The proposed `slot.sharing-strategy`
> is not needed, because the configured "Tasks" mode will do the work.
>
> WDYT?
>
> Thanks,
> Zhu Zhu
>
Yuepeng Pan wrote on Mon, Sep 25, 2023 at 16:26:
>
> > Hi all,
> >
> >
> > I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks
> > scheduling.
> >
> >
> > The current strategy of Flink to deploy tasks sometimes leads some
> > TMs(TaskManagers) to have more tasks while others have fewer tasks,
> > resulting in excessive resource utilization at some TMs that contain more
> > tasks and becoming a bottleneck for the entire job processing. Developing
> > strategies to achieve task load balancing for TMs and reducing job
> > bottlenecks becomes very meaningful.
> >
> >
> > The raw design and discussions could be found in the Flink JIRA[2] and
> > Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some
> > valuable help and suggestions in advance.
> >
> >
> > Please refer to the FLIP[1] document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on this
> > proposal.
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
> >
> > [2] https://issues.apache.org/jira/browse/FLINK-31757
> >
> > [3]
> > https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
> >
> >
> > Best,
> >
> > Yuepeng Pan
> >


Re: [DISCUSS] FLIP-370 : Support Balanced Tasks Scheduling

2023-09-27 Thread Zhu Zhu
Thanks Yuepeng and Rui for creating this FLIP.

+1 in general
The idea is straightforward: best-effort gather all the slot requests
and offered slots to form an overview before assigning slots, trying to
balance the loads of task managers when assigning slots.

I have one comment regarding the configuration for ease of use:

IIUC, this FLIP uses an existing config 'cluster.evenly-spread-out-slots'
as the main switch of the new feature. That is, from the user's perspective,
with this improvement, the 'cluster.evenly-spread-out-slots' feature not
only balances the number of slots on task managers, but also balances the
number of tasks. This is a behavior change anyway. Besides that, it also
requires users to set 'slot.sharing-strategy' to 'TASK_BALANCED_PREFERRED'
to balance the tasks in each slot.

I think we can introduce a new config option
`taskmanager.load-balance.mode`,
which accepts "None"/"Slots"/"Tasks". `cluster.evenly-spread-out-slots`
can be superseded by the "Slots" mode and get deprecated. In the future
it can support more modes, e.g. "CpuCores", to work better for jobs with
fine-grained resources. The proposed config option
`slot.request.max-interval`
can then be renamed to `taskmanager.load-balance.request-stablizing-timeout`
to show its relation to the feature. The proposed `slot.sharing-strategy`
is not needed, because the configured "Tasks" mode will do the work.
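
For illustration, a job that wants fully balanced scheduling might then only
need something like the following configuration (a sketch based on the names
proposed above; the final option names and the timeout value are of course
still open):

    taskmanager.load-balance.mode: Tasks
    # optional tuning of the waiting mechanism (value only for illustration)
    taskmanager.load-balance.request-stablizing-timeout: 50 ms

instead of combining 'cluster.evenly-spread-out-slots' with a separate
slot-sharing strategy.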

WDYT?

Thanks,
Zhu Zhu

Yuepeng Pan wrote on Mon, Sep 25, 2023 at 16:26:

> Hi all,
>
>
> I and Fan Rui(CC’ed) created the FLIP-370[1] to support balanced tasks
> scheduling.
>
>
> The current strategy of Flink to deploy tasks sometimes leads some
> TMs(TaskManagers) to have more tasks while others have fewer tasks,
> resulting in excessive resource utilization at some TMs that contain more
> tasks and becoming a bottleneck for the entire job processing. Developing
> strategies to achieve task load balancing for TMs and reducing job
> bottlenecks becomes very meaningful.
>
>
> The raw design and discussions could be found in the Flink JIRA[2] and
> Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for providing some
> valuable help and suggestions in advance.
>
>
> Please refer to the FLIP[1] document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on this
> proposal.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
>
> [2] https://issues.apache.org/jira/browse/FLINK-31757
>
> [3]
> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
>
>
> Best,
>
> Yuepeng Pan
>