Repository: flink Updated Branches: refs/heads/master 5783671c2 -> f6d866817
[FLINK-4373] [cluster management] Introduce AllocationID, ResourceProfile, and AllocatedSlot These classes are introduced as part of the cluster management rework. This closes #2630 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6d86681 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6d86681 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6d86681 Branch: refs/heads/master Commit: f6d8668175fb94f338037fd1ab40d2a2e344d097 Parents: 5783671 Author: Stephan Ewen <se...@apache.org> Authored: Thu Oct 13 18:50:18 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 11:33:35 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/types/AllocationID.java | 50 ++++++ .../clusterframework/types/ResourceProfile.java | 122 +++++++++++++++ .../flink/runtime/instance/SharedSlot.java | 53 ++++++- .../flink/runtime/instance/SimpleSlot.java | 57 ++++++- .../org/apache/flink/runtime/instance/Slot.java | 77 +++++++--- .../runtime/jobmanager/slots/AllocatedSlot.java | 152 +++++++++++++++++++ 6 files changed, 488 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java new file mode 100644 index 0000000..59d8f9d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.util.AbstractID; + +/** + * Unique identifier for a slot allocated by a JobManager from a TaskManager. + * Also identifies a pending allocation request, and is constant across retries. + * + * <p>This ID is used for all synchronization of the status of Slots from TaskManagers + * that are not free (i.e., have been allocated by a job). + */ +public class AllocationID extends AbstractID { + + private static final long serialVersionUID = 1L; + + /** + * Constructs a new random AllocationID. + */ + public AllocationID() { + super(); + } + + /** + * Constructs a new AllocationID with the given parts. 
+ * + * @param lowerPart the lower bytes of the ID + * @param upperPart the higher bytes of the ID + */ + public AllocationID(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java new file mode 100644 index 0000000..7a25de1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +/** + * Describe the resource profile of the slot, either when requiring or offering it. 
The profile can be + * checked whether it can match another profile's requirement, and furthermore we may calculate a matching + * score to decide which profile we should choose when we have lots of candidate slots. + */ +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1L); + + // ------------------------------------------------------------------------ + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How much memory in MB is needed */ + private final long memoryInMB; + + // ------------------------------------------------------------------------ + + /** + * Creates a new ResourceProfile. + * + * @param cpuCores The number of CPU cores (possibly fractional, e.g., 0.2 cores) + * @param memoryInMB The size of the memory, in megabytes. + */ + public ResourceProfile(double cpuCores, long memoryInMB) { + this.cpuCores = cpuCores; + this.memoryInMB = memoryInMB; + } + + /** + * Creates a copy of the given ResourceProfile. + * + * @param other The ResourceProfile to copy. 
+ */ + public ResourceProfile(ResourceProfile other) { + this.cpuCores = other.cpuCores; + this.memoryInMB = other.memoryInMB; + } + + // ------------------------------------------------------------------------ + + /** + * Get the cpu cores needed + * @return The cpu cores, 1.0 means a full cpu thread + */ + public double getCpuCores() { + return cpuCores; + } + + /** + * Get the memory needed in MB + * @return The memory in MB + */ + public long getMemoryInMB() { + return memoryInMB; + } + + /** + * Check whether required resource profile can be matched + * + * @param required the required resource profile + * @return true if the requirement is matched, otherwise false + */ + public boolean isMatching(ResourceProfile required) { + return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB(); + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + long cpuBits = Double.doubleToLongBits(cpuCores); + return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32)); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == ResourceProfile.class) { + ResourceProfile that = (ResourceProfile) obj; + return this.cpuCores == that.cpuCores && this.memoryInMB == that.memoryInMB; + } + else { + return false; + } + } + + @Override + public String toString() { + return "ResourceProfile{" + + "cpuCores=" + cpuCores + + ", memoryInMB=" + memoryInMB + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java index 7f05604..97385b1 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.instance; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.AbstractID; @@ -50,11 +51,14 @@ public class SharedSlot extends Slot { /** The set os sub-slots allocated from this shared slot */ private final Set<Slot> subSlots; + // ------------------------------------------------------------------------ + // Old Constructors (prior FLIP-6) + // ------------------------------------------------------------------------ /** * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. * This constructor is used to create a slot directly from an instance. - * + * * @param jobID The ID of the job that the slot is created for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from @@ -73,7 +77,7 @@ public class SharedSlot extends Slot { /** * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs * to the given task group. - * + * * @param jobID The ID of the job that the slot is created for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from @@ -96,6 +100,51 @@ public class SharedSlot extends Slot { } // ------------------------------------------------------------------------ + // Constructors + // ------------------------------------------------------------------------ + + /** + * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. 
+ * This constructor is used to create a slot directly from an instance. + * + * @param allocatedSlot The allocated slot that this slot represents. + * @param owner The component from which this slot is allocated. + * @param assignmentGroup The assignment group that this shared slot belongs to. + */ + public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) { + this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), assignmentGroup, null, null); + } + + /** + * Creates a new shared slot that is a sub-slot of the given parent shared slot, and that belongs + * to the given task group. + * + * @param parent The parent slot of this slot. + * @param owner The component from which this slot is allocated. + * @param slotNumber The number of the slot. + * @param assignmentGroup The assignment group that this shared slot belongs to. + * @param groupId The assignment group of this slot. + */ + public SharedSlot( + SharedSlot parent, SlotOwner owner, int slotNumber, + SlotSharingGroupAssignment assignmentGroup, + AbstractID groupId) { + + this(parent.getAllocatedSlot(), owner, slotNumber, assignmentGroup, parent, groupId); + } + + private SharedSlot( + AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, + SlotSharingGroupAssignment assignmentGroup, + @Nullable SharedSlot parent, @Nullable AbstractID groupId) { + + super(allocatedSlot, owner, slotNumber, parent, groupId); + + this.assignmentGroup = checkNotNull(assignmentGroup); + this.subSlots = new HashSet<Slot>(); + } + + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index 615138f..479fa29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.instance; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.AbstractID; @@ -48,6 +50,9 @@ public class SimpleSlot extends Slot { /** The locality attached to the slot, defining whether the slot was allocated at the desired location. */ private volatile Locality locality = Locality.UNCONSTRAINED; + // ------------------------------------------------------------------------ + // Old Constructors (prior FLIP-6) + // ------------------------------------------------------------------------ /** * Creates a new simple slot that stands alone and does not belong to shared slot. @@ -66,7 +71,7 @@ public class SimpleSlot extends Slot { /** * Creates a new simple slot that belongs to the given shared slot and - * is identified by the given ID.. + * is identified by the given ID. * * @param jobID The ID of the job that the slot is allocated for. * @param owner The component from which this slot is allocated. @@ -80,7 +85,55 @@ public class SimpleSlot extends Slot { ActorGateway taskManagerActorGateway, @Nullable SharedSlot parent, @Nullable AbstractID groupID) { - super(jobID, owner, location, slotNumber, taskManagerActorGateway, parent, groupID); + super(parent != null ? 
+ parent.getAllocatedSlot() : + new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber, + ResourceProfile.UNKNOWN, taskManagerActorGateway), + owner, slotNumber, parent, groupID); + } + + // ------------------------------------------------------------------------ + // Constructors + // ------------------------------------------------------------------------ + + /** + * Creates a new simple slot that stands alone and does not belong to shared slot. + * + * @param allocatedSlot The allocated slot that this slot represents. + * @param owner The component from which this slot is allocated. + * @param slotNumber The number of the task slot on the instance. + */ + public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber) { + this(allocatedSlot, owner, slotNumber, null, null); + } + + /** + * Creates a new simple slot that belongs to the given shared slot and + * is identified by the given ID. + * + * @param parent The parent shared slot. + * @param owner The component from which this slot is allocated. + * @param slotNumber The number of the simple slot in its parent shared slot. + * @param groupID The ID that identifies the group that the slot belongs to. + */ + public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) { + this(parent.getAllocatedSlot(), owner, slotNumber, parent, groupID); + } + + /** + * Creates a new simple slot that belongs to the given shared slot and + * is identified by the given ID. + * + * @param allocatedSlot The allocated slot that this slot represents. + * @param owner The component from which this slot is allocated. + * @param slotNumber The number of the simple slot in its parent shared slot. + * @param parent The parent shared slot. + * @param groupID The ID that identifies the group that the slot belongs to. 
+ */ + private SimpleSlot( + AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, + @Nullable SharedSlot parent, @Nullable AbstractID groupID) { + super(allocatedSlot, owner, slotNumber, parent, groupID); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 451a9ec..b840b0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -18,7 +18,10 @@ package org.apache.flink.runtime.instance; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.AbstractID; @@ -31,8 +34,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Base class for task slots. TaskManagers offer one or more task slots, which define a slice of - * their resources. + * Base class for slots that the Scheduler / ExecutionGraph take from the SlotPool and use to place + * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a TaskManager's resources), + * plus additional fields to track what is currently executed in that slot, or if the slot is still + * used or disposed (ExecutionGraph gave it back to the pool). 
* * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain @@ -54,16 +59,13 @@ public abstract class Slot { /** State where all tasks in this slot have been canceled and the slot been given back to the instance */ private static final int RELEASED = 2; - // ------------------------------------------------------------------------ - - /** The ID of the job this slice belongs to. */ - private final JobID jobID; + // temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6) + protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0); - /** The location information of the TaskManager to which this slot belongs */ - private final TaskManagerLocation taskManagerLocation; + // ------------------------------------------------------------------------ - /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */ - private final ActorGateway taskManagerActorGateway; + /** The allocated slot that this slot represents. */ + private final AllocatedSlot allocatedSlot; /** The owner of this slot - the slot was taken from that owner and must be disposed to it */ private final SlotOwner owner; @@ -81,10 +83,14 @@ public abstract class Slot { /** The state of the vertex, only atomically updated */ private volatile int status = ALLOCATED_AND_ALIVE; - + + // -------------------------------------------------------------------------------------------- + /** * Base constructor for slots. * + * <p>This is the old way of constructing slots, prior to the FLIP-6 resource management refactoring. + * * @param jobID The ID of the job that this slot is allocated for. * @param owner The component from which this slot is allocated. 
* @param location The location info of the TaskManager where the slot was allocated from @@ -101,23 +107,56 @@ public abstract class Slot { checkArgument(slotNumber >= 0); - this.jobID = checkNotNull(jobID); - this.taskManagerLocation = checkNotNull(location); + this.allocatedSlot = new AllocatedSlot( + NO_ALLOCATION_ID, jobID, location, slotNumber, ResourceProfile.UNKNOWN, taskManagerActorGateway); + + this.owner = checkNotNull(owner); + this.parent = parent; // may be null + this.groupID = groupID; // may be null + this.slotNumber = slotNumber; + } + + /** + * Base constructor for slots. + * + * @param allocatedSlot The allocated slot that this slot represents. + * @param owner The component from which this slot is allocated. + * @param slotNumber The number of this slot. + * @param parent The parent slot that contains this slot. May be null, if this slot is the root. + * @param groupID The ID that identifies the task group for which this slot is allocated. May be null + * if the slot does not belong to any task group. + */ + protected Slot( + AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, + @Nullable SharedSlot parent, @Nullable AbstractID groupID) { + + checkArgument(slotNumber >= 0); + + this.allocatedSlot = checkNotNull(allocatedSlot); this.owner = checkNotNull(owner); - this.taskManagerActorGateway = checkNotNull(taskManagerActorGateway); this.parent = parent; // may be null this.groupID = groupID; // may be null this.slotNumber = slotNumber; } + // -------------------------------------------------------------------------------------------- /** + * Gets the allocated slot that this slot refers to. + * + * @return This slot's allocated slot. + */ + public AllocatedSlot getAllocatedSlot() { + return allocatedSlot; + } + + /** * Returns the ID of the job this allocated slot belongs to. 
* * @return the ID of the job this allocated slot belongs to */ public JobID getJobID() { - return this.jobID; + return allocatedSlot.getJobID(); } /** @@ -126,7 +165,7 @@ public abstract class Slot { * @return The ID of the TaskManager that offers this slot */ public ResourceID getTaskManagerID() { - return taskManagerLocation.getResourceID(); + return allocatedSlot.getTaskManagerLocation().getResourceID(); } /** @@ -135,7 +174,7 @@ public abstract class Slot { * @return The location info of the TaskManager that offers this slot */ public TaskManagerLocation getTaskManagerLocation() { - return taskManagerLocation; + return allocatedSlot.getTaskManagerLocation(); } /** @@ -146,7 +185,7 @@ public abstract class Slot { * @return The actor gateway that can be used to send messages to the TaskManager. */ public ActorGateway getTaskManagerActorGateway() { - return taskManagerActorGateway; + return allocatedSlot.getTaskManagerActorGateway(); } /** @@ -303,7 +342,7 @@ public abstract class Slot { @Override public String toString() { - return hierarchy() + " - " + taskManagerLocation + " - " + getStateName(status); + return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status); } protected String hierarchy() { http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java new file mode 100644 index 0000000..355524c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager. + * It represents a slice of allocated resources from the TaskManager. + * + * <p>To allocate an {@code AllocatedSlot}, the JobManager requests a slot from the ResourceManager. The + * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the + * JobManager and notify the JobManager. + * + * <p>Note: Prior to the resource management changes introduced in FLIP-6 (Flink Improvement Proposal 6), + * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the + * JobManager. All slots had a default unknown resource profile. + */ +public class AllocatedSlot { + + /** The ID under which the slot is allocated. Uniquely identifies the slot. 
*/ + private final AllocationID slotAllocationId; + + /** The ID of the job this slot is allocated for */ + private final JobID jobID; + + /** The location information of the TaskManager to which this slot belongs */ + private final TaskManagerLocation taskManagerLocation; + + /** The resource profile that the slot provides */ + private final ResourceProfile resourceProfile; + + /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */ + private final ActorGateway taskManagerActorGateway; + + /** The number of the slot on the TaskManager to which the slot belongs. Purely informational. */ + private final int slotNumber; + + // ------------------------------------------------------------------------ + + public AllocatedSlot( + AllocationID slotAllocationId, + JobID jobID, + TaskManagerLocation location, + int slotNumber, + ResourceProfile resourceProfile, + ActorGateway actorGateway) + { + this.slotAllocationId = checkNotNull(slotAllocationId); + this.jobID = checkNotNull(jobID); + this.taskManagerLocation = checkNotNull(location); + this.slotNumber = slotNumber; + this.resourceProfile = checkNotNull(resourceProfile); + this.taskManagerActorGateway = checkNotNull(actorGateway); + } + + public AllocatedSlot(AllocatedSlot other) { + this.slotAllocationId = other.slotAllocationId; + this.jobID = other.jobID; + this.taskManagerLocation = other.taskManagerLocation; + this.slotNumber = other.slotNumber; + this.resourceProfile = other.resourceProfile; + this.taskManagerActorGateway = other.taskManagerActorGateway; + } + + // ------------------------------------------------------------------------ + + /** + * Gets the ID under which the slot is allocated, which uniquely identifies the slot. + * + * @return The ID under which the slot is allocated + */ + public AllocationID getSlotAllocationId() { + return slotAllocationId; + } + + /** + * Returns the ID of the job this allocated slot belongs to. 
+ * + * @return the ID of the job this allocated slot belongs to + */ + public JobID getJobID() { + return jobID; + } + + /** + * Gets the number of the slot. + * + * @return The number of the slot on the TaskManager. + */ + public int getSlotNumber() { + return slotNumber; + } + + /** + * Gets the resource profile of the slot. + * + * @return The resource profile of the slot. + */ + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + /** + * Gets the location info of the TaskManager that offers this slot. + * + * @return The location info of the TaskManager that offers this slot + */ + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + /** + * Gets the actor gateway that can be used to send messages to the TaskManager. + * <p> + * This method should be removed once the new interface-based RPC abstraction is in place + * + * @return The actor gateway that can be used to send messages to the TaskManager. + */ + public ActorGateway getTaskManagerActorGateway() { + return taskManagerActorGateway; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber; + } +}