This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f75d8e1fbb16ba08ab5a625f50e988c708a8a2bf Author: tison <wander4...@gmail.com> AuthorDate: Fri Jul 26 23:44:43 2019 +0800 [FLINK-13334][coordination] Remove legacy slot implementation This closes #9245. --- .../runtime/executiongraph/ExecutionJobVertex.java | 4 - .../apache/flink/runtime/instance/SharedSlot.java | 333 ----------- .../apache/flink/runtime/instance/SimpleSlot.java | 290 --------- .../org/apache/flink/runtime/instance/Slot.java | 381 ------------ .../instance/SlotSharingGroupAssignment.java | 664 --------------------- .../jobmanager/scheduler/CoLocationConstraint.java | 91 +-- .../jobmanager/scheduler/CoLocationGroup.java | 6 - .../jobmanager/scheduler/SlotSharingGroup.java | 23 - .../ExecutionGraphDeploymentTest.java | 24 +- .../ExecutionGraphSchedulingTest.java | 56 +- .../ExecutionPartitionLifecycleTest.java | 13 +- .../runtime/executiongraph/ExecutionTest.java | 34 +- .../ExecutionVertexLocalityTest.java | 18 +- .../flink/runtime/instance/SharedSlotsTest.java | 630 ------------------- .../flink/runtime/instance/SimpleSlotTest.java | 135 ----- .../scheduler/ScheduleWithCoLocationHintTest.java | 10 - 16 files changed, 73 insertions(+), 2639 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index e883792..1a0d389 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -515,10 +515,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable synchronized (stateMonitor) { // check and reset the sharing groups with scheduler hints - if (slotSharingGroup != null) { - slotSharingGroup.clearTaskAssignment(); - } - for (int i = 0; i < parallelism; i++) { taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java deleted file mode 100644 index 0f9c104..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.SlotContext; -import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; - -import javax.annotation.Nullable; - -import java.util.ConcurrentModificationException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * This class represents a shared slot. A shared slot can have multiple - * {@link SimpleSlot} instances within itself. This allows to - * schedule multiple tasks simultaneously to the same resource. Sharing a resource with multiple - * tasks is crucial for simple pipelined / streamed execution, where both the sender and the receiver - * are typically active at the same time. - * - * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the caller has to guarantee proper - * synchronization. In the current implementation, all concurrently modifying operations are - * passed through a {@link SlotSharingGroupAssignment} object which is responsible for - * synchronization. - */ -public class SharedSlot extends Slot implements LogicalSlot { - - /** The assignment group os shared slots that manages the availability and release of the slots */ - private final SlotSharingGroupAssignment assignmentGroup; - - /** The set os sub-slots allocated from this shared slot */ - private final Set<Slot> subSlots; - - // ------------------------------------------------------------------------ - // Old Constructors (legacy code) - // ------------------------------------------------------------------------ - - /** - * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. - * This constructor is used to create a slot directly from an instance. - * - * @param owner The component from which this slot is allocated. - * @param location The location info of the TaskManager where the slot was allocated from - * @param slotNumber The number of the slot. - * @param taskManagerGateway The gateway to communicate with the TaskManager - * @param assignmentGroup The assignment group that this shared slot belongs to. - */ - public SharedSlot( - SlotOwner owner, TaskManagerLocation location, int slotNumber, - TaskManagerGateway taskManagerGateway, - SlotSharingGroupAssignment assignmentGroup) { - - this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null); - } - - /** - * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs - * to the given task group. - * - * @param owner The component from which this slot is allocated. - * @param location The location info of the TaskManager where the slot was allocated from - * @param slotNumber The number of the slot. - * @param taskManagerGateway The gateway to communicate with the TaskManager - * @param assignmentGroup The assignment group that this shared slot belongs to. - * @param parent The parent slot of this slot. - * @param groupId The assignment group of this slot. - */ - public SharedSlot( - SlotOwner owner, - TaskManagerLocation location, - int slotNumber, - TaskManagerGateway taskManagerGateway, - SlotSharingGroupAssignment assignmentGroup, - @Nullable SharedSlot parent, - @Nullable AbstractID groupId) { - - super(owner, location, slotNumber, taskManagerGateway, parent, groupId); - - this.assignmentGroup = checkNotNull(assignmentGroup); - this.subSlots = new HashSet<Slot>(); - } - - // ------------------------------------------------------------------------ - // Constructors - // ------------------------------------------------------------------------ - - /** - * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. - * This constructor is used to create a slot directly from an instance. - * - * @param slotContext The slot context of this shared slot - * @param owner The component from which this slot is allocated. - * @param assignmentGroup The assignment group that this shared slot belongs to. - */ - public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) { - this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null); - } - - private SharedSlot( - SlotContext slotInformation, - SlotOwner owner, - int slotNumber, - SlotSharingGroupAssignment assignmentGroup, - @Nullable SharedSlot parent, - @Nullable AbstractID groupId) { - - super(slotInformation, owner, slotNumber, parent, groupId); - - this.assignmentGroup = checkNotNull(assignmentGroup); - this.subSlots = new HashSet<Slot>(); - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - @Override - public int getNumberLeaves() { - while (true) { - try { - int result = 0; - for (Slot slot: subSlots){ - result += slot.getNumberLeaves(); - } - return result; - } - catch (ConcurrentModificationException e) { - // ignore and retry - } - } - } - - /** - * Checks whether this slot is a root slot that has not yet added any child slots. - * - * @return True, if this slot is a root slot and has not yet added any children, false otherwise. - */ - public boolean isRootAndEmpty() { - return getParent() == null && subSlots.isEmpty(); - } - - /** - * Checks whether this shared slot has any sub slots. - * - * @return True, if the shared slot has sub slots, false otherwise. - */ - public boolean hasChildren() { - return subSlots.size() > 0; - } - - @Override - public Locality getLocality() { - return Locality.UNKNOWN; - } - - @Override - public boolean tryAssignPayload(Payload payload) { - throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot."); - } - - @Nullable - @Override - public Payload getPayload() { - return null; - } - - @Override - public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) { - assignmentGroup.releaseSharedSlot(this); - - if (!(isReleased() && subSlots.isEmpty())) { - throw new IllegalStateException("Bug: SharedSlot is not released or not empty after call to releaseSlot()"); - } - - return CompletableFuture.completedFuture(null); - } - - @Override - public int getPhysicalSlotNumber() { - return getRootSlotNumber(); - } - - @Override - public AllocationID getAllocationId() { - return getSlotContext().getAllocationId(); - } - - @Override - public SlotRequestId getSlotRequestId() { - return NO_SLOT_REQUEST_ID; - } - - @Nullable - @Override - public SlotSharingGroupId getSlotSharingGroupId() { - return NO_SLOT_SHARING_GROUP_ID; - } - - /** - * Gets the set of all slots allocated as sub-slots of this shared slot. - * - * @return All sub-slots allocated from this shared slot. - */ - Set<Slot> getSubSlots() { - return subSlots; - } - - // ------------------------------------------------------------------------ - // INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - Allocating sub-slots - // ------------------------------------------------------------------------ - - /** - * Creates a new sub slot if the slot is not dead, yet. This method should only be called from - * the assignment group instance to guarantee synchronization. - * - * <b>NOTE:</b> This method is not synchronized and must only be called from - * the slot's assignment group. - * - * @param groupId The ID to identify tasks which can be deployed in this sub slot. - * @return The new sub slot if the shared slot is still alive, otherwise null. - */ - SimpleSlot allocateSubSlot(AbstractID groupId) { - if (isAlive()) { - SimpleSlot slot = new SimpleSlot( - getOwner(), - getTaskManagerLocation(), - subSlots.size(), - getTaskManagerGateway(), - this, - groupId); - subSlots.add(slot); - return slot; - } - else { - return null; - } - } - - /** - * Creates a new sub slot if the slot is not dead, yet. This method should only be called from - * the assignment group instance to guarantee synchronization. - * - * NOTE: This method should only be called from the slot's assignment group. - * - * @param groupId The ID to identify tasks which can be deployed in this sub slot. - * @return The new sub slot if the shared slot is still alive, otherwise null. - */ - SharedSlot allocateSharedSlot(AbstractID groupId){ - if (isAlive()) { - SharedSlot slot = new SharedSlot( - getOwner(), - getTaskManagerLocation(), - subSlots.size(), - getTaskManagerGateway(), - assignmentGroup, - this, - groupId); - subSlots.add(slot); - return slot; - } - else { - return null; - } - } - - // ------------------------------------------------------------------------ - // INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - releasing slots - // ------------------------------------------------------------------------ - - /** - * Disposes the given sub slot. This method is called by the child simple slot to tell this - * shared slot to release it. - * - * The releasing process itself is done by the {@link SlotSharingGroupAssignment}, which controls - * all the modifications in this shared slot. - * - * NOTE: This method must not modify the shared slot directly !!! - * - * @param slot The sub-slot which shall be removed from the shared slot. - */ - void releaseChild(SimpleSlot slot) { - assignmentGroup.releaseSimpleSlot(slot); - } - - /** - * Removes the given slot from this shared slot. This method Should only be called - * through this shared slot's {@link SlotSharingGroupAssignment} - * - * @param slot slot to be removed from the set of sub slots. - * @return Number of remaining sub slots - */ - int removeDisposedChildSlot(Slot slot) { - if (!slot.isReleased() || !subSlots.remove(slot)) { - throw new IllegalArgumentException(); - } - return subSlots.size(); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "Shared " + super.toString(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java deleted file mode 100644 index 3363b15..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.SlotContext; -import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -/** - * A SimpleSlot represents a single slot on a TaskManager instance, or a slot within a shared slot. - * - * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot. - * If not, then the parent attribute is null. - */ -public class SimpleSlot extends Slot implements LogicalSlot { - - /** The updater used to atomically swap in the payload */ - private static final AtomicReferenceFieldUpdater<SimpleSlot, Payload> PAYLOAD_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Payload.class, "payload"); - - // ------------------------------------------------------------------------ - - private final CompletableFuture<?> releaseFuture = new CompletableFuture<>(); - - /** Id of the task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */ - private volatile Payload payload; - - /** The locality attached to the slot, defining whether the slot was allocated at the desired location. */ - private volatile Locality locality = Locality.UNCONSTRAINED; - - // ------------------------------------------------------------------------ - // Old Constructors (legacy mode) - // ------------------------------------------------------------------------ - - /** - * Creates a new simple slot that stands alone and does not belong to shared slot. - * - * @param owner The component from which this slot is allocated. - * @param location The location info of the TaskManager where the slot was allocated from - * @param slotNumber The number of the task slot on the instance. - * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot - */ - public SimpleSlot( - SlotOwner owner, TaskManagerLocation location, int slotNumber, - TaskManagerGateway taskManagerGateway) { - this(owner, location, slotNumber, taskManagerGateway, null, null); - } - - /** - * Creates a new simple slot that belongs to the given shared slot and - * is identified by the given ID. - * - * @param owner The component from which this slot is allocated. - * @param location The location info of the TaskManager where the slot was allocated from - * @param slotNumber The number of the simple slot in its parent shared slot. - * @param taskManagerGateway to communicate with the associated task manager. - * @param parent The parent shared slot. - * @param groupID The ID that identifies the group that the slot belongs to. - */ - public SimpleSlot( - SlotOwner owner, - TaskManagerLocation location, - int slotNumber, - TaskManagerGateway taskManagerGateway, - @Nullable SharedSlot parent, - @Nullable AbstractID groupID) { - - super( - parent != null ? - parent.getSlotContext() : - new SimpleSlotContext( - NO_ALLOCATION_ID, - location, - slotNumber, - taskManagerGateway), - owner, - slotNumber, - parent, - groupID); - } - - // ------------------------------------------------------------------------ - // Constructors - // ------------------------------------------------------------------------ - - /** - * Creates a new simple slot that stands alone and does not belong to shared slot. - * - * @param slotContext The slot context of this simple slot - * @param owner The component from which this slot is allocated. - */ - public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) { - this(slotContext, owner, slotNumber, null, null); - } - - /** - * Creates a new simple slot that belongs to the given shared slot and - * is identified by the given ID.. - * - * @param parent The parent shared slot. - * @param owner The component from which this slot is allocated. - * @param groupID The ID that identifies the group that the slot belongs to. - */ - public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) { - this(parent.getSlotContext(), owner, slotNumber, parent, groupID); - } - - /** - * Creates a new simple slot that belongs to the given shared slot and - * is identified by the given ID.. - * - * @param slotContext The slot context of this simple slot - * @param owner The component from which this slot is allocated. - * @param slotNumber The number of the simple slot in its parent shared slot. - * @param parent The parent shared slot. - * @param groupID The ID that identifies the group that the slot belongs to. - */ - private SimpleSlot( - SlotContext slotContext, - SlotOwner owner, - int slotNumber, - @Nullable SharedSlot parent, - @Nullable AbstractID groupID) { - super(slotContext, owner, slotNumber, parent, groupID); - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - @Override - public int getNumberLeaves() { - return 1; - } - - /** - * Atomically sets the executed vertex, if no vertex has been assigned to this slot so far. - * - * @param payload The vertex to assign to this slot. - * @return True, if the vertex was assigned, false, otherwise. - */ - @Override - public boolean tryAssignPayload(Payload payload) { - Preconditions.checkNotNull(payload); - - // check that we can actually run in this slot - if (isCanceled()) { - return false; - } - - // atomically assign the vertex - if (!PAYLOAD_UPDATER.compareAndSet(this, null, payload)) { - return false; - } - - // we need to do a double check that we were not cancelled in the meantime - if (isCanceled()) { - this.payload = null; - return false; - } - - return true; - } - - @Nullable - @Override - public Payload getPayload() { - return payload; - } - - /** - * Gets the locality information attached to this slot. - * @return The locality attached to the slot. - */ - public Locality getLocality() { - return locality; - } - - /** - * Attached locality information to this slot. - * @param locality The locality attached to the slot. - */ - public void setLocality(Locality locality) { - this.locality = locality; - } - - // ------------------------------------------------------------------------ - // Cancelling & Releasing - // ------------------------------------------------------------------------ - - @Override - public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) { - if (!isCanceled()) { - final CompletableFuture<?> terminationFuture; - - if (payload != null) { - // trigger the failure of the slot payload - payload.fail(cause != null ? cause : new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation())); - - // wait for the termination of the payload before releasing the slot - terminationFuture = payload.getTerminalStateFuture(); - } else { - terminationFuture = CompletableFuture.completedFuture(null); - } - - terminationFuture.whenComplete( - (Object ignored, Throwable throwable) -> { - // release directly (if we are directly allocated), - // otherwise release through the parent shared slot - if (getParent() == null) { - // we have to give back the slot to the owning instance - if (markCancelled()) { - try { - getOwner().returnLogicalSlot(this); - releaseFuture.complete(null); - } catch (Exception e) { - releaseFuture.completeExceptionally(e); - } - } - } else { - // we have to ask our parent to dispose us - getParent().releaseChild(this); - - releaseFuture.complete(null); - } - }); - } - - return releaseFuture; - } - - @Override - public int getPhysicalSlotNumber() { - return getRootSlotNumber(); - } - - @Override - public AllocationID getAllocationId() { - return getSlotContext().getAllocationId(); - } - - @Override - public SlotRequestId getSlotRequestId() { - return NO_SLOT_REQUEST_ID; - } - - @Nullable - @Override - public SlotSharingGroupId getSlotSharingGroupId() { - return NO_SLOT_SHARING_GROUP_ID; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "SimpleSlot " + super.toString(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java deleted file mode 100644 index 9c1b627..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.SlotContext; -import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Base class for slots that the Scheduler / ExecutionGraph take from the SlotPool and use to place - * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a TaskManager's resources), - * plus additional fields to track what is currently executed in that slot, or if the slot is still - * used or disposed (ExecutionGraph gave it back to the pool). - * - * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex - * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain - * other shared slots which in turn can hold simple slots. That way, a shared slot may define a tree - * of slots that belong to it. - */ -public abstract class Slot { - - /** Updater for atomic state transitions */ - private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status"); - - /** State where slot is fresh and alive. Tasks may be added to the slot. */ - private static final int ALLOCATED_AND_ALIVE = 0; - - /** State where the slot has been canceled and is in the process of being released */ - private static final int CANCELLED = 1; - - /** State where all tasks in this slot have been canceled and the slot been given back to the instance */ - private static final int RELEASED = 2; - - // temporary placeholder for Slots that are not constructed from an AllocatedSlot (by legacy code) - protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L); - protected static final SlotRequestId NO_SLOT_REQUEST_ID = new SlotRequestId(0L, 0L); - protected static final SlotSharingGroupId NO_SLOT_SHARING_GROUP_ID = new SlotSharingGroupId(0L, 0L); - - // ------------------------------------------------------------------------ - - /** Context of this logical slot. */ - private final SlotContext slotContext; - - /** The owner of this slot - the slot was taken from that owner and must be disposed to it */ - private final SlotOwner owner; - - /** The parent of this slot in the hierarchy, or null, if this is the parent */ - @Nullable - private final SharedSlot parent; - - /** The id of the group that this slot is allocated to. May be null. */ - @Nullable - private final AbstractID groupID; - - private final int slotNumber; - - /** The state of the vertex, only atomically updated */ - private volatile int status = ALLOCATED_AND_ALIVE; - - // -------------------------------------------------------------------------------------------- - - /** - * Base constructor for slots. - * - * <p>This is the old way of constructing slots by the legacy code - * - * @param owner The component from which this slot is allocated. - * @param location The location info of the TaskManager where the slot was allocated from - * @param slotNumber The number of this slot. - * @param taskManagerGateway The actor gateway to communicate with the TaskManager - * @param parent The parent slot that contains this slot. May be null, if this slot is the root. - * @param groupID The ID that identifies the task group for which this slot is allocated. May be null - * if the slot does not belong to any task group. - */ - protected Slot( - SlotOwner owner, - TaskManagerLocation location, - int slotNumber, - TaskManagerGateway taskManagerGateway, - @Nullable SharedSlot parent, - @Nullable AbstractID groupID) { - - checkArgument(slotNumber >= 0); - - // create a simple slot context - this.slotContext = new SimpleSlotContext( - NO_ALLOCATION_ID, - location, - slotNumber, - taskManagerGateway); - - this.owner = checkNotNull(owner); - this.parent = parent; // may be null - this.groupID = groupID; // may be null - this.slotNumber = slotNumber; - } - - /** - * Base constructor for slots. - * - * @param slotContext The slot context of this slot. - * @param owner The component from which this slot is allocated. - * @param slotNumber The number of this slot. - * @param parent The parent slot that contains this slot. May be null, if this slot is the root. - * @param groupID The ID that identifies the task group for which this slot is allocated. May be null - * if the slot does not belong to any task group. - */ - protected Slot( - SlotContext slotContext, - SlotOwner owner, - int slotNumber, - @Nullable SharedSlot parent, - @Nullable AbstractID groupID) { - - this.slotContext = checkNotNull(slotContext); - this.owner = checkNotNull(owner); - this.parent = parent; // may be null - this.groupID = groupID; // may be null - this.slotNumber = slotNumber; - } - - // -------------------------------------------------------------------------------------------- - - /** - * Gets the allocated slot that this slot refers to. - * - * @return This slot's allocated slot. - */ - public SlotContext getSlotContext() { - return slotContext; - } - - /** - * Gets the ID of the TaskManager that offers this slot. - * - * @return The ID of the TaskManager that offers this slot - */ - public ResourceID getTaskManagerID() { - return slotContext.getTaskManagerLocation().getResourceID(); - } - - /** - * Gets the location info of the TaskManager that offers this slot. - * - * @return The location info of the TaskManager that offers this slot - */ - public TaskManagerLocation getTaskManagerLocation() { - return slotContext.getTaskManagerLocation(); - } - - /** - * Gets the actor gateway that can be used to send messages to the TaskManager. - * - * <p>This method should be removed once the new interface-based RPC abstraction is in place - * - * @return The actor gateway that can be used to send messages to the TaskManager. - */ - public TaskManagerGateway getTaskManagerGateway() { - return slotContext.getTaskManagerGateway(); - } - - /** - * Gets the owner of this slot. The owner is the component that the slot was created from - * and to which it needs to be returned after the executed tasks are done. - * - * @return The owner of this slot. - */ - public SlotOwner getOwner() { - return owner; - } - - /** - * Gets the number of the slot. For a simple slot, that is the number of the slot - * on its instance. For a non-root slot, this returns the number of the slot in the - * list amongst its siblings in the tree. - * - * @return The number of the slot on the instance or amongst its siblings that share the same slot. - */ - public int getSlotNumber() { - return slotNumber; - } - - /** - * Gets the number of the root slot. This code behaves equal to {@code getRoot().getSlotNumber()}. - * If this slot is the root of the tree of shared slots, then this method returns the same - * value as {@link #getSlotNumber()}. - * - * @return The slot number of the root slot. - */ - public int getRootSlotNumber() { - if (parent == null) { - return slotNumber; - } else { - return parent.getRootSlotNumber(); - } - } - - /** - * Gets the ID that identifies the logical group to which this slot belongs: - * <ul> - * <li>If the slot does not belong to any group in particular, this field is null.</li> - * <li>If this slot was allocated as a sub-slot of a - * {@link org.apache.flink.runtime.instance.SlotSharingGroupAssignment}, - * then this ID will be the JobVertexID of the vertex whose task the slot - * holds in its shared slot.</li> - * <li>In case that the slot represents the shared slot of a co-location constraint, this ID will be the - * ID of the co-location constraint.</li> - * </ul> - * - * @return The ID identifying the logical group of slots. - */ - @Nullable - public AbstractID getGroupID() { - return groupID; - } - - /** - * Gets the parent slot of this slot. Returns null, if this slot has no parent. - * - * @return The parent slot, or null, if no this slot has no parent. - */ - @Nullable - public SharedSlot getParent() { - return parent; - } - - /** - * Gets the root slot of the tree containing this slot. If this slot is the root, - * the method returns this slot directly, otherwise it recursively goes to the parent until - * it reaches the root. - * - * @return The root slot of the tree containing this slot - */ - public Slot getRoot() { - if (parent == null) { - return this; - } else { - return parent.getRoot(); - } - } - - /** - * Gets the number of simple slots that are at the leaves of the tree of slots. - * - * @return The number of simple slots at the leaves. - */ - public abstract int getNumberLeaves(); - - // -------------------------------------------------------------------------------------------- - // Status and life cycle - // -------------------------------------------------------------------------------------------- - - /** - * Checks of the slot is still alive, i.e. in state {@link #ALLOCATED_AND_ALIVE}. - * - * @return True if the slot is alive, false otherwise. - */ - public boolean isAlive() { - return status == ALLOCATED_AND_ALIVE; - } - - /** - * Checks of the slot has been cancelled. Note that a released slot is also cancelled. - * - * @return True if the slot is cancelled or released, false otherwise. - */ - public boolean isCanceled() { - return status != ALLOCATED_AND_ALIVE; - } - - /** - * Checks of the slot has been released. - * - * @return True if the slot is released, false otherwise. - */ - public boolean isReleased() { - return status == RELEASED; - } - - /** - * Atomically marks the slot as cancelled, if it was alive before. - * - * @return True, if the state change was successful, false otherwise. - */ - final boolean markCancelled() { - return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED); - } - - /** - * Atomically marks the slot as released, if it was cancelled before. - * - * @return True, if the state change was successful, false otherwise. - */ - final boolean markReleased() { - return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED); - } - - /** - * This method cancels and releases the slot and all its sub-slots. - * - * After this method completed successfully, the slot will be in state "released", and the - * {@link #isReleased()} method will return {@code true}. - * - * If this slot is a simple slot, it will be returned to its instance. If it is a shared slot, - * it will release all of its sub-slots and release itself. - */ - public abstract CompletableFuture<?> releaseSlot(@Nullable Throwable cause); - - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - /** - * Slots must always has based on reference identity. - */ - @Override - public final int hashCode() { - return super.hashCode(); - } - - /** - * Slots must always compare on referential equality. - */ - @Override - public final boolean equals(Object obj) { - return this == obj; - } - - @Override - public String toString() { - return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status); - } - - protected String hierarchy() { - return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')'; - } - - private static String getStateName(int state) { - switch (state) { - case ALLOCATED_AND_ALIVE: - return "ALLOCATED/ALIVE"; - case CANCELLED: - return "CANCELLED"; - case RELEASED: - return "RELEASED"; - default: - return "(unknown)"; - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java deleted file mode 100644 index d16c332..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ /dev/null @@ -1,664 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; -import org.apache.flink.util.FlinkException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - - -/** - * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between - * tasks of a {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}. - * - * <p>The assignments shares tasks by allowing a shared slot to hold one vertex per - * JobVertexID. For example, consider a program consisting of job vertices "source", "map", - * "reduce", and "sink". If the slot sharing group spans all four job vertices, then - * each shared slot can hold one parallel subtask of the source, the map, the reduce, and the - * sink vertex. Each shared slot holds the actual subtasks in child slots, which are (at the leaf level), - * the {@link SimpleSlot}s.</p> - * - * <p>An exception are the co-location-constraints, that define that the i-th subtask of one - * vertex needs to be scheduled strictly together with the i-th subtasks of the vertices - * that share the co-location-constraint. To manage that, a co-location-constraint gets its - * own shared slot inside the shared slots of a sharing group.</p> - * - * <p>Consider a job set up like this:</p> - * - * <pre>{@code - * +-------------- Slot Sharing Group --------------+ - * | | - * | +-- Co Location Group --+ | - * | | | | - * | (source) ---> (head) ---> (tail) ---> (sink) | - * | | | | - * | +-----------------------+ | - * +------------------------------------------------+ - * }</pre> - * - * <p>The slot hierarchy in the slot sharing group will look like the following</p> - * - * <pre> - * Shared(0)(root) - * | - * +-- Simple(2)(sink) - * | - * +-- Shared(1)(co-location-group) - * | | - * | +-- Simple(0)(tail) - * | +-- Simple(1)(head) - * | - * +-- Simple(0)(source) - * </pre> - */ -public class SlotSharingGroupAssignment { - - private final static Logger LOG = LoggerFactory.getLogger(SlotSharingGroupAssignment.class); - - /** The lock globally guards against concurrent modifications in the data structures */ - private final Object lock = new Object(); - - /** All slots currently allocated to this sharing group */ - private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>(); - - /** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */ - private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>(); - - - // -------------------------------------------------------------------------------------------- - // Accounting - // -------------------------------------------------------------------------------------------- - - /** - * Gets the number of slots that are currently governed by this assignment group. - * This refers to the slots allocated from an Instance, - * and not the sub-slots given out as children of those shared slots. - * - * @return The number of resource slots managed by this assignment group. - */ - public int getNumberOfSlots() { - return allSlots.size(); - } - - /** - * Gets the number of shared slots into which the given group can place subtasks or - * nested task groups. - * - * @param groupId The ID of the group. - * @return The number of shared slots available to the given job vertex. - */ - public int getNumberOfAvailableSlotsForGroup(AbstractID groupId) { - synchronized (lock) { - Map<ResourceID, List<SharedSlot>> available = availableSlotsPerJid.get(groupId); - - if (available != null) { - Set<SharedSlot> set = new HashSet<SharedSlot>(); - - for (List<SharedSlot> list : available.values()) { - for (SharedSlot slot : list) { - set.add(slot); - } - } - - return set.size(); - } - else { - // if no entry exists for a JobVertexID so far, then the vertex with that ID can - // add a subtask into each shared slot of this group. Consequently, all - // of them are available for that JobVertexID. - return allSlots.size(); - } - } - } - - // ------------------------------------------------------------------------ - // Slot allocation - // ------------------------------------------------------------------------ - - public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality, JobVertexID groupId) { - return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupId, null); - } - - public SimpleSlot addSharedSlotAndAllocateSubSlot( - SharedSlot sharedSlot, Locality locality, CoLocationConstraint constraint) - { - return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, null, constraint); - } - - private SimpleSlot addSharedSlotAndAllocateSubSlot( - SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) { - - // sanity checks - if (!sharedSlot.isRootAndEmpty()) { - throw new IllegalArgumentException("The given slot is not an empty root slot."); - } - - final ResourceID location = sharedSlot.getTaskManagerID(); - - synchronized (lock) { - // early out in case that the slot died (instance disappeared) - if (!sharedSlot.isAlive()) { - return null; - } - - // add to the total bookkeeping - if (!allSlots.add(sharedSlot)) { - throw new IllegalArgumentException("Slot was already contained in the assignment group"); - } - - SimpleSlot subSlot; - AbstractID groupIdForMap; - - if (constraint == null) { - // allocate us a sub slot to return - subSlot = sharedSlot.allocateSubSlot(groupId); - groupIdForMap = groupId; - } - else { - // sanity check - if (constraint.isAssignedAndAlive()) { - throw new IllegalStateException( - "Trying to add a shared slot to a co-location constraint that has a life slot."); - } - - // we need a co-location slot --> a SimpleSlot nested in a SharedSlot to - // host other co-located tasks - SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(constraint.getGroupId()); - groupIdForMap = constraint.getGroupId(); - - if (constraintGroupSlot != null) { - // the sub-slots in the co-location constraint slot have no own group IDs - subSlot = constraintGroupSlot.allocateSubSlot(null); - if (subSlot != null) { - // all went well, we can give the constraint its slot - constraint.setSharedSlot(constraintGroupSlot); - - // NOTE: Do not lock the location constraint, because we don't yet know whether we will - // take the slot here - } - else { - // if we could not create a sub slot, release the co-location slot - // note that this does implicitly release the slot we have just added - // as well, because we release its last child slot. That is expected - // and desired. - constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot in this shared slot.")); - } - } - else { - // this should not happen, as we are under the lock that also - // guards slot disposals. Keep the check to be on the safe side - subSlot = null; - } - } - - if (subSlot != null) { - // preserve the locality information - subSlot.setLocality(locality); - - // let the other groups know that this slot exists and that they - // can place a task into this slot. - boolean entryForNewJidExists = false; - - for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) { - // there is already an entry for this groupID - if (entry.getKey().equals(groupIdForMap)) { - entryForNewJidExists = true; - continue; - } - - Map<ResourceID, List<SharedSlot>> available = entry.getValue(); - putIntoMultiMap(available, location, sharedSlot); - } - - // make sure an empty entry exists for this group, if no other entry exists - if (!entryForNewJidExists) { - availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>()); - } - - return subSlot; - } - else { - // if sharedSlot is releases, abort. - // This should be a rare case, since this method is called with a fresh slot. - return null; - } - } - // end synchronized (lock) - } - - /** - * Gets a slot suitable for the given task vertex. This method will prefer slots that are local - * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local - * slots if no local slot is available. The method returns null, when this sharing group has - * no slot available for the given JobVertexID. - * - * @param vertexID the vertex id - * @param locationPreferences location preferences - * - * @return A slot to execute the given ExecutionVertex in, or null, if none is available. - */ - public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) { - synchronized (lock) { - Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(vertexID, locationPreferences, false); - - if (p != null) { - SharedSlot ss = p.f0; - SimpleSlot slot = ss.allocateSubSlot(vertexID); - slot.setLocality(p.f1); - return slot; - } - else { - return null; - } - } - } - - /** - * Gets a slot for a task that has a co-location constraint. This method tries to grab - * a slot form the location-constraint's shared slot. If that slot has not been initialized, - * then the method tries to grab another slot that is available for the location-constraint-group. - * - * <p>In cases where the co-location constraint has not yet been initialized with a slot, - * or where that slot has been disposed in the meantime, this method tries to allocate a shared - * slot for the co-location constraint (inside on of the other available slots).</p> - * - * <p>If a suitable shared slot is available, this method allocates a simple slot within that - * shared slot and returns it. If no suitable shared slot could be found, this method - * returns null.</p> - * - * @param constraint The co-location constraint for the placement of the execution vertex. - * @param locationPreferences location preferences - * - * @return A simple slot allocate within a suitable shared slot, or {@code null}, if no suitable - * shared slot is available. - */ - public SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) { - synchronized (lock) { - if (constraint.isAssignedAndAlive()) { - // the shared slot of the co-location group is initialized and set we allocate a sub-slot - final SharedSlot shared = constraint.getSharedSlot(); - SimpleSlot subslot = shared.allocateSubSlot(null); - subslot.setLocality(Locality.LOCAL); - return subslot; - } - else if (constraint.isAssigned()) { - // we had an assignment before. - - SharedSlot previous = constraint.getSharedSlot(); - if (previous == null) { - throw new IllegalStateException("Bug: Found assigned co-location constraint without a slot."); - } - - TaskManagerLocation location = previous.getTaskManagerLocation(); - Tuple2<SharedSlot, Locality> p = getSharedSlotForTask( - constraint.getGroupId(), Collections.singleton(location), true); - - if (p == null) { - return null; - } - else { - SharedSlot newSharedSlot = p.f0; - - // allocate the co-location group slot inside the shared slot - SharedSlot constraintGroupSlot = newSharedSlot.allocateSharedSlot(constraint.getGroupId()); - if (constraintGroupSlot != null) { - constraint.setSharedSlot(constraintGroupSlot); - - // the sub slots in the co location constraint slot have no group that they belong to - // (other than the co-location-constraint slot) - SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null); - subSlot.setLocality(Locality.LOCAL); - return subSlot; - } - else { - // could not allocate the co-location-constraint shared slot - return null; - } - } - } - else { - // the location constraint has not been associated with a shared slot, yet. - // grab a new slot and initialize the constraint with that one. - // preferred locations are defined by the vertex - Tuple2<SharedSlot, Locality> p = - getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false); - if (p == null) { - // could not get a shared slot for this co-location-group - return null; - } - else { - final SharedSlot availableShared = p.f0; - final Locality l = p.f1; - - // allocate the co-location group slot inside the shared slot - SharedSlot constraintGroupSlot = availableShared.allocateSharedSlot(constraint.getGroupId()); - - // IMPORTANT: We do not lock the location, yet, since we cannot be sure that the - // caller really sticks with the slot we picked! - constraint.setSharedSlot(constraintGroupSlot); - - // the sub slots in the co location constraint slot have no group that they belong to - // (other than the co-location-constraint slot) - SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null); - sub.setLocality(l); - return sub; - } - } - } - } - - - public Tuple2<SharedSlot, Locality> getSharedSlotForTask( - AbstractID groupId, - Iterable<TaskManagerLocation> preferredLocations, - boolean localOnly) { - // check if there is anything at all in this group assignment - if (allSlots.isEmpty()) { - return null; - } - - // get the available slots for the group - Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId); - - if (slotsForGroup == null) { - // we have a new group, so all slots are available - slotsForGroup = new LinkedHashMap<>(); - availableSlotsPerJid.put(groupId, slotsForGroup); - - for (SharedSlot availableSlot : allSlots) { - putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot); - } - } - else if (slotsForGroup.isEmpty()) { - // the group exists, but nothing is available for that group - return null; - } - - // check whether we can schedule the task to a preferred location - boolean didNotGetPreferred = false; - - if (preferredLocations != null) { - for (TaskManagerLocation location : preferredLocations) { - - // set the flag that we failed a preferred location. If one will be found, - // we return early anyways and skip the flag evaluation - didNotGetPreferred = true; - - SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID()); - if (slot != null && slot.isAlive()) { - return new Tuple2<>(slot, Locality.LOCAL); - } - } - } - - // if we want only local assignments, exit now with a "not found" result - if (didNotGetPreferred && localOnly) { - return null; - } - - Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED; - - // schedule the task to any available location - SharedSlot slot; - while ((slot = pollFromMultiMap(slotsForGroup)) != null) { - if (slot.isAlive()) { - return new Tuple2<>(slot, locality); - } - } - - // nothing available after all, all slots were dead - return null; - } - - // ------------------------------------------------------------------------ - // Slot releasing - // ------------------------------------------------------------------------ - - /** - * Releases the simple slot from the assignment group. - * - * @param simpleSlot The SimpleSlot to be released - */ - void releaseSimpleSlot(SimpleSlot simpleSlot) { - synchronized (lock) { - // try to transition to the CANCELED state. That state marks - // that the releasing is in progress - if (simpleSlot.markCancelled()) { - - // sanity checks - if (simpleSlot.isAlive()) { - throw new IllegalStateException("slot is still alive"); - } - - // check whether the slot is already released - if (simpleSlot.markReleased()) { - LOG.debug("Release simple slot {}.", simpleSlot); - - AbstractID groupID = simpleSlot.getGroupID(); - SharedSlot parent = simpleSlot.getParent(); - - // if we have a group ID, then our parent slot is tracked here - if (groupID != null && !allSlots.contains(parent)) { - throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); - } - - int parentRemaining = parent.removeDisposedChildSlot(simpleSlot); - - if (parentRemaining > 0) { - // the parent shared slot is still alive. make sure we make it - // available again to the group of the just released slot - - if (groupID != null) { - // if we have a group ID, then our parent becomes available - // for that group again. otherwise, the slot is part of a - // co-location group and nothing becomes immediately available - - Map<ResourceID, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID); - - // sanity check - if (slotsForJid == null) { - throw new IllegalStateException("Trying to return a slot for group " + groupID + - " when available slots indicated that all slots were available."); - } - - putIntoMultiMap(slotsForJid, parent.getTaskManagerID(), parent); - } - } else { - // the parent shared slot is now empty and can be released - parent.markCancelled(); - internalDisposeEmptySharedSlot(parent); - } - } - } - } - } - - /** - * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}. - * - * @param sharedSlot The slot to be released. - */ - void releaseSharedSlot(SharedSlot sharedSlot) { - synchronized (lock) { - if (sharedSlot.markCancelled()) { - // we are releasing this slot - - if (sharedSlot.hasChildren()) { - final FlinkException cause = new FlinkException("Releasing shared slot parent."); - // by simply releasing all children, we should eventually release this slot. - Set<Slot> children = sharedSlot.getSubSlots(); - while (children.size() > 0) { - children.iterator().next().releaseSlot(cause); - } - } - else { - // if there are no children that trigger the release, we trigger it directly - internalDisposeEmptySharedSlot(sharedSlot); - } - } - } - } - - /** - * - * <p><b>NOTE: This method must be called from within a scope that holds the lock.</b></p> - */ - private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) { - // sanity check - if (sharedSlot.isAlive() || !sharedSlot.getSubSlots().isEmpty()) { - throw new IllegalArgumentException(); - } - - final SharedSlot parent = sharedSlot.getParent(); - final AbstractID groupID = sharedSlot.getGroupID(); - - // 1) If we do not have a parent, we are a root slot. - // 2) If we are not a root slot, we are a slot with a groupID and our parent - // becomes available for that group - - if (parent == null) { - // root slot, return to the instance. - sharedSlot.getOwner().returnLogicalSlot(sharedSlot); - - // also, make sure we remove this slot from everywhere - allSlots.remove(sharedSlot); - removeSlotFromAllEntries(availableSlotsPerJid, sharedSlot); - } - else if (groupID != null) { - // we remove ourselves from our parent slot - - if (sharedSlot.markReleased()) { - LOG.debug("Internally dispose empty shared slot {}.", sharedSlot); - - int parentRemaining = parent.removeDisposedChildSlot(sharedSlot); - - if (parentRemaining > 0) { - // the parent becomes available for the group again - Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupID); - - // sanity check - if (slotsForGroup == null) { - throw new IllegalStateException("Trying to return a slot for group " + groupID + - " when available slots indicated that all slots were available."); - } - - putIntoMultiMap(slotsForGroup, parent.getTaskManagerID(), parent); - - } - else { - // this was the last child of the parent. release the parent. - parent.markCancelled(); - internalDisposeEmptySharedSlot(parent); - } - } - } - else { - throw new IllegalStateException( - "Found a shared slot that is neither a root slot, nor associated with a vertex group."); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location, SharedSlot slot) { - List<SharedSlot> slotsForInstance = map.get(location); - if (slotsForInstance == null) { - slotsForInstance = new ArrayList<SharedSlot>(); - map.put(location, slotsForInstance); - } - slotsForInstance.add(slot); - } - - private static SharedSlot removeFromMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location) { - List<SharedSlot> slotsForLocation = map.get(location); - - if (slotsForLocation == null) { - return null; - } - else { - SharedSlot slot = slotsForLocation.remove(slotsForLocation.size() - 1); - if (slotsForLocation.isEmpty()) { - map.remove(location); - } - - return slot; - } - } - - private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) { - Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator(); - - while (iter.hasNext()) { - List<SharedSlot> slots = iter.next().getValue(); - - if (slots.isEmpty()) { - iter.remove(); - } - else if (slots.size() == 1) { - SharedSlot slot = slots.remove(0); - iter.remove(); - return slot; - } - else { - return slots.remove(slots.size() - 1); - } - } - - return null; - } - - private static void removeSlotFromAllEntries( - Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot) - { - final ResourceID taskManagerId = slot.getTaskManagerID(); - - for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) { - Map<ResourceID, List<SharedSlot>> map = entry.getValue(); - - List<SharedSlot> list = map.get(taskManagerId); - if (list != null) { - list.remove(slot); - if (list.isEmpty()) { - map.remove(taskManagerId); - } - } - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java index 0e9f585..df8e9f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java @@ -18,18 +18,14 @@ package org.apache.flink.runtime.jobmanager.scheduler; -import org.apache.flink.runtime.instance.SharedSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.AbstractID; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; -import java.util.Objects; - import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -45,8 +41,6 @@ public class CoLocationConstraint { private final CoLocationGroup group; - private volatile SharedSlot sharedSlot; - private volatile TaskManagerLocation lockedLocation; private volatile SlotRequestId slotRequestId; @@ -62,15 +56,6 @@ public class CoLocationConstraint { // ------------------------------------------------------------------------ /** - * Gets the shared slot into which this constraint's tasks are places. - * - * @return The shared slot into which this constraint's tasks are places. - */ - public SharedSlot getSharedSlot() { - return sharedSlot; - } - - /** * Gets the ID that identifies the co-location group. * * @return The ID that identifies the co-location group. @@ -81,9 +66,8 @@ public class CoLocationConstraint { /** * Checks whether the location of this constraint has been assigned. - * The location is assigned once a slot has been set, via the - * {@link #setSharedSlot(org.apache.flink.runtime.instance.SharedSlot)} method, - * and the location is locked via the {@link #lockLocation()} method. + * The location is locked via the {@link #lockLocation(TaskManagerLocation)} + * method. * * @return True if the location has been assigned, false otherwise. */ @@ -92,23 +76,9 @@ public class CoLocationConstraint { } /** - * Checks whether the location of this constraint has been assigned - * (as defined in the {@link #isAssigned()} method, and the current - * shared slot is alive. - * - * @return True if the location has been assigned and the shared slot is alive, - * false otherwise. - * @deprecated Should only be called by legacy code - */ - @Deprecated - public boolean isAssignedAndAlive() { - return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive(); - } - - /** * Gets the location assigned to this slot. This method only succeeds after - * the location has been locked via the {@link #lockLocation()} method and - * {@link #isAssigned()} returns true. + * the location has been locked via the {@link #lockLocation(TaskManagerLocation)} + * method and {@link #isAssigned()} returns true. * * @return The instance describing the location for the tasks of this constraint. * @throws IllegalStateException Thrown if the location has not been assigned, yet. @@ -126,51 +96,6 @@ public class CoLocationConstraint { // ------------------------------------------------------------------------ /** - * Assigns a new shared slot to this co-location constraint. The shared slot - * will hold the subtasks that are executed under this co-location constraint. - * If the constraint's location is assigned, then this slot needs to be from - * the same location (instance) as the one assigned to this constraint. - * - * <p>If the constraint already has a slot, the current one will be released.</p> - * - * @param newSlot The new shared slot to assign to this constraint. - * @throws IllegalArgumentException If the constraint's location has been assigned and - * the new slot is from a different location. - */ - public void setSharedSlot(SharedSlot newSlot) { - checkNotNull(newSlot); - - if (this.sharedSlot == null) { - this.sharedSlot = newSlot; - } - else if (newSlot != this.sharedSlot){ - if (lockedLocation != null && !Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) { - throw new IllegalArgumentException( - "Cannot assign different location to a constraint whose location is locked."); - } - if (this.sharedSlot.isAlive()) { - this.sharedSlot.releaseSlot(new FlinkException("Setting new shared slot for co-location constraint.")); - } - - this.sharedSlot = newSlot; - } - } - - /** - * Locks the location of this slot. The location can be locked only once - * and only after a shared slot has been assigned. - * - * @throws IllegalStateException Thrown, if the location is already locked, - * or is no slot has been set, yet. - */ - public void lockLocation() throws IllegalStateException { - checkState(lockedLocation == null, "Location is already locked"); - checkState(sharedSlot != null, "Cannot lock location without a slot."); - - lockedLocation = sharedSlot.getTaskManagerLocation(); - } - - /** * Locks the location of this slot. The location can be locked only once * and only after a shared slot has been assigned. * @@ -209,9 +134,13 @@ public class CoLocationConstraint { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - + @Override public String toString() { - return "CoLocation constraint id " + getGroupId() + " shared slot " + sharedSlot; + return "CoLocationConstraint{" + + "group=" + group + + ", lockedLocation=" + lockedLocation + + ", slotRequestId=" + slotRequestId + + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java index ef8bd67..c8bc9b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java @@ -119,12 +119,6 @@ public class CoLocationGroup implements java.io.Serializable { * executed any more.</p> */ public void resetConstraints() { - for (CoLocationConstraint c : this.constraints) { - if (c.isAssignedAndAlive()) { - throw new IllegalStateException( - "Cannot reset co-location group: some constraints still have live tasks"); - } - } this.constraints.clear(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java index 86be9d4..c93456e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.Set; import java.util.TreeSet; -import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -34,13 +33,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; public class SlotSharingGroup implements java.io.Serializable { private static final long serialVersionUID = 1L; - private final Set<JobVertexID> ids = new TreeSet<JobVertexID>(); - - /** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */ - private transient SlotSharingGroupAssignment taskAssignment; - private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); public SlotSharingGroup() {} @@ -68,23 +62,6 @@ public class SlotSharingGroup implements java.io.Serializable { public SlotSharingGroupId getSlotSharingGroupId() { return slotSharingGroupId; } - - public SlotSharingGroupAssignment getTaskAssignment() { - if (this.taskAssignment == null) { - this.taskAssignment = new SlotSharingGroupAssignment(); - } - - return this.taskAssignment; - } - - public void clearTaskAssignment() { - if (this.taskAssignment != null) { - if (this.taskAssignment.getNumberOfSlots() > 0) { - throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources."); - } - } - this.taskAssignment = null; - } // ------------------------------------------------------------------------ // Utilities diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 13f74f4..ebef29e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -56,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; -import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; @@ -105,7 +103,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; /** * Tests for {@link ExecutionGraph} deployment. @@ -630,13 +627,11 @@ public class ExecutionGraphDeploymentTest extends TestLogger { final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); - final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0); + final LogicalSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0); + final LogicalSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1); - final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1); - - final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0); - - final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1); + final LogicalSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0); + final LogicalSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1); slotFutures.get(sourceVertexId)[0].complete(sourceSlot1); slotFutures.get(sourceVertexId)[1].complete(sourceSlot2); @@ -751,12 +746,11 @@ public class ExecutionGraphDeploymentTest extends TestLogger { } } - private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) { - return new SimpleSlot( - mock(SlotOwner.class), - taskManagerLocation, - index, - new SimpleAckingTaskManagerGateway()); + private LogicalSlot createSlot(TaskManagerLocation taskManagerLocation, int index) { + return new TestingLogicalSlotBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setSlotNumber(index) + .createTestingLogicalSlot(); } private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 3850a70..802dff0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -76,8 +75,8 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; /** * Tests for the scheduling of the execution graph. This tests that @@ -139,8 +138,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final InteractionsCountingTaskManagerGateway gatewaySource = createTaskManager(); final InteractionsCountingTaskManagerGateway gatewayTarget = createTaskManager(); - final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId); - final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId); + final LogicalSlot sourceSlot = createTestingLogicalSlot(gatewaySource); + final LogicalSlot targetSlot = createTestingLogicalSlot(gatewayTarget); eg.scheduleForExecution(); @@ -164,6 +163,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger { assertEquals(JobStatus.RUNNING, eg.getState()); } + private TestingLogicalSlot createTestingLogicalSlot(InteractionsCountingTaskManagerGateway gatewaySource) { + return new TestingLogicalSlotBuilder() + .setTaskManagerGateway(gatewaySource) + .createTestingLogicalSlot(); + } + /** * This test verifies that before deploying a pipelined connected component, the * full set of slots is available, and that not some tasks are deployed, and later the @@ -203,15 +208,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final InteractionsCountingTaskManagerGateway[] sourceTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism]; final InteractionsCountingTaskManagerGateway[] targetTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism]; - final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; - final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; + final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism]; + final LogicalSlot[] targetSlots = new LogicalSlot[parallelism]; for (int i = 0; i < parallelism; i++) { sourceTaskManagers[i] = createTaskManager(); targetTaskManagers[i] = createTaskManager(); - sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId); - targetSlots[i] = createSlot(targetTaskManagers[i], jobId); + sourceSlots[i] = createTestingLogicalSlot(sourceTaskManagers[i]); + targetSlots[i] = createTestingLogicalSlot(targetTaskManagers[i]); sourceFutures[i] = new CompletableFuture<>(); targetFutures[i] = new CompletableFuture<>(); @@ -298,8 +303,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { slotOwner.setReturnAllocatedSlotConsumer( (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId())); - final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; - final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; + final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism]; + final LogicalSlot[] targetSlots = new LogicalSlot[parallelism]; @SuppressWarnings({"unchecked", "rawtypes"}) final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism]; @@ -307,8 +312,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism]; for (int i = 0; i < parallelism; i++) { - sourceSlots[i] = createSlot(taskManager, jobId, slotOwner); - targetSlots[i] = createSlot(taskManager, jobId, slotOwner); + sourceSlots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId()); + targetSlots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId()); sourceFutures[i] = new CompletableFuture<>(); targetFutures[i] = new CompletableFuture<>(); @@ -348,8 +353,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // all completed futures must have been returns for (int i = 0; i < parallelism; i += 2) { - assertTrue(sourceSlots[i].isCanceled()); - assertTrue(targetSlots[i].isCanceled()); + assertFalse(sourceSlots[i].isAlive()); + assertFalse(targetSlots[i].isAlive()); } } @@ -379,12 +384,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger { (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId())); final InteractionsCountingTaskManagerGateway taskManager = createTaskManager(); - final SimpleSlot[] slots = new SimpleSlot[parallelism]; + final LogicalSlot[] slots = new LogicalSlot[parallelism]; @SuppressWarnings({"unchecked", "rawtypes"}) final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism]; for (int i = 0; i < parallelism; i++) { - slots[i] = createSlot(taskManager, jobId, slotOwner); + slots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId()); slotFutures[i] = new CompletableFuture<>(); } @@ -478,7 +483,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); - final SimpleSlot slot = createSlot(new SimpleAckingTaskManagerGateway(), jobGraph.getJobID(), new DummySlotOwner()); + final LogicalSlot slot = createSingleLogicalSlot(new DummySlotOwner(), new SimpleAckingTaskManagerGateway(), new SlotRequestId()); slotProvider.addSlot(jobVertex.getID(), 0, CompletableFuture.completedFuture(slot)); final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>(); @@ -592,23 +597,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger { NoOpPartitionTracker.INSTANCE); } - private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) { - return createSlot(taskManager, jobId, new TestingSlotOwner()); - } - - private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) { - TaskManagerLocation location = new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); - - SimpleSlotContext slot = new SimpleSlotContext( - new AllocationID(), - location, - 0, - taskManager); - - return new SimpleSlot(slot, slotOwner, 0); - } - @Nonnull static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) { TaskManagerLocation location = new TaskManagerLocation( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index ab4048d..21bd002 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.PartitionTracker; @@ -51,6 +50,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.PartitionDescriptor; @@ -219,11 +219,12 @@ public class ExecutionPartitionLifecycleTest extends TestLogger { final SlotProvider slotProvider = new SlotProvider() { @Override public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { - return CompletableFuture.completedFuture(new SimpleSlot( - new SingleSlotTestingSlotOwner(), - taskManagerLocation, - 0, - taskManagerGateway)); + return CompletableFuture.completedFuture( + new TestingLogicalSlotBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskManagerGateway(taskManagerGateway) + .setSlotOwner(new SingleSlotTestingSlotOwner()) + .createTestingLogicalSlot()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 6df8366..3be8657 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; @@ -106,11 +106,7 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); - final SimpleSlot slot = new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway()); + final LogicalSlot slot = createTestingLogicalSlot(slotOwner); final LogicalSlot otherSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); @@ -132,6 +128,12 @@ public class ExecutionTest extends TestLogger { assertEquals(slot, slotOwner.getReturnedSlotFuture().get()); } + private TestingLogicalSlot createTestingLogicalSlot(SlotOwner slotOwner) { + return new TestingLogicalSlotBuilder() + .setSlotOwner(slotOwner) + .createTestingLogicalSlot(); + } + /** * Tests that the slot is released in case of a execution cancellation when having * a slot assigned and being in state SCHEDULED. @@ -143,11 +145,7 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); - final SimpleSlot slot = new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway()); + final LogicalSlot slot = createTestingLogicalSlot(slotOwner); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); @@ -193,11 +191,7 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); - final SimpleSlot slot = new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway()); + final LogicalSlot slot = createTestingLogicalSlot(slotOwner); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); @@ -563,13 +557,7 @@ public class ExecutionTest extends TestLogger { for (JobVertexID jobVertexId : jobVertexIds) { for (int i = 0; i < parallelism; i++) { - final SimpleSlot slot = new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway(), - null, - null); + final LogicalSlot slot = createTestingLogicalSlot(slotOwner); slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 3e95587..37e5183 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -37,9 +36,13 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -237,15 +240,22 @@ public class ExecutionVertexLocalityTest extends TestLogger { // - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted // - exposing test methods in the ExecutionVertex leads to undesirable setters - SlotContext slot = new SimpleSlotContext( + SlotContext slotContext = new SimpleSlotContext( new AllocationID(), location, 0, mock(TaskManagerGateway.class)); - SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0); - if (!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) { + + LogicalSlot slot = new SingleLogicalSlot( + new SlotRequestId(), + slotContext, + null, + Locality.LOCAL, + mock(SlotOwner.class)); + + if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) { throw new FlinkException("Could not assign resource."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java deleted file mode 100644 index dcf5c98..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java +++ /dev/null @@ -1,630 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.AbstractID; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the allocation, properties, and release of shared slots. - */ -public class SharedSlotsTest extends TestLogger { - - private static final Iterable<TaskManagerLocation> NO_LOCATION = Collections.emptySet(); - - @Test - public void allocateAndReleaseEmptySlot() { - try { - JobVertexID vertexId = new JobVertexID(); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - assertEquals(0, assignment.getNumberOfSlots()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vertexId)); - - // get a shared slot - SharedSlot slot = getSharedSlot(assignment); - - // check that the new slot is fresh - assertTrue(slot.isAlive()); - assertFalse(slot.isCanceled()); - assertFalse(slot.isReleased()); - assertEquals(0, slot.getNumberLeaves()); - assertFalse(slot.hasChildren()); - assertTrue(slot.isRootAndEmpty()); - assertNotNull(slot.toString()); - assertTrue(slot.getSubSlots().isEmpty()); - assertEquals(0, slot.getSlotNumber()); - assertEquals(0, slot.getRootSlotNumber()); - - // release the slot immediately. - slot.releaseSlot(null); - - assertTrue(slot.isCanceled()); - assertTrue(slot.isReleased()); - - assertEquals(0, assignment.getNumberOfSlots()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vertexId)); - - // we should not be able to allocate any children from this released slot - assertNull(slot.allocateSharedSlot(new AbstractID())); - assertNull(slot.allocateSubSlot(new AbstractID())); - - // we cannot add this slot to the assignment group - assertNull(assignment.addSharedSlotAndAllocateSubSlot(slot, Locality.NON_LOCAL, vertexId)); - assertEquals(0, assignment.getNumberOfSlots()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void allocateSimpleSlotsAndReleaseFromRoot() { - try { - JobVertexID vid1 = new JobVertexID(); - JobVertexID vid2 = new JobVertexID(); - JobVertexID vid3 = new JobVertexID(); - JobVertexID vid4 = new JobVertexID(); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3, vid4); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - // get a shared slot - SharedSlot sharedSlot = getSharedSlot(assignment); - - // allocate a series of sub slots - - SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, vid1); - assertNotNull(sub1); - - assertNull(sub1.getPayload()); - assertEquals(Locality.LOCAL, sub1.getLocality()); - assertEquals(1, sub1.getNumberLeaves()); - assertEquals(vid1, sub1.getGroupID()); - assertEquals(sharedSlot, sub1.getParent()); - assertEquals(sharedSlot, sub1.getRoot()); - assertEquals(0, sub1.getRootSlotNumber()); - assertEquals(0, sub1.getSlotNumber()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - - SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION); - assertNotNull(sub2); - - assertNull(sub2.getPayload()); - assertEquals(Locality.UNCONSTRAINED, sub2.getLocality()); - assertEquals(1, sub2.getNumberLeaves()); - assertEquals(vid2, sub2.getGroupID()); - assertEquals(sharedSlot, sub2.getParent()); - assertEquals(sharedSlot, sub2.getRoot()); - assertEquals(0, sub2.getRootSlotNumber()); - assertEquals(1, sub2.getSlotNumber()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - - SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(sub2.getTaskManagerLocation())); - assertNotNull(sub3); - - assertNull(sub3.getPayload()); - assertEquals(Locality.LOCAL, sub3.getLocality()); - assertEquals(1, sub3.getNumberLeaves()); - assertEquals(vid3, sub3.getGroupID()); - assertEquals(sharedSlot, sub3.getParent()); - assertEquals(sharedSlot, sub3.getRoot()); - assertEquals(0, sub3.getRootSlotNumber()); - assertEquals(2, sub3.getSlotNumber()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - - LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - SimpleSlot sub4 = assignment.getSlotForTask(vid4, - Collections.singleton(taskManagerLocation)); - assertNotNull(sub4); - - assertNull(sub4.getPayload()); - assertEquals(Locality.NON_LOCAL, sub4.getLocality()); - assertEquals(1, sub4.getNumberLeaves()); - assertEquals(vid4, sub4.getGroupID()); - assertEquals(sharedSlot, sub4.getParent()); - assertEquals(sharedSlot, sub4.getRoot()); - assertEquals(0, sub4.getRootSlotNumber()); - assertEquals(3, sub4.getSlotNumber()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - - // release from the root. - sharedSlot.releaseSlot(null); - - assertTrue(sharedSlot.isReleased()); - assertTrue(sub1.isReleased()); - assertTrue(sub2.isReleased()); - assertTrue(sub3.isReleased()); - assertTrue(sub4.isReleased()); - - assertEquals(0, sharedSlot.getNumberLeaves()); - assertFalse(sharedSlot.hasChildren()); - - assertEquals(0, assignment.getNumberOfSlots()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - - assertNull(sharedSlot.allocateSharedSlot(new AbstractID())); - assertNull(sharedSlot.allocateSubSlot(new AbstractID())); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void allocateSimpleSlotsAndReleaseFromLeaves() { - try { - JobVertexID vid1 = new JobVertexID(); - JobVertexID vid2 = new JobVertexID(); - JobVertexID vid3 = new JobVertexID(); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - // allocate a shared slot - SharedSlot sharedSlot = getSharedSlot(assignment); - - // allocate a series of sub slots - - SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid1); - SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION); - SimpleSlot sub3 = assignment.getSlotForTask(vid3, NO_LOCATION); - - assertNotNull(sub1); - assertNotNull(sub2); - assertNotNull(sub3); - - assertEquals(3, sharedSlot.getNumberLeaves()); - - assertEquals(1, assignment.getNumberOfSlots()); - - // release from the leaves. - - sub2.releaseSlot(null); - - assertTrue(sharedSlot.isAlive()); - assertTrue(sub1.isAlive()); - assertTrue(sub2.isReleased()); - assertTrue(sub3.isAlive()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfSlots()); - - assertEquals(2, sharedSlot.getNumberLeaves()); - - - sub1.releaseSlot(null); - - assertTrue(sharedSlot.isAlive()); - assertTrue(sub1.isReleased()); - assertTrue(sub2.isReleased()); - assertTrue(sub3.isAlive()); - - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfSlots()); - - assertEquals(1, sharedSlot.getNumberLeaves()); - - sub3.releaseSlot(null); - - assertTrue(sharedSlot.isReleased()); - assertTrue(sub1.isReleased()); - assertTrue(sub2.isReleased()); - assertTrue(sub3.isReleased()); - - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(0, assignment.getNumberOfSlots()); - - assertEquals(0, assignment.getNumberOfSlots()); - - assertNull(sharedSlot.allocateSharedSlot(new AbstractID())); - assertNull(sharedSlot.allocateSubSlot(new AbstractID())); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void allocateAndReleaseInMixedOrder() { - try { - JobVertexID vid1 = new JobVertexID(); - JobVertexID vid2 = new JobVertexID(); - JobVertexID vid3 = new JobVertexID(); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - // get a shared slot - SharedSlot sharedSlot = getSharedSlot(assignment); - - // allocate a series of sub slots - - SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid1); - SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION); - - assertNotNull(sub1); - assertNotNull(sub2); - - assertEquals(2, sharedSlot.getNumberLeaves()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfSlots()); - - - sub2.releaseSlot(null); - - assertEquals(1, sharedSlot.getNumberLeaves()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfSlots()); - - - SimpleSlot sub3 = assignment.getSlotForTask(vid3, NO_LOCATION); - assertNotNull(sub3); - - assertEquals(2, sharedSlot.getNumberLeaves()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(1, assignment.getNumberOfSlots()); - - sub3.releaseSlot(null); - sub1.releaseSlot(null); - - assertTrue(sharedSlot.isReleased()); - assertEquals(0, sharedSlot.getNumberLeaves()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2)); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); - assertEquals(0, assignment.getNumberOfSlots()); - - assertEquals(0, assignment.getNumberOfSlots()); - - assertNull(sharedSlot.allocateSharedSlot(new AbstractID())); - assertNull(sharedSlot.allocateSubSlot(new AbstractID())); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * We allocate and release the structure below, starting by allocating a simple slot in the - * shared slot and finishing by releasing a simple slot. - * - * <pre> - * Shared(0)(root) - * | - * +-- Simple(2)(sink) - * | - * +-- Shared(1)(co-location-group) - * | | - * | +-- Simple(0)(tail) - * | +-- Simple(1)(head) - * | - * +-- Simple(0)(source) - * </pre> - */ - @Test - public void testAllocateAndReleaseTwoLevels() { - try { - JobVertexID sourceId = new JobVertexID(); - JobVertexID headId = new JobVertexID(); - JobVertexID tailId = new JobVertexID(); - JobVertexID sinkId = new JobVertexID(); - - JobVertex headVertex = new JobVertex("head", headId); - JobVertex tailVertex = new JobVertex("tail", tailId); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - assertEquals(0, assignment.getNumberOfSlots()); - - CoLocationGroup coLocationGroup = new CoLocationGroup(headVertex, tailVertex); - CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0); - assertFalse(constraint.isAssigned()); - - // allocate a shared slot - SharedSlot sharedSlot = getSharedSlot(assignment); - - // get the first simple slot - SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId); - - assertEquals(1, sharedSlot.getNumberLeaves()); - - // get the first slot in the nested shared slot from the co-location constraint - SimpleSlot headSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet()); - assertEquals(2, sharedSlot.getNumberLeaves()); - - assertNotNull(constraint.getSharedSlot()); - assertTrue(constraint.getSharedSlot().isAlive()); - assertFalse(constraint.isAssigned()); - - // we do not immediately lock the location - headSlot.releaseSlot(); - assertEquals(1, sharedSlot.getNumberLeaves()); - - assertNotNull(constraint.getSharedSlot()); - assertTrue(constraint.getSharedSlot().isReleased()); - assertFalse(constraint.isAssigned()); - - // re-allocate the head slot - headSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet()); - - constraint.lockLocation(); - assertNotNull(constraint.getSharedSlot()); - assertTrue(constraint.isAssigned()); - assertTrue(constraint.isAssignedAndAlive()); - - SimpleSlot tailSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet()); - - assertEquals(constraint.getSharedSlot(), headSlot.getParent()); - assertEquals(constraint.getSharedSlot(), tailSlot.getParent()); - - SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, Collections.<TaskManagerLocation>emptySet()); - assertEquals(4, sharedSlot.getNumberLeaves()); - - // we release our co-location constraint tasks - headSlot.releaseSlot(null); - tailSlot.releaseSlot(null); - - assertEquals(2, sharedSlot.getNumberLeaves()); - assertTrue(headSlot.isReleased()); - assertTrue(tailSlot.isReleased()); - assertTrue(constraint.isAssigned()); - assertFalse(constraint.isAssignedAndAlive()); - - // we should have resources again for the co-location constraint - assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId())); - - // re-allocate head and tail from the constraint - headSlot = assignment.getSlotForTask(constraint, NO_LOCATION); - tailSlot = assignment.getSlotForTask(constraint, NO_LOCATION); - - assertEquals(4, sharedSlot.getNumberLeaves()); - assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId())); - - // verify some basic properties of the slots - assertEquals(sourceSlot.getTaskManagerID(), headSlot.getTaskManagerID()); - assertEquals(sourceSlot.getTaskManagerID(), tailSlot.getTaskManagerID()); - assertEquals(sourceSlot.getTaskManagerID(), sinkSlot.getTaskManagerID()); - - assertEquals(sourceId, sourceSlot.getGroupID()); - assertEquals(sinkId, sinkSlot.getGroupID()); - assertNull(headSlot.getGroupID()); - assertNull(tailSlot.getGroupID()); - assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID()); - - // release all - sourceSlot.releaseSlot(null); - headSlot.releaseSlot(null); - tailSlot.releaseSlot(null); - sinkSlot.releaseSlot(null); - - assertTrue(sharedSlot.isReleased()); - assertTrue(sourceSlot.isReleased()); - assertTrue(headSlot.isReleased()); - assertTrue(tailSlot.isReleased()); - assertTrue(sinkSlot.isReleased()); - assertTrue(constraint.getSharedSlot().isReleased()); - - assertTrue(constraint.isAssigned()); - assertFalse(constraint.isAssignedAndAlive()); - - assertEquals(0, assignment.getNumberOfSlots()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * We allocate and the structure below and release it from the root. - * - * <pre> - * Shared(0)(root) - * | - * +-- Simple(2)(sink) - * | - * +-- Shared(1)(co-location-group) - * | | - * | +-- Simple(0)(tail) - * | +-- Simple(1)(head) - * | - * +-- Simple(0)(source) - * </pre> - */ - @Test - public void testReleaseTwoLevelsFromRoot() { - try { - JobVertexID sourceId = new JobVertexID(); - JobVertexID headId = new JobVertexID(); - JobVertexID tailId = new JobVertexID(); - JobVertexID sinkId = new JobVertexID(); - - JobVertex headVertex = new JobVertex("head", headId); - JobVertex tailVertex = new JobVertex("tail", tailId); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - assertEquals(0, assignment.getNumberOfSlots()); - - CoLocationGroup coLocationGroup = new CoLocationGroup(headVertex, tailVertex); - CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0); - assertFalse(constraint.isAssigned()); - - // allocate a shared slot - SharedSlot sharedSlot = getSharedSlot(assignment); - - // get the first simple slot - SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId); - - SimpleSlot headSlot = assignment.getSlotForTask(constraint, NO_LOCATION); - constraint.lockLocation(); - SimpleSlot tailSlot = assignment.getSlotForTask(constraint, NO_LOCATION); - - SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, NO_LOCATION); - - assertEquals(4, sharedSlot.getNumberLeaves()); - - // release all - sourceSlot.releaseSlot(null); - headSlot.releaseSlot(null); - tailSlot.releaseSlot(null); - sinkSlot.releaseSlot(null); - - assertTrue(sharedSlot.isReleased()); - assertTrue(sourceSlot.isReleased()); - assertTrue(headSlot.isReleased()); - assertTrue(tailSlot.isReleased()); - assertTrue(sinkSlot.isReleased()); - assertTrue(constraint.getSharedSlot().isReleased()); - - assertTrue(constraint.isAssigned()); - assertFalse(constraint.isAssignedAndAlive()); - - assertEquals(0, assignment.getNumberOfSlots()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testImmediateReleaseOneLevel() { - try { - JobVertexID vid = new JobVertexID(); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vid); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - SharedSlot sharedSlot = getSharedSlot(assignment); - - SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid); - sub.releaseSlot(null); - - assertTrue(sub.isReleased()); - assertTrue(sharedSlot.isReleased()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testImmediateReleaseTwoLevel() { - try { - JobVertexID vid = new JobVertexID(); - JobVertex vertex = new JobVertex("vertex", vid); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(vid); - SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment(); - - CoLocationGroup coLocationGroup = new CoLocationGroup(vertex); - CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0); - - SharedSlot sharedSlot = getSharedSlot(assignment); - - SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint); - - assertNull(sub.getGroupID()); - assertEquals(constraint.getSharedSlot(), sub.getParent()); - - sub.releaseSlot(null); - - assertTrue(sub.isReleased()); - assertTrue(sharedSlot.isReleased()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - public SharedSlot getSharedSlot(SlotSharingGroupAssignment assignment) { - final TestingSlotOwner slotOwner = new TestingSlotOwner(); - slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot logicalSlot) -> ((SharedSlot) logicalSlot).markReleased()); - return new SharedSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway(), - assignment); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java deleted file mode 100644 index affe5cb..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingPayload; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class SimpleSlotTest extends TestLogger { - - @Test - public void testStateTransitions() { - try { - // release immediately - { - SimpleSlot slot = getSlot(); - assertTrue(slot.isAlive()); - - slot.releaseSlot(null); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertTrue(slot.isReleased()); - } - - // state transitions manually - { - SimpleSlot slot = getSlot(); - assertTrue(slot.isAlive()); - - slot.markCancelled(); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertFalse(slot.isReleased()); - - slot.markReleased(); - assertFalse(slot.isAlive()); - assertTrue(slot.isCanceled()); - assertTrue(slot.isReleased()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSetExecutionVertex() { - try { - TestingPayload payload1 = new TestingPayload(); - TestingPayload payload2 = new TestingPayload(); - - // assign to alive slot - { - SimpleSlot slot = getSlot(); - - assertTrue(slot.tryAssignPayload(payload1)); - assertEquals(payload1, slot.getPayload()); - - // try to add another one - assertFalse(slot.tryAssignPayload(payload2)); - assertEquals(payload1, slot.getPayload()); - } - - // assign to canceled slot - { - SimpleSlot slot = getSlot(); - assertTrue(slot.markCancelled()); - - assertFalse(slot.tryAssignPayload(payload1)); - assertNull(slot.getPayload()); - } - - // assign to released marked slot - { - SimpleSlot slot = getSlot(); - assertTrue(slot.markCancelled()); - assertTrue(slot.markReleased()); - - assertFalse(slot.tryAssignPayload(payload1)); - assertNull(slot.getPayload()); - } - - // assign to released - { - SimpleSlot slot = getSlot(); - slot.releaseSlot(null); - - assertFalse(slot.tryAssignPayload(payload1)); - assertNull(slot.getPayload()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - public static SimpleSlot getSlot() { - final TestingSlotOwner slotOwner = new TestingSlotOwner(); - slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot logicalSlot) -> ((SimpleSlot) logicalSlot).markReleased()); - return new SimpleSlot( - slotOwner, - new LocalTaskManagerLocation(), - 0, - new SimpleAckingTaskManagerGateway()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index eccccf7..cad0af5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -365,7 +365,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { s2.releaseSlot(); assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); LogicalSlot s3 = testingSlotProvider.allocateSlot( new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); @@ -412,7 +411,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { s2.releaseSlot(); assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); LogicalSlot sa = testingSlotProvider.allocateSlot( new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); @@ -487,10 +485,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { s3.releaseSlot(); s4.releaseSlot(); assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); - - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); } @Test @@ -529,10 +523,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase { s4.releaseSlot(); assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); - - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); - assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); } private static SlotProfile slotProfileForLocation(TaskManagerLocation location) {