ericzeng created FLINK-40057:
--------------------------------

             Summary: DefaultSlotAssigner over-picks slots from the boundary 
TaskManager when minimal-TaskManager placement is enabled
                 Key: FLINK-40057
                 URL: https://issues.apache.org/jira/browse/FLINK-40057
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: ericzeng


When the cluster runs in application mode and
{{JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}} is enabled,
{{DefaultSlotAssigner#pickSlotsInMinimalTaskExecutors}} selects free slots from
TaskManagers in descending order of their free-slot count, so that as few
TaskManagers as possible are kept busy and the rest can be released.

The current implementation walks the sorted TaskManagers and adds *all* free
slots of each one until the requested slot count is reached:

{code:java}
final List<PhysicalSlot> pickedSlots = new ArrayList<>();
final Iterator<ResourceID> sortedTaskExecutors =
        getSortedTaskExecutors(slotsByTaskExecutor);
while (pickedSlots.size() < requestedGroups) {
    // No hasNext() check: next() throws once the TaskManagers are exhausted.
    Set<PhysicalSlot> slotInfos =
            slotsByTaskExecutor.get(sortedTaskExecutors.next());
    // Adds every free slot of this TaskManager, even if fewer are still needed.
    pickedSlots.addAll(slotInfos);
}
return pickedSlots;
{code}

Because the last (boundary) TaskManager is added in full via {{addAll}}, the
returned collection contains *more* slots than {{requestedGroups}} whenever
the boundary TaskManager has more free slots than are still needed. For example,
with {{requestedGroups = 5}} and two TaskManagers offering 4 and 3 free slots,
the first TaskManager contributes 4 slots, one more is still needed, and the
second TaskManager then contributes all 3 of its slots, so the method returns
7 slots instead of 5.
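
The arithmetic of this example can be reproduced with a small standalone sketch. It does not use Flink classes: plain strings stand in for {{ResourceID}} and {{PhysicalSlot}}, and the names {{OverPickSketch}}, {{pickAllSlotsPerTaskManager}}, {{exampleCluster}}, {{tm-a}} and {{tm-b}} are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the loop above; String replaces ResourceID and PhysicalSlot.
class OverPickSketch {

    // Mirrors the addAll-based loop: each visited TaskManager contributes all of its free slots.
    static List<String> pickAllSlotsPerTaskManager(
            Map<String, List<String>> slotsByTaskExecutor, int requestedGroups) {
        // Visit TaskManagers in descending order of free-slot count.
        Iterator<String> sortedTaskExecutors =
                slotsByTaskExecutor.keySet().stream()
                        .sorted(Comparator.comparingInt(
                                (String tm) -> slotsByTaskExecutor.get(tm).size()).reversed())
                        .iterator();
        List<String> pickedSlots = new ArrayList<>();
        while (pickedSlots.size() < requestedGroups) {
            pickedSlots.addAll(slotsByTaskExecutor.get(sortedTaskExecutors.next()));
        }
        return pickedSlots;
    }

    // Two TaskManagers offering 4 and 3 free slots, as in the example above.
    static Map<String, List<String>> exampleCluster() {
        Map<String, List<String>> cluster = new LinkedHashMap<>();
        cluster.put("tm-a", Arrays.asList("a1", "a2", "a3", "a4"));
        cluster.put("tm-b", Arrays.asList("b1", "b2", "b3"));
        return cluster;
    }

    public static void main(String[] args) {
        List<String> picked = pickAllSlotsPerTaskManager(exampleCluster(), 5);
        // Prints "7 slots picked for 5 requested groups".
        System.out.println(picked.size() + " slots picked for 5 requested groups");
    }
}
```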

This does not corrupt the final assignment: the downstream
{{SimpleSlotMatchingResolver#matchSlotSharingGroupWithSlots}} only consumes the
first {{requestedGroups}} slots through an iterator, so the surplus slots are
silently ignored. However, the returned "minimal" slot set no longer reflects
the minimal footprint it is supposed to represent. It pulls in slots from the
boundary TaskManager that were never required, which works against the goal of
the method: minimizing the number of TaskManagers involved and the residual
fragmentation on the boundary TaskManager.

In addition, the {{while}} loop has no {{hasNext()}} guard. It relies entirely
on the caller's pre-check
({{freeSlots.size() > requestExecutionSlotSharingGroups}})
to guarantee termination; the method itself would throw
{{NoSuchElementException}} if it were ever called without enough free slots.
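
A minimal sketch of that failure mode, again without Flink classes: the per-TaskManager slot lists are assumed to be sorted already, strings stand in for slots, and {{UnguardedLoopSketch}}, {{pickUnguarded}} and {{throwsWhenSlotsRunOut}} are hypothetical names. It requests 8 slots when only 7 are free, bypassing the caller's pre-check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration of the unguarded loop; not the Flink implementation.
class UnguardedLoopSketch {

    // Same loop shape as the current code: next() is called without a hasNext() check.
    static List<String> pickUnguarded(List<List<String>> sortedSlotLists, int requestedGroups) {
        Iterator<List<String>> sorted = sortedSlotLists.iterator();
        List<String> picked = new ArrayList<>();
        while (picked.size() < requestedGroups) {
            picked.addAll(sorted.next());
        }
        return picked;
    }

    // Returns true if asking for more slots than exist ends in NoSuchElementException.
    static boolean throwsWhenSlotsRunOut() {
        List<List<String>> slots = Arrays.asList(
                Arrays.asList("a1", "a2", "a3", "a4"),
                Arrays.asList("b1", "b2", "b3"));
        try {
            pickUnguarded(slots, 8); // only 7 slots are available
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("throws without enough slots: " + throwsWhenSlotsRunOut());
    }
}
```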

## Proposed change

Flatten the slots of the sorted TaskManagers into a single stream and keep only
the first {{requestedGroups}} of them with {{Stream#limit}}:

{code:java}
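// Assumes getSortedTaskExecutors is changed to return a Stream<ResourceID>
// (it is consumed as an Iterator in the current code).
return getSortedTaskExecutors(slotsByTaskExecutor)
        .flatMap(taskExecutor -> slotsByTaskExecutor.get(taskExecutor).stream())
        .limit(requestedGroups)
        .collect(Collectors.toCollection(ArrayList::new));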
{code}

Because {{limit}} is short-circuiting, the stream stops pulling slots once
{{requestedGroups}} of them have been collected: TaskManagers beyond the
boundary one contribute nothing, and the boundary TaskManager contributes
exactly the number of slots still needed, with no over-pick. The "prefer the
fewest TaskManagers" ordering is unchanged (the stream still iterates
TaskManagers in descending free-slot order). The {{limit}} bound also makes the
method safe by construction: it can never read past the available slots, and if
fewer slots are free than requested it returns all of them instead of throwing.
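
The behaviour described above can be checked with a standalone sketch of the stream-based approach. This is an illustration under stated assumptions, not the actual patch: strings stand in for {{ResourceID}} and {{PhysicalSlot}}, the sort is inlined rather than delegated to {{getSortedTaskExecutors}}, and {{LimitPickSketch}}, {{pickWithLimit}} and {{exampleCluster}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the flatMap + limit approach; String replaces the Flink types.
class LimitPickSketch {

    static List<String> pickWithLimit(
            Map<String, List<String>> slotsByTaskExecutor, int requestedGroups) {
        return slotsByTaskExecutor.keySet().stream()
                // TaskManagers with the most free slots come first.
                .sorted(Comparator.comparingInt(
                        (String tm) -> slotsByTaskExecutor.get(tm).size()).reversed())
                // Flatten each TaskManager into its individual free slots.
                .flatMap(tm -> slotsByTaskExecutor.get(tm).stream())
                // Keep at most requestedGroups slots.
                .limit(requestedGroups)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    // Two TaskManagers offering 4 and 3 free slots.
    static Map<String, List<String>> exampleCluster() {
        Map<String, List<String>> cluster = new LinkedHashMap<>();
        cluster.put("tm-a", Arrays.asList("a1", "a2", "a3", "a4"));
        cluster.put("tm-b", Arrays.asList("b1", "b2", "b3"));
        return cluster;
    }

    public static void main(String[] args) {
        // Prints [a1, a2, a3, a4, b1]: exactly 5 slots, one from the boundary TaskManager.
        System.out.println(pickWithLimit(exampleCluster(), 5));
        // Prints 7: asking for more than is available returns every slot and does not throw.
        System.out.println(pickWithLimit(exampleCluster(), 8).size());
    }
}
```

With the 4 + 3 example, a request for 5 slots takes only one slot from the boundary TaskManager, where the {{addAll}} loop would have taken all three.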



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
