Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156902096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * <p>The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map<SlotRequestId, TaskSlot> allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots; + + // Internal class to iterate over all resolved root slots + private ResolvedRootSlotValues resolvedMultiTaskSlotValues; + + public SlotSharingManager( + SlotSharingGroupId slotSharingGroupId, + AllocatedSlotActions allocatedSlotActions, + SlotOwner slotOwner) { + this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.allocatedSlotActions = 
Preconditions.checkNotNull(allocatedSlotActions); + this.slotOwner = Preconditions.checkNotNull(slotOwner); + + allTaskSlots = new HashMap<>(16); + unresolvedRootSlots = new HashMap<>(16); + resolvedRootSlots = new HashMap<>(16); + + resolvedMultiTaskSlotValues = null; + } + + public boolean isEmpty() { + return allTaskSlots.isEmpty(); + } + + public boolean contains(SlotRequestId slotRequestId) { + return allTaskSlots.containsKey(slotRequestId); + } + + @Nullable + public TaskSlot getTaskSlot(SlotRequestId slotRequestId) { + return allTaskSlots.get(slotRequestId); + } + + /** + * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext} future and + * the {@link SlotRequestId} of the allocated slot. + * + * @param slotRequestId of the root slot + * @param slotContextFuture with which we create the root slot + * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used + * to cancel the pending slot request or release the allocated slot + * @return New root slot + */ + public MultiTaskSlot createRootSlot( + SlotRequestId slotRequestId, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot( + slotRequestId, + slotContextFuture, + allocatedSlotRequestId); + + allTaskSlots.put(slotRequestId, rootMultiTaskSlot); + unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot); + + // add the root node to the set of resolved root nodes once the SlotContext future has been completed + // and we know the slot's TaskManagerLocation + slotContextFuture.whenComplete( + (SlotContext slotContext, Throwable throwable) -> { + if (slotContext != null) { + final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); + + if (resolvedRootNode != null) { + final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent( + slotContext.getTaskManagerLocation(), + taskManagerLocation -> new 
HashSet<>(4)); + + innerCollection.add(resolvedRootNode); + } + } else { + rootMultiTaskSlot.release(throwable); + } + }); + + return rootMultiTaskSlot; + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. First the given set of + * preferred locations is checked. + * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality wrt to the specified location preferences + * or null if there was no root slot which did not contain the given groupId + */ + @Nullable + public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { + Preconditions.checkNotNull(locationPreferences); + + if (locationPreferences.isEmpty()) { + return getResolvedRootSlotWithoutLocationPreferences(groupId); + } else { + return getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences); + } + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. The method will try to + * find a slot of a TaskManager contained in the collection of preferred locations. If there is no such slot + * with free capacities available, then the method will look for slots of TaskManager which run on the same + * machine as the TaskManager in the collection of preferred locations. If there is no such slot, then any slot + * with free capacities is returned. If there is no such slot, then null is returned. 
+ * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality wrt to the specified location preferences + * or null if there was not root slot which did not contain the given groupId + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { + Preconditions.checkNotNull(groupId); + Preconditions.checkNotNull(locationPreferences); + final Set<String> hostnameSet = new HashSet<>(); + + for (TaskManagerLocation locationPreference : locationPreferences) { + final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference); + + if (multiTaskSlots != null) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL); + } + } + + hostnameSet.add(locationPreference.getHostname()); + } + } + + MultiTaskSlot nonLocalMultiTaskSlot = null; + + for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) { + if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL); + } + } + } else if (nonLocalMultiTaskSlot == null) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + nonLocalMultiTaskSlot = multiTaskSlot; + } + } + } + } + + if (nonLocalMultiTaskSlot != null) { + return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL); + } else { + return null; + } + } + + /** + * Gets a resolved slot which does not yet contain the given groupId without any location + * preferences. 
+ * + * @param groupId which the returned slot must not contain + * @return the resolved slot or null if there was no root slot with free capacities + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) { + Preconditions.checkNotNull(groupId); + + for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED); + } + } + } + + return null; + } + + /** + * Gets an unresolved slot which does not yet contain the given groupId. An unresolved + * slot is a slot whose underlying allocated slot has not been allocated yet. + * + * @param groupId which the returned slot must not contain + * @return the unresolved slot or null if there was no root slot with free capacities + */ + @Nullable + public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) { + for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) { + if (!multiTaskSlot.contains(groupId)) { + return multiTaskSlot; + } + } + + return null; + } + + // ------------------------------------------------------------------------ + // Inner classes: TaskSlot hierarchy and helper classes + // ------------------------------------------------------------------------ + + /** + * Helper class which contains a {@link MultiTaskSlot} and its {@link Locality}. 
+ */ + public static final class MultiTaskSlotLocality { + private final MultiTaskSlot multiTaskSlot; + + private final Locality locality; + + public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) { + this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot); + this.locality = Preconditions.checkNotNull(locality); + } + + public MultiTaskSlot getMultiTaskSlot() { + return multiTaskSlot; + } + + public Locality getLocality() { + return locality; + } + + public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) { + return new MultiTaskSlotLocality(multiTaskSlot, locality); + } + } + + /** + * Base class for all task slots. + */ + public abstract static class TaskSlot { + // every TaskSlot has an associated slot request id + private final SlotRequestId slotRequestId; + + // all task slots except for the root slots have a group id assigned + @Nullable + private final AbstractID groupId; + + protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) { + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + this.groupId = groupId; + } + + public SlotRequestId getSlotRequestId() { + return slotRequestId; + } + + @Nullable + public AbstractID getGroupId() { + return groupId; + } + + /** + * Check whether the task slot contains the given groupId. + * + * @param groupId which to check whether it is contained + * @return true if the task slot contains the given groupId, otherwise false + */ + public boolean contains(AbstractID groupId) { + return Objects.equals(this.groupId, groupId); + } + + /** + * Release the task slot. + * + * @param cause for the release + * @return true if the slot could be released, otherwise false + */ + public abstract boolean release(Throwable cause); + } + + /** + * {@link TaskSlot} implementation which can have multiple other task slots assigned as children. 
+ */ + public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload { + + private final Map<AbstractID, TaskSlot> children; + + // the root node has its parent set to null + @Nullable + private final MultiTaskSlot parent; + + // underlying allocated slot + private final CompletableFuture<SlotContext> slotContextFuture; + + // slot request id of the allocated slot + @Nullable + private final SlotRequestId allocatedSlotRequestId; + + // true if we are currently releasing our children + private boolean releasingChildren; + + private MultiTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + MultiTaskSlot parent) { + this( + slotRequestId, + groupId, + Preconditions.checkNotNull(parent), + parent.getSlotContextFuture(), + null); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + this( + slotRequestId, + null, + null, + slotContextFuture, + allocatedSlotRequestId); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + @Nullable AbstractID groupId, + MultiTaskSlot parent, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + super(slotRequestId, groupId); + + this.parent = parent; + this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture); + this.allocatedSlotRequestId = allocatedSlotRequestId; + + this.children = new HashMap<>(16); + this.releasingChildren = false; + + slotContextFuture.whenComplete( + (SlotContext ignored, Throwable throwable) -> { + if (throwable != null) { + release(throwable); + } + }); + } + + public CompletableFuture<SlotContext> getSlotContextFuture() { + return slotContextFuture; + } + + /** + * Allocates a {@link MultiTaskSlot} and registers it under the given groupId at + * this {@link MultiTaskSlot}. 
+ * + * @param slotRequestId of the new multi task slot + * @param groupId under which the new multi task slot is registered + * @return the newly allocated {@link MultiTaskSlot} + */ + MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { + Preconditions.checkState(!super.contains(groupId)); + + final MultiTaskSlot inner = new MultiTaskSlot( + slotRequestId, + groupId, + this); + + children.put(groupId, inner); + + // register the newly allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, inner); + + return inner; + } + + /** + * Allocates a {@link SingleTaskSlot} and registers it under the given groupId at + * this {@link MultiTaskSlot}. + * + * @param slotRequestId of the new single task slot + * @param groupId under which the new single task slot is registered + * @param locality of the allocation + * @return the newly allocated {@link SingleTaskSlot} + */ + SingleTaskSlot allocateSingleTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + Locality locality) { + Preconditions.checkState(!super.contains(groupId)); + + final SingleTaskSlot leave = new SingleTaskSlot( --- End diff -- will change.
---