Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5403#discussion_r170544096
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
---
@@ -175,105 +176,17 @@ MultiTaskSlot createRootSlot(
* preferred locations is checked.
*
* @param groupId which the returned slot must not contain
- * @param locationPreferences specifying which locations are preferred
+ * @param matcher TODO
* @return the resolved root slot and its locality wrt to the specified
location preferences
* or null if there was no root slot which did not contain
the given groupId
*/
@Nullable
- MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId,
Collection<TaskManagerLocation> locationPreferences) {
- Preconditions.checkNotNull(locationPreferences);
-
- final MultiTaskSlotLocality multiTaskSlotLocality;
-
- if (locationPreferences.isEmpty()) {
- multiTaskSlotLocality =
getResolvedRootSlotWithoutLocationPreferences(groupId);
- } else {
- multiTaskSlotLocality =
getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
- }
-
- return multiTaskSlotLocality;
- }
-
- /**
- * Gets a resolved root slot which does not yet contain the given
groupId. The method will try to
- * find a slot of a TaskManager contained in the collection of
preferred locations. If there is no such slot
- * with free capacities available, then the method will look for slots
of TaskManager which run on the same
- * machine as the TaskManager in the collection of preferred locations.
If there is no such slot, then any slot
- * with free capacities is returned. If there is no such slot, then
null is returned.
- *
- * @param groupId which the returned slot must not contain
- * @param locationPreferences specifying which locations are preferred
- * @return the resolved root slot and its locality wrt to the specified
location preferences
- * or null if there was not root slot which did not
contain the given groupId
- */
- @Nullable
- private MultiTaskSlotLocality
getResolvedRootSlotWithLocationPreferences(AbstractID groupId,
Collection<TaskManagerLocation> locationPreferences) {
- Preconditions.checkNotNull(groupId);
- Preconditions.checkNotNull(locationPreferences);
- final Set<String> hostnameSet = new HashSet<>(16);
- MultiTaskSlot nonLocalMultiTaskSlot = null;
-
- synchronized (lock) {
- for (TaskManagerLocation locationPreference :
locationPreferences) {
- final Set<MultiTaskSlot> multiTaskSlots =
resolvedRootSlots.get(locationPreference);
-
- if (multiTaskSlots != null) {
- for (MultiTaskSlot multiTaskSlot :
multiTaskSlots) {
- if
(!multiTaskSlot.contains(groupId)) {
- return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
- }
- }
-
-
hostnameSet.add(locationPreference.getHostname());
- }
- }
-
- for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>>
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
- if
(hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
- for (MultiTaskSlot multiTaskSlot :
taskManagerLocationSetEntry.getValue()) {
- if
(!multiTaskSlot.contains(groupId)) {
- return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
- }
- }
- } else if (nonLocalMultiTaskSlot == null) {
- for (MultiTaskSlot multiTaskSlot :
taskManagerLocationSetEntry.getValue()) {
- if
(!multiTaskSlot.contains(groupId)) {
- nonLocalMultiTaskSlot =
multiTaskSlot;
- }
- }
- }
- }
- }
-
- if (nonLocalMultiTaskSlot != null) {
- return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot,
Locality.NON_LOCAL);
- } else {
- return null;
- }
- }
-
- /**
- * Gets a resolved slot which does not yet contain the given groupId
without any location
- * preferences.
- *
- * @param groupId which the returned slot must not contain
- * @return the resolved slot or null if there was no root slot with
free capacities
- */
- @Nullable
- private MultiTaskSlotLocality
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
- Preconditions.checkNotNull(groupId);
-
- synchronized (lock) {
- for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
- for (MultiTaskSlot multiTaskSlot :
multiTaskSlots) {
- if (!multiTaskSlot.contains(groupId)) {
- return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
- }
- }
- }
- }
-
- return null;
+ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId,
SlotProfile.ProfileToSlotContextMatcher matcher) {
+ return matcher.findMatchWithLocality(
+
resolvedRootSlots.values().stream().flatMap(Collection::stream),
--- End diff --
ð
---