[ https://issues.apache.org/jira/browse/FLINK-40057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-40057:
-----------------------------------
    Labels: pull-request-available  (was: )

> DefaultSlotAssigner over-picks slots from the boundary TaskManager when minimal-TaskManager placement is enabled
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40057
>                 URL: https://issues.apache.org/jira/browse/FLINK-40057
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: ericzeng
>            Priority: Major
>              Labels: pull-request-available
>
> When the cluster runs in application mode and
> {{JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}} is enabled,
> {{DefaultSlotAssigner#pickSlotsInMinimalTaskExecutors}} selects free slots from
> TaskManagers in descending order of their free-slot count, so that as few
> TaskManagers as possible are kept busy and the rest can be released.
> The current implementation walks the sorted TaskManagers and adds *all* free
> slots of each one until the requested slot count is reached:
> {code:java}
> final List<PhysicalSlot> pickedSlots = new ArrayList<>();
> final Iterator<ResourceID> sortedTaskExecutors = getSortedTaskExecutors(slotsByTaskExecutor);
> while (pickedSlots.size() < requestedGroups) {
>     // No hasNext() check: relies on the caller having verified enough free slots.
>     Set<PhysicalSlot> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
>     // Adds every free slot of this TaskManager, including the boundary one.
>     pickedSlots.addAll(slotInfos);
> }
> return pickedSlots;
> {code}
> Because the last (boundary) TaskManager is added in full via {{addAll}}, the
> returned collection can contain *more* slots than {{requestedGroups}} whenever
> the boundary TaskManager has more free slots than are still needed. For example,
> with {{requestedGroups = 5}} and two TaskManagers offering 4 and 3 free slots,
> the method returns 7 slots (4 + 3) instead of 5.
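The 4 + 3 example can be reproduced with a minimal sketch of the same loop. It assumes simplified types: TaskManager IDs and slots are plain Strings rather than ResourceID and PhysicalSlot, and a LinkedHashMap that is already in descending free-slot order stands in for getSortedTaskExecutors. The names OverPickDemo, pickWithAddAll and example are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the current loop. Strings stand in for
 * ResourceID and PhysicalSlot; the map's insertion order is assumed to be the
 * descending free-slot order that getSortedTaskExecutors would produce.
 */
class OverPickDemo {

    static List<String> pickWithAddAll(Map<String, List<String>> slotsByTaskExecutor, int requestedGroups) {
        final List<String> pickedSlots = new ArrayList<>();
        final Iterator<String> sortedTaskExecutors = slotsByTaskExecutor.keySet().iterator();
        while (pickedSlots.size() < requestedGroups) {
            // Adds every free slot of the next TaskManager, including the boundary one.
            pickedSlots.addAll(slotsByTaskExecutor.get(sortedTaskExecutors.next()));
        }
        return pickedSlots;
    }

    static Map<String, List<String>> example() {
        Map<String, List<String>> slots = new LinkedHashMap<>();
        slots.put("tm-1", List.of("tm-1/s0", "tm-1/s1", "tm-1/s2", "tm-1/s3"));
        slots.put("tm-2", List.of("tm-2/s0", "tm-2/s1", "tm-2/s2"));
        return slots;
    }

    public static void main(String[] args) {
        List<String> picked = pickWithAddAll(example(), 5);
        // 4 slots from tm-1 plus all 3 slots of the boundary TaskManager tm-2.
        System.out.println(picked.size()); // prints 7
    }
}
```

After the first TaskManager only 4 of 5 slots are picked, so the loop runs once more and pulls in all 3 slots of tm-2, although only one was needed.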
> This does not corrupt the final assignment: the downstream
> {{SimpleSlotMatchingResolver#matchSlotSharingGroupWithSlots}} only consumes the
> first {{requestedGroups}} slots through an iterator, so the surplus slots are
> silently ignored. However, the returned "minimal" slot set no longer reflects
> the minimal footprint it is supposed to represent: it pulls in extra slots from
> the boundary TaskManager that were never required, which works against the very
> goal of the method (minimizing the involved TaskManagers and the residual
> fragmentation on the boundary TaskManager).
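Why the surplus is harmless can be sketched with a hypothetical iterator-based matcher. IteratorMatchingDemo and matchGroupsWithSlots are made-up names and this is not Flink's SimpleSlotMatchingResolver code; it only assumes the behavior described above: each slot-sharing group takes the next slot from an iterator, so anything left in the iterator is never touched.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for an iterator-based matching step. Groups and slots
 * are plain Strings; one slot is taken per group, in order.
 */
class IteratorMatchingDemo {

    static Map<String, String> matchGroupsWithSlots(List<String> groups, List<String> slots) {
        final Map<String, String> assignment = new LinkedHashMap<>();
        final Iterator<String> slotIterator = slots.iterator();
        for (String group : groups) {
            // One slot per group; surplus slots remain unconsumed in the iterator.
            assignment.put(group, slotIterator.next());
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("g0", "g1", "g2", "g3", "g4");
        // Seven picked slots, as returned by the current loop in the 4 + 3 example.
        List<String> pickedSlots = List.of(
                "tm-1/s0", "tm-1/s1", "tm-1/s2", "tm-1/s3",
                "tm-2/s0", "tm-2/s1", "tm-2/s2");
        Map<String, String> assignment = matchGroupsWithSlots(groups, pickedSlots);
        System.out.println(assignment.size()); // prints 5
        System.out.println(assignment.containsValue("tm-2/s2")); // prints false
    }
}
```

The assignment is correct, but the caller still received two slots of tm-2 that the "minimal" set should never have contained.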
> In addition, the {{while}} loop has no {{hasNext()}} guard. It relies entirely
> on the caller's pre-check ({{freeSlots.size() > requestExecutionSlotSharingGroups}})
> to guarantee that the TaskManager iterator is never exhausted; the method itself
> would throw {{NoSuchElementException}} if it were ever called without enough
> free slots.
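A minimal reproduction of the missing guard, using the same simplified String stand-ins for ResourceID and PhysicalSlot (MissingGuardDemo, pickUnguarded and throwsWhenShort are hypothetical names): with only three free slots and five requested, the loop calls next() on an exhausted iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical sketch: the current loop shape, called without the caller's pre-check. */
class MissingGuardDemo {

    // Same shape as the current loop: next() is called without a hasNext() check.
    static List<String> pickUnguarded(Map<String, List<String>> slotsByTaskExecutor, int requestedGroups) {
        final List<String> pickedSlots = new ArrayList<>();
        final Iterator<String> sortedTaskExecutors = slotsByTaskExecutor.keySet().iterator();
        while (pickedSlots.size() < requestedGroups) {
            pickedSlots.addAll(slotsByTaskExecutor.get(sortedTaskExecutors.next()));
        }
        return pickedSlots;
    }

    static boolean throwsWhenShort() {
        Map<String, List<String>> slots = new LinkedHashMap<>();
        slots.put("tm-1", List.of("tm-1/s0", "tm-1/s1"));
        slots.put("tm-2", List.of("tm-2/s0"));
        try {
            pickUnguarded(slots, 5); // only 3 free slots exist
            return false;
        } catch (NoSuchElementException e) {
            // The iterator ran out of TaskManagers before 5 slots were collected.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenShort()); // prints true
    }
}
```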
> ## Proposed change
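> Flatten the slots of the sorted TaskManagers into a single stream and keep only
> the first {{requestedGroups}} of them with {{Stream#limit}} (this requires
> {{getSortedTaskExecutors}} to return a {{Stream}} instead of the {{Iterator}}
> used in the current code):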
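> {code:java}
> return getSortedTaskExecutors(slotsByTaskExecutor)
>         // TaskManagers arrive in descending free-slot order.
>         .flatMap(taskExecutor -> slotsByTaskExecutor.get(taskExecutor).stream())
>         // Keep exactly requestedGroups slots (or all of them, if fewer exist).
>         .limit(requestedGroups)
>         .collect(Collectors.toCollection(ArrayList::new));
> {code}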
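A self-contained sketch of the proposed pipeline, under the same simplifying assumptions as before: Strings stand in for ResourceID and PhysicalSlot, and getSortedTaskExecutors is assumed to return a Stream sorted by descending free-slot count (its body here is an illustration, not Flink's code). LimitPickDemo and example are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Simplified sketch of the proposed change with String stand-ins for Flink types. */
class LimitPickDemo {

    // Assumed Stream-returning helper: TaskManagers in descending free-slot order.
    static Stream<String> getSortedTaskExecutors(Map<String, List<String>> slotsByTaskExecutor) {
        return slotsByTaskExecutor.keySet().stream()
                .sorted(Comparator.comparingInt(
                                (String tm) -> slotsByTaskExecutor.get(tm).size())
                        .reversed());
    }

    static List<String> pickSlotsInMinimalTaskExecutors(
            Map<String, List<String>> slotsByTaskExecutor, int requestedGroups) {
        return getSortedTaskExecutors(slotsByTaskExecutor)
                .flatMap(taskExecutor -> slotsByTaskExecutor.get(taskExecutor).stream())
                .limit(requestedGroups)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    static Map<String, List<String>> example() {
        // HashMap on purpose: iteration order does not matter because the helper sorts.
        Map<String, List<String>> slots = new HashMap<>();
        slots.put("tm-a", List.of("tm-a/s0", "tm-a/s1", "tm-a/s2"));
        slots.put("tm-b", List.of("tm-b/s0", "tm-b/s1", "tm-b/s2", "tm-b/s3"));
        return slots;
    }

    public static void main(String[] args) {
        // All four slots of the larger tm-b, then exactly one slot of boundary tm-a.
        System.out.println(pickSlotsInMinimalTaskExecutors(example(), 5));
        // prints [tm-b/s0, tm-b/s1, tm-b/s2, tm-b/s3, tm-a/s0]

        // Requesting more than exists returns all 7 slots instead of throwing.
        System.out.println(pickSlotsInMinimalTaskExecutors(example(), 10).size()); // prints 7
    }
}
```

The second call shows that {{limit}} alone does not check whether enough slots exist; the result simply comes back shorter, so the caller's pre-check still decides whether the request can be served.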
> Because {{limit}} is lazy and short-circuiting, the slots of TaskManagers beyond
> the boundary one are never streamed, and the boundary TaskManager contributes
> exactly the number of slots still needed, so there is no over-pick. The "prefer
> the fewest TaskManagers" ordering is unchanged (the stream still iterates
> TaskManagers in descending free-slot order). The {{limit}} bound also makes the
> method safe by construction: it can never read past the available slots, and if
> fewer slots exist than requested it returns all of them instead of throwing.
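The short-circuiting claim can be checked by counting how many TaskManagers' slot sets the pipeline opens. This sketch assumes three hypothetical TaskManagers with 4, 3 and 2 free slots, already in descending order via LinkedHashMap insertion order; LazyLimitDemo and pick are made-up names, and the AtomicInteger counter exists only for the demonstration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** Hypothetical sketch: counts the TaskManagers visited by the flatMap/limit pipeline. */
class LazyLimitDemo {

    static List<String> pick(int requestedGroups, AtomicInteger openedTaskManagers) {
        // Insertion order stands in for descending free-slot order (4, 3, 2).
        Map<String, List<String>> slotsByTaskExecutor = new LinkedHashMap<>();
        slotsByTaskExecutor.put("tm-1", List.of("tm-1/s0", "tm-1/s1", "tm-1/s2", "tm-1/s3"));
        slotsByTaskExecutor.put("tm-2", List.of("tm-2/s0", "tm-2/s1", "tm-2/s2"));
        slotsByTaskExecutor.put("tm-3", List.of("tm-3/s0", "tm-3/s1"));
        return slotsByTaskExecutor.keySet().stream()
                .flatMap(taskExecutor -> {
                    // Counts every TaskManager whose slots are streamed into the pipeline.
                    openedTaskManagers.incrementAndGet();
                    return slotsByTaskExecutor.get(taskExecutor).stream();
                })
                .limit(requestedGroups)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        AtomicInteger opened = new AtomicInteger();
        List<String> picked = pick(5, opened);
        // tm-3 is never opened: limit cancels the pipeline once 5 slots are collected.
        System.out.println(picked.size() + " slots from " + opened.get() + " TaskManagers");
        // prints "5 slots from 2 TaskManagers"
    }
}
```

Requesting exactly 4 slots opens only tm-1, since the limit is already reached before the next TaskManager is pulled from the source.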



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
