GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5403#discussion_r170261587
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
---
@@ -1343,63 +1330,27 @@ boolean contains(AllocationID slotId) {
* Poll a slot which matches the required resource profile. The polling tries to satisfy the
* location preferences, by TaskManager and by host.
*
- * @param resourceProfile The required resource profile.
- * @param locationPreferences The location preferences, in order to be checked.
+ * @param slotProfile slot profile that specifies the requirements for the slot
*
* @return Slot which matches the resource profile, null if we can't find a match
*/
- SlotAndLocality poll(ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences) {
+ SlotAndLocality poll(SlotProfile slotProfile) {
// fast path if no slots are available
if (availableSlots.isEmpty()) {
return null;
}
- boolean hadLocationPreference = false;
+ SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher();
- if (locationPreferences != null && !locationPreferences.isEmpty()) {
-
- // first search by TaskManager
- for (TaskManagerLocation location : locationPreferences) {
- hadLocationPreference = true;
-
- final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
- if (onTaskManager != null) {
- for (AllocatedSlot candidate : onTaskManager) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
- remove(candidate.getAllocationId());
- return new SlotAndLocality(candidate, Locality.LOCAL);
- }
- }
- }
- }
-
- // now, search by host
- for (TaskManagerLocation location : locationPreferences) {
- final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
- if (onHost != null) {
- for (AllocatedSlot candidate : onHost) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
- remove(candidate.getAllocationId());
- return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
- }
- }
- }
- }
- }
-
- // take any slot
- for (SlotAndTimestamp candidate : availableSlots.values()) {
- final AllocatedSlot slot = candidate.slot();
-
- if (slot.getResourceProfile().isMatching(resourceProfile)) {
+ return matcher.findMatchWithLocality(
+ availableSlots.values().stream(),
+ SlotAndTimestamp::slot,
+ (slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
+ ((slotAndTimestamp, locality) -> {
+ AllocatedSlot slot = slotAndTimestamp.slot();
remove(slot.getAllocationId());
--- End diff --
Let's not call `remove` from within the producer lambda passed to
`findMatchWithLocality`. I think the producer should not have side effects.
Instead, we can remove the slot after `findMatchWithLocality` has returned
a match.
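
To illustrate the shape I have in mind, here is a minimal, self-contained sketch. It is not the actual `SlotPool` code: `SlotPollSketch`, its `Slot` class, `findMatch`, and the `Predicate`-based requirement are all hypothetical stand-ins, and locality handling is left out. The point is only that the search stays a pure lookup and `poll` does the removal once the search has returned.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for the available-slots bookkeeping; not Flink code.
class SlotPollSketch {

    // Hypothetical slot type: an allocation id plus one resource dimension.
    static final class Slot {
        final String allocationId;
        final int cpuCores;

        Slot(String allocationId, int cpuCores) {
            this.allocationId = allocationId;
            this.cpuCores = cpuCores;
        }
    }

    private final Map<String, Slot> availableSlots = new LinkedHashMap<>();

    void add(Slot slot) {
        availableSlots.put(slot.allocationId, slot);
    }

    boolean contains(String allocationId) {
        return availableSlots.containsKey(allocationId);
    }

    // Pure search: reads availableSlots but never mutates it.
    private Optional<Slot> findMatch(Predicate<Slot> requirement) {
        return availableSlots.values().stream()
                .filter(requirement)
                .findFirst();
    }

    // Finds a matching slot first, then removes it as a separate step.
    Slot poll(Predicate<Slot> requirement) {
        // fast path if no slots are available
        if (availableSlots.isEmpty()) {
            return null;
        }

        Optional<Slot> match = findMatch(requirement);

        // The side effect happens here, after the stream has been fully consumed.
        match.ifPresent(slot -> availableSlots.remove(slot.allocationId));

        return match.orElse(null);
    }
}
```

Keeping the mutation out of the lambda also means the map is not modified while the stream over `availableSlots.values()` is still being evaluated.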
---