Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156887866 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * <p>The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which has a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map<SlotRequestId, TaskSlot> allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots; + + // Internal class to iterate over all resolved root slots + private ResolvedRootSlotValues resolvedMultiTaskSlotValues; + + public SlotSharingManager( + SlotSharingGroupId slotSharingGroupId, + AllocatedSlotActions allocatedSlotActions, + SlotOwner slotOwner) { + this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.allocatedSlotActions = 
Preconditions.checkNotNull(allocatedSlotActions); + this.slotOwner = Preconditions.checkNotNull(slotOwner); + + allTaskSlots = new HashMap<>(16); + unresolvedRootSlots = new HashMap<>(16); + resolvedRootSlots = new HashMap<>(16); + + resolvedMultiTaskSlotValues = null; + } + + public boolean isEmpty() { + return allTaskSlots.isEmpty(); + } + + public boolean contains(SlotRequestId slotRequestId) { + return allTaskSlots.containsKey(slotRequestId); + } + + @Nullable + public TaskSlot getTaskSlot(SlotRequestId slotRequestId) { + return allTaskSlots.get(slotRequestId); + } + + /** + * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext} future and + * the {@link SlotRequestId} of the allocated slot. + * + * @param slotRequestId of the root slot + * @param slotContextFuture with which we create the root slot + * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used + * to cancel the pending slot request or release the allocated slot + * @return New root slot + */ + public MultiTaskSlot createRootSlot( + SlotRequestId slotRequestId, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot( + slotRequestId, + slotContextFuture, + allocatedSlotRequestId); + + allTaskSlots.put(slotRequestId, rootMultiTaskSlot); + unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot); + + // add the root node to the set of resolved root nodes once the SlotContext future has been completed + // and we know the slot's TaskManagerLocation + slotContextFuture.whenComplete( + (SlotContext slotContext, Throwable throwable) -> { + if (slotContext != null) { + final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); + + if (resolvedRootNode != null) { + final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent( + slotContext.getTaskManagerLocation(), + taskManagerLocation -> new 
HashSet<>(4)); + + innerCollection.add(resolvedRootNode); + } + } else { + rootMultiTaskSlot.release(throwable); + } + }); + + return rootMultiTaskSlot; + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. First the given set of + * preferred locations is checked. + * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality wrt to the specified location preferences + * or null if there was no root slot which did not contain the given groupId + */ + @Nullable + public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { + Preconditions.checkNotNull(locationPreferences); + + if (locationPreferences.isEmpty()) { + return getResolvedRootSlotWithoutLocationPreferences(groupId); + } else { + return getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences); + } + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. The method will try to + * find a slot of a TaskManager contained in the collection of preferred locations. If there is no such slot + * with free capacities available, then the method will look for slots of TaskManager which run on the same + * machine as the TaskManager in the collection of preferred locations. If there is no such slot, then any slot + * with free capacities is returned. If there is no such slot, then null is returned. 
+ * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality with respect to the specified location preferences + * or null if there was no root slot which did not contain the given groupId + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) { + Preconditions.checkNotNull(groupId); + Preconditions.checkNotNull(locationPreferences); + final Set<String> hostnameSet = new HashSet<>(); + + for (TaskManagerLocation locationPreference : locationPreferences) { + final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference); + + if (multiTaskSlots != null) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL); + } + } + + hostnameSet.add(locationPreference.getHostname()); + } + } + + MultiTaskSlot nonLocalMultiTaskSlot = null; + + for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) { + if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL); + } + } + } else if (nonLocalMultiTaskSlot == null) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + nonLocalMultiTaskSlot = multiTaskSlot; + } + } + } + } + + if (nonLocalMultiTaskSlot != null) { + return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL); + } else { + return null; + } + } + + /** + * Gets a resolved slot which does not yet contain the given groupId without any location + * preferences. 
+ * + * @param groupId which the returned slot must not contain + * @return the resolved slot or null if there was no root slot with free capacities + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) { + Preconditions.checkNotNull(groupId); + + for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED); + } + } + } + + return null; + } + + /** + * Gets an unresolved slot which does not yet contain the given groupId. An unresolved + * slot is a slot whose underlying allocated slot has not been allocated yet. + * + * @param groupId which the returned slot must not contain + * @return the unresolved slot or null if there was no root slot with free capacities + */ + @Nullable + public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) { + for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) { + if (!multiTaskSlot.contains(groupId)) { + return multiTaskSlot; + } + } + + return null; + } + + // ------------------------------------------------------------------------ + // Inner classes: TaskSlot hierarchy and helper classes + // ------------------------------------------------------------------------ + + /** + * Helper class which contains a {@link MultiTaskSlot} and its {@link Locality}. 
+ */ + public static final class MultiTaskSlotLocality { + private final MultiTaskSlot multiTaskSlot; + + private final Locality locality; + + public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) { + this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot); + this.locality = Preconditions.checkNotNull(locality); + } + + public MultiTaskSlot getMultiTaskSlot() { + return multiTaskSlot; + } + + public Locality getLocality() { + return locality; + } + + public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) { + return new MultiTaskSlotLocality(multiTaskSlot, locality); + } + } + + /** + * Base class for all task slots. + */ + public abstract static class TaskSlot { + // every TaskSlot has an associated slot request id + private final SlotRequestId slotRequestId; + + // all task slots except for the root slots have a group id assigned + @Nullable + private final AbstractID groupId; + + protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) { + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + this.groupId = groupId; + } + + public SlotRequestId getSlotRequestId() { + return slotRequestId; + } + + @Nullable + public AbstractID getGroupId() { + return groupId; + } + + /** + * Check whether the task slot contains the given groupId. + * + * @param groupId which to check whether it is contained + * @return true if the task slot contains the given groupId, otherwise false + */ + public boolean contains(AbstractID groupId) { + return Objects.equals(this.groupId, groupId); + } + + /** + * Release the task slot. + * + * @param cause for the release + * @return true if the slot could be released, otherwise false + */ + public abstract boolean release(Throwable cause); + } + + /** + * {@link TaskSlot} implementation which can have multiple other task slots assigned as children. 
+ */ + public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload { + + private final Map<AbstractID, TaskSlot> children; + + // the root node has its parent set to null + @Nullable + private final MultiTaskSlot parent; + + // underlying allocated slot + private final CompletableFuture<SlotContext> slotContextFuture; + + // slot request id of the allocated slot + @Nullable + private final SlotRequestId allocatedSlotRequestId; + + // true if we are currently releasing our children + private boolean releasingChildren; + + private MultiTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + MultiTaskSlot parent) { + this( + slotRequestId, + groupId, + Preconditions.checkNotNull(parent), + parent.getSlotContextFuture(), + null); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + this( + slotRequestId, + null, + null, + slotContextFuture, + allocatedSlotRequestId); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + @Nullable AbstractID groupId, + MultiTaskSlot parent, + CompletableFuture<SlotContext> slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + super(slotRequestId, groupId); + + this.parent = parent; + this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture); + this.allocatedSlotRequestId = allocatedSlotRequestId; + + this.children = new HashMap<>(16); + this.releasingChildren = false; + + slotContextFuture.whenComplete( + (SlotContext ignored, Throwable throwable) -> { + if (throwable != null) { + release(throwable); + } + }); + } + + public CompletableFuture<SlotContext> getSlotContextFuture() { + return slotContextFuture; + } + + /** + * Allocates a {@link MultiTaskSlot} and registers it under the given groupId at + * this {@link MultiTaskSlot}. 
+ * + * @param slotRequestId of the new multi task slot + * @param groupId under which the new multi task slot is registered + * @return the newly allocated {@link MultiTaskSlot} + */ + MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { + Preconditions.checkState(!super.contains(groupId)); + + final MultiTaskSlot inner = new MultiTaskSlot( + slotRequestId, + groupId, + this); + + children.put(groupId, inner); + + // register the newly allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, inner); + + return inner; + } + + /** + * Allocates a {@link SingleTaskSlot} and registers it under the given groupId at + * this {@link MultiTaskSlot}. + * + * @param slotRequestId of the new single task slot + * @param groupId under which the new single task slot is registered + * @param locality of the allocation + * @return the newly allocated {@link SingleTaskSlot} + */ + SingleTaskSlot allocateSingleTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + Locality locality) { + Preconditions.checkState(!super.contains(groupId)); + + final SingleTaskSlot leave = new SingleTaskSlot( + slotRequestId, + groupId, + this, + locality); + + children.put(groupId, leave); + + // register the newly allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, leave); + + return leave; + } + + /** + * Checks whether this slot or any of its children contains the given groupId. 
+ * + * @param groupId which to check whether it is contained + * @return true if this or any of its children contains the given groupId, otherwise false + */ + @Override + public boolean contains(AbstractID groupId) { + if (super.contains(groupId)) { + return true; + } else { + for (TaskSlot taskSlot : children.values()) { + if (taskSlot.contains(groupId)) { + return true; + } + } + + return false; + } + } + + @Override + public boolean release(Throwable cause) { + releasingChildren = true; + + // first release all children and remove them if they could be released immediately + children.values().removeIf(node -> { + boolean release = node.release(cause); + + if (release) { + allTaskSlots.remove(node.slotRequestId); + } + + return release; + }); + + releasingChildren = false; + + if (children.isEmpty()) { + if (parent != null) { + // we remove ourselves from our parent if we no longer have children + parent.releaseChild(getGroupId()); + } else { + // we are the root node --> remove the root node from the list of task slots + allTaskSlots.remove(getSlotRequestId()); + + if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) { + // the root node should still be unresolved + unresolvedRootSlots.remove(getSlotRequestId()); + } else { + // the root node should be resolved --> we can access the slot context + final SlotContext slotContext = slotContextFuture.getNow(null); + + if (slotContext != null) { + final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation()); + + if (multiTaskSlots != null) { + multiTaskSlots.remove(this); + + if (multiTaskSlots.isEmpty()) { + resolvedRootSlots.remove(slotContext.getTaskManagerLocation()); + } + } + } + } + + // release the underlying allocated slot + allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause); + } + + return true; + } else { + return false; + } + } + + /** + * Releases the child with the given childGroupId. 
+ * + * @param childGroupId identifying the child to release + */ + private void releaseChild(AbstractID childGroupId) { + if (!releasingChildren) { + TaskSlot child = children.remove(childGroupId); + + if (child != null) { + allTaskSlots.remove(child.getSlotRequestId()); + } + + if (children.isEmpty()) { + release(new FlinkException("Release multi task slot because all children have been released.")); + } + } + } + } + + /** + * {@link TaskSlot} implementation which harbours a {@link LogicalSlot}. The {@link SingleTaskSlot} + * cannot have any children assigned. + */ + public final class SingleTaskSlot extends TaskSlot { + private final MultiTaskSlot parent; + + // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed + private final CompletableFuture<LogicalSlot> logicalSlotFuture; + + private SingleTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + MultiTaskSlot parent, + Locality locality) { + super(slotRequestId, groupId); + + this.parent = Preconditions.checkNotNull(parent); + + Preconditions.checkNotNull(locality); + logicalSlotFuture = parent.getSlotContextFuture() + .thenApply( + (SlotContext slotContext) -> + new SingleLogicalSlot( + slotRequestId, + slotContext, + slotSharingGroupId, + locality, + slotOwner)); + } + + public CompletableFuture<LogicalSlot> getLogicalSlotFuture() { + return logicalSlotFuture; + } + + @Override + public boolean release(Throwable cause) { + logicalSlotFuture.completeExceptionally(cause); + + boolean pendingLogicalSlotRelease = false; + + if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) { + // we have a single task slot which we first have to release + final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null); + + if (logicalSlot != null && logicalSlot.isAlive()) { + pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone(); + } + } + + if (!pendingLogicalSlotRelease) { + parent.releaseChild(getGroupId()); + } + + 
return !pendingLogicalSlotRelease; + } + } + + // ------------------------------------------------------------------------ + // Methods and classes for testing + // ------------------------------------------------------------------------ + + /** + * Returns a collection of all resolved root slots. + * + * @return Collection of all resolved root slots + */ + @VisibleForTesting + public Collection<MultiTaskSlot> getResolvedRootSlots() { + ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues; + + if (vs == null ){ + vs = new ResolvedRootSlotValues(); + resolvedMultiTaskSlotValues = vs; + } + + return vs; + } + + @VisibleForTesting + Collection<MultiTaskSlot> getUnresolvedRootSlots() { + return unresolvedRootSlots.values(); + } + + /** + * Collection of all resolved {@link MultiTaskSlot} root slots. + */ + private final class ResolvedRootSlotValues extends AbstractCollection<MultiTaskSlot> { + + @Override + public Iterator<MultiTaskSlot> iterator() { + return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator()); + } + + @Override + public int size() { + int numberResolvedMultiTaskSlots = 0; + + for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) { + numberResolvedMultiTaskSlots += multiTaskSlots.size(); + } + + return numberResolvedMultiTaskSlots; + } + } + + /** + * Iterator over all resolved {@link MultiTaskSlot} root slots. 
+ */ + private static final class ResolvedRootSlotIterator implements Iterator<MultiTaskSlot> { + private final Iterator<Set<MultiTaskSlot>> baseIterator; + private Iterator<MultiTaskSlot> currentIterator; + + private ResolvedRootSlotIterator(Iterator<Set<MultiTaskSlot>> baseIterator) { + this.baseIterator = Preconditions.checkNotNull(baseIterator); + + if (baseIterator.hasNext()) { + currentIterator = baseIterator.next().iterator(); + } else { + currentIterator = Collections.emptyIterator(); + } + } + + @Override + public boolean hasNext() { + progressToNextElement(); + + return currentIterator.hasNext(); + } + + @Override + public MultiTaskSlot next() { + progressToNextElement(); + + return currentIterator.next(); + } + + private void progressToNextElement() { + while(baseIterator.hasNext() && ! currentIterator.hasNext()) { --- End diff -- Good catch. Will correct it.
---