Repository: flink
Updated Branches:
  refs/heads/master 5783671c2 -> f6d866817


[FLINK-4373] [cluster management] Introduce AllocationID, ResourceProfile, and 
AllocatedSlot

These classes are introduced as part of the cluster management rework.

This closes #2630


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6d86681
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6d86681
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6d86681

Branch: refs/heads/master
Commit: f6d8668175fb94f338037fd1ab40d2a2e344d097
Parents: 5783671
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 13 18:50:18 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 11:33:35 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/AllocationID.java    |  50 ++++++
 .../clusterframework/types/ResourceProfile.java | 122 +++++++++++++++
 .../flink/runtime/instance/SharedSlot.java      |  53 ++++++-
 .../flink/runtime/instance/SimpleSlot.java      |  57 ++++++-
 .../org/apache/flink/runtime/instance/Slot.java |  77 +++++++---
 .../runtime/jobmanager/slots/AllocatedSlot.java | 152 +++++++++++++++++++
 6 files changed, 488 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
new file mode 100644
index 0000000..59d8f9d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Unique identifier for a slot allocated by a JobManager from a TaskManager.
+ * Also identifies a pending allocation request, and is constant across 
retries.
+ * 
+ * <p>This ID is used for all synchronization of the status of Slots from 
TaskManagers
+ * that are not free (i.e., have been allocated by a job).
+ */
+public class AllocationID extends AbstractID {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Constructs a new random AllocationID.
+        */
+       public AllocationID() {
+               super();
+       }
+
+       /**
+        * Constructs a new AllocationID with the given parts.
+        *
+        * @param lowerPart the lower bytes of the ID
+        * @param upperPart the higher bytes of the ID
+        */
+       public AllocationID(long lowerPart, long upperPart) {
+               super(lowerPart, upperPart);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
new file mode 100644
index 0000000..7a25de1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+/**
+ * Describe the resource profile of the slot, either when requiring or 
offering it. The profile can be
+ * checked whether it can match another profile's requirement, and furthermore 
we may calculate a matching
+ * score to decide which profile we should choose when we have lots of 
candidate slots.
+ */
+public class ResourceProfile implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, 
-1L);
+
+       // 
------------------------------------------------------------------------
+
+       /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
+       private final double cpuCores;
+
+       /** How many memory in mb are needed */
+       private final long memoryInMB;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new ResourceProfile.
+        * 
+        * @param cpuCores   The number of CPU cores (possibly fractional, 
i.e., 0.2 cores)
+        * @param memoryInMB The size of the memory, in megabytes.
+        */
+       public ResourceProfile(double cpuCores, long memoryInMB) {
+               this.cpuCores = cpuCores;
+               this.memoryInMB = memoryInMB;
+       }
+
+       /**
+        * Creates a copy of the given ResourceProfile.
+        * 
+        * @param other The ResourceProfile to copy. 
+        */
+       public ResourceProfile(ResourceProfile other) {
+               this.cpuCores = other.cpuCores;
+               this.memoryInMB = other.memoryInMB;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Get the cpu cores needed
+        * @return The cpu cores, 1.0 means a full cpu thread
+        */
+       public double getCpuCores() {
+               return cpuCores;
+       }
+
+       /**
+        * Get the memory needed in MB
+        * @return The memory in MB
+        */
+       public long getMemoryInMB() {
+               return memoryInMB;
+       }
+
+       /**
+        * Check whether required resource profile can be matched
+        *
+        * @param required the required resource profile
+        * @return true if the requirement is matched, otherwise false
+        */
+       public boolean isMatching(ResourceProfile required) {
+               return cpuCores >= required.getCpuCores() && memoryInMB >= 
required.getMemoryInMB();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               long cpuBits = Double.doubleToLongBits(cpuCores);
+               return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ 
(memoryInMB >> 32));
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+               else if (obj != null && obj.getClass() == 
ResourceProfile.class) {
+                       ResourceProfile that = (ResourceProfile) obj;
+                       return this.cpuCores == that.cpuCores && 
this.memoryInMB == that.memoryInMB; 
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "ResourceProfile{" +
+                       "cpuCores=" + cpuCores +
+                       ", memoryInMB=" + memoryInMB +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 7f05604..97385b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
@@ -50,11 +51,14 @@ public class SharedSlot extends Slot {
        /** The set os sub-slots allocated from this shared slot */
        private final Set<Slot> subSlots;
 
+       // 
------------------------------------------------------------------------
+       //  Old Constructors (prior FLIP-6)
+       // 
------------------------------------------------------------------------
 
        /**
         * Creates a new shared slot that has no parent (is a root slot) and 
does not belong to any task group.
         * This constructor is used to create a slot directly from an instance. 
-        * 
+        *
         * @param jobID The ID of the job that the slot is created for.
         * @param owner The component from which this slot is allocated.
         * @param location The location info of the TaskManager where the slot 
was allocated from
@@ -73,7 +77,7 @@ public class SharedSlot extends Slot {
        /**
         * Creates a new shared slot that has is a sub-slot of the given parent 
shared slot, and that belongs
         * to the given task group.
-        * 
+        *
         * @param jobID The ID of the job that the slot is created for.
         * @param owner The component from which this slot is allocated.
         * @param location The location info of the TaskManager where the slot 
was allocated from
@@ -96,6 +100,51 @@ public class SharedSlot extends Slot {
        }
 
        // 
------------------------------------------------------------------------
+       //  Constructors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new shared slot that has no parent (is a root slot) and 
does not belong to any task group.
+        * This constructor is used to create a slot directly from an instance.
+        * 
+        * @param allocatedSlot The allocated slot that this slot represents.
+        * @param owner The component from which this slot is allocated.
+        * @param assignmentGroup The assignment group that this shared slot 
belongs to.
+        */
+       public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, 
SlotSharingGroupAssignment assignmentGroup) {
+               this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), 
assignmentGroup, null, null);
+       }
+
+       /**
+        * Creates a new shared slot that is a sub-slot of the given parent 
shared slot, and that belongs
+        * to the given task group.
+        * 
+        * @param parent The parent slot of this slot.
+        * @param owner The component from which this slot is allocated.
+        * @param slotNumber The number of the slot.
+        * @param assignmentGroup The assignment group that this shared slot 
belongs to.
+        * @param groupId The assignment group of this slot.
+        */
+       public SharedSlot(
+                       SharedSlot parent, SlotOwner owner, int slotNumber,
+                       SlotSharingGroupAssignment assignmentGroup,
+                       AbstractID groupId) {
+
+               this(parent.getAllocatedSlot(), owner, slotNumber, 
assignmentGroup, parent, groupId);
+       }
+
+       private SharedSlot(
+                       AllocatedSlot allocatedSlot, SlotOwner owner, int 
slotNumber,
+                       SlotSharingGroupAssignment assignmentGroup,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupId) {
+
+               super(allocatedSlot, owner, slotNumber, parent, groupId);
+
+               this.assignmentGroup = checkNotNull(assignmentGroup);
+               this.subSlots = new HashSet<Slot>();
+       }
+
+       // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 615138f..479fa29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
@@ -48,6 +50,9 @@ public class SimpleSlot extends Slot {
        /** The locality attached to the slot, defining whether the slot was 
allocated at the desired location. */
        private volatile Locality locality = Locality.UNCONSTRAINED;
 
+       // 
------------------------------------------------------------------------
+       //  Old Constructors (prior FLIP-6)
+       // 
------------------------------------------------------------------------
 
        /**
         * Creates a new simple slot that stands alone and does not belong to 
shared slot.
@@ -66,7 +71,7 @@ public class SimpleSlot extends Slot {
 
        /**
         * Creates a new simple slot that belongs to the given shared slot and
-        * is identified by the given ID..
+        * is identified by the given ID.
         *
         * @param jobID The ID of the job that the slot is allocated for.
         * @param owner The component from which this slot is allocated.
@@ -80,7 +85,55 @@ public class SimpleSlot extends Slot {
                        ActorGateway taskManagerActorGateway,
                        @Nullable SharedSlot parent, @Nullable AbstractID 
groupID) {
 
-               super(jobID, owner, location, slotNumber, 
taskManagerActorGateway, parent, groupID);
+               super(parent != null ?
+                               parent.getAllocatedSlot() :
+                               new AllocatedSlot(NO_ALLOCATION_ID, jobID, 
location, slotNumber,
+                                               ResourceProfile.UNKNOWN, 
taskManagerActorGateway),
+                               owner, slotNumber, parent, groupID);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Constructors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new simple slot that stands alone and does not belong to 
shared slot.
+        *
+        * @param allocatedSlot The allocated slot that this slot represents.
+        * @param owner The component from which this slot is allocated.
+        * @param slotNumber The number of the task slot on the instance.
+        */
+       public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int 
slotNumber) {
+               this(allocatedSlot, owner, slotNumber, null, null);
+       }
+
+       /**
+        * Creates a new simple slot that belongs to the given shared slot and
+        * is identified by the given ID..
+        *
+        * @param parent The parent shared slot.
+        * @param owner The component from which this slot is allocated.
+        * @param slotNumber The number of the simple slot in its parent shared 
slot.
+        * @param groupID The ID that identifies the group that the slot 
belongs to.
+        */
+       public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, 
AbstractID groupID) {
+               this(parent.getAllocatedSlot(), owner, slotNumber, parent, 
groupID);
+       }
+       
+       /**
+        * Creates a new simple slot that belongs to the given shared slot and
+        * is identified by the given ID..
+        *
+        * @param allocatedSlot The allocated slot that this slot represents.
+        * @param owner The component from which this slot is allocated.
+        * @param slotNumber The number of the simple slot in its parent shared 
slot.
+        * @param parent The parent shared slot.
+        * @param groupID The ID that identifies the group that the slot 
belongs to.
+        */
+       private SimpleSlot(
+                       AllocatedSlot allocatedSlot, SlotOwner owner, int 
slotNumber,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupID) {
+               super(allocatedSlot, owner, slotNumber, parent, groupID);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 451a9ec..b840b0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
@@ -31,8 +34,10 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Base class for task slots. TaskManagers offer one or more task slots, which 
define a slice of 
- * their resources.
+ * Base class for slots that the Scheduler / ExecutionGraph take from the 
SlotPool and use to place
+ * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a 
TaskManager's resources),
+ * plus additional fields to track what is currently executed in that slot, or 
if the slot is still
+ * used or disposed (ExecutionGraph gave it back to the pool).
  *
  * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). 
In the more complex
  * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. 
Shared slots may contain
@@ -54,16 +59,13 @@ public abstract class Slot {
        /** State where all tasks in this slot have been canceled and the slot 
been given back to the instance */
        private static final int RELEASED = 2;
 
-       // 
------------------------------------------------------------------------
-
-       /** The ID of the job this slice belongs to. */
-       private final JobID jobID;
+       // temporary placeholder for Slots that are not constructed from an 
AllocatedSlot (prior to FLIP-6)
+       protected static final AllocationID NO_ALLOCATION_ID = new 
AllocationID(0, 0);
 
-       /** The location information of the TaskManager to which this slot 
belongs */
-       private final TaskManagerLocation taskManagerLocation;
+       // 
------------------------------------------------------------------------
 
-       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
-       private final ActorGateway taskManagerActorGateway;
+       /** The allocated slot that this slot represents. */
+       private final AllocatedSlot allocatedSlot;
 
        /** The owner of this slot - the slot was taken from that owner and 
must be disposed to it */
        private final SlotOwner owner;
@@ -81,10 +83,14 @@ public abstract class Slot {
 
        /** The state of the vertex, only atomically updated */
        private volatile int status = ALLOCATED_AND_ALIVE;
-       
+
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Base constructor for slots.
         * 
+        * <p>This is the old way of constructing slots, prior to the FLIP-6 
resource management refactoring.
+        * 
         * @param jobID The ID of the job that this slot is allocated for.
         * @param owner The component from which this slot is allocated.
         * @param location The location info of the TaskManager where the slot 
was allocated from
@@ -101,23 +107,56 @@ public abstract class Slot {
 
                checkArgument(slotNumber >= 0);
 
-               this.jobID = checkNotNull(jobID);
-               this.taskManagerLocation = checkNotNull(location);
+               this.allocatedSlot = new AllocatedSlot(
+                               NO_ALLOCATION_ID, jobID, location, slotNumber, 
ResourceProfile.UNKNOWN, taskManagerActorGateway);
+
+               this.owner = checkNotNull(owner);
+               this.parent = parent; // may be null
+               this.groupID = groupID; // may be null
+               this.slotNumber = slotNumber;
+       }
+
+       /**
+        * Base constructor for slots.
+        *
+        * @param allocatedSlot The allocated slot that this slot represents.
+        * @param owner The component from which this slot is allocated.
+        * @param slotNumber The number of this slot.
+        * @param parent The parent slot that contains this slot. May be null, 
if this slot is the root.
+        * @param groupID The ID that identifies the task group for which this 
slot is allocated. May be null
+        *                if the slot does not belong to any task group.   
+        */
+       protected Slot(
+                       AllocatedSlot allocatedSlot, SlotOwner owner, int 
slotNumber,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupID) {
+
+               checkArgument(slotNumber >= 0);
+
+               this.allocatedSlot = checkNotNull(allocatedSlot);
                this.owner = checkNotNull(owner);
-               this.taskManagerActorGateway = 
checkNotNull(taskManagerActorGateway);
                this.parent = parent; // may be null
                this.groupID = groupID; // may be null
                this.slotNumber = slotNumber;
        }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
+        * Gets the allocated slot that this slot refers to.
+        * 
+        * @return This slot's allocated slot.
+        */
+       public AllocatedSlot getAllocatedSlot() {
+               return allocatedSlot;
+       }
+
+       /**
         * Returns the ID of the job this allocated slot belongs to.
         *
         * @return the ID of the job this allocated slot belongs to
         */
        public JobID getJobID() {
-               return this.jobID;
+               return allocatedSlot.getJobID();
        }
 
        /**
@@ -126,7 +165,7 @@ public abstract class Slot {
         * @return The ID of the TaskManager that offers this slot
         */
        public ResourceID getTaskManagerID() {
-               return taskManagerLocation.getResourceID();
+               return allocatedSlot.getTaskManagerLocation().getResourceID();
        }
 
        /**
@@ -135,7 +174,7 @@ public abstract class Slot {
         * @return The location info of the TaskManager that offers this slot
         */
        public TaskManagerLocation getTaskManagerLocation() {
-               return taskManagerLocation;
+               return allocatedSlot.getTaskManagerLocation();
        }
 
        /**
@@ -146,7 +185,7 @@ public abstract class Slot {
         * @return The actor gateway that can be used to send messages to the 
TaskManager.
         */
        public ActorGateway getTaskManagerActorGateway() {
-               return taskManagerActorGateway;
+               return allocatedSlot.getTaskManagerActorGateway();
        }
 
        /**
@@ -303,7 +342,7 @@ public abstract class Slot {
 
        @Override
        public String toString() {
-               return hierarchy() + " - " + taskManagerLocation + " - " + 
getStateName(status);
+               return hierarchy() + " - " + getTaskManagerLocation() + " - " + 
getStateName(status);
        }
 
        protected String hierarchy() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f6d86681/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
new file mode 100644
index 0000000..355524c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code AllocatedSlot} represents a slot that the JobManager allocated 
from a TaskManager.
+ * It represents a slice of allocated resources from the TaskManager.
+ * 
+ * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the 
ResourceManager. The
+ * ResourceManager picks (or starts) a TaskManager that will then allocate the 
slot to the
+ * JobManager and notify the JobManager.
+ * 
+ * <p>Note: Prior to the resource management changes introduced in (Flink 
Improvement Proposal 6),
+ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager 
registered at the
+ * JobManager. All slots had a default unknown resource profile. 
+ */
+public class AllocatedSlot {
+
+       /** The ID under which the slot is allocated. Uniquely identifies the 
slot. */
+       private final AllocationID slotAllocationId;
+
+       /** The ID of the job this slot is allocated for */
+       private final JobID jobID;
+
+       /** The location information of the TaskManager to which this slot 
belongs */
+       private final TaskManagerLocation taskManagerLocation;
+
+       /** The resource profile of the slot provides */
+       private final ResourceProfile resourceProfile;
+
+       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
+       private final ActorGateway taskManagerActorGateway;
+
+       /** The number of the slot on the TaskManager to which slot belongs. 
Purely informational. */
+       private final int slotNumber;
+
+       // 
------------------------------------------------------------------------
+
+       public AllocatedSlot(
+                       AllocationID slotAllocationId,
+                       JobID jobID,
+                       TaskManagerLocation location,
+                       int slotNumber,
+                       ResourceProfile resourceProfile,
+                       ActorGateway actorGateway)
+       {
+               this.slotAllocationId = checkNotNull(slotAllocationId);
+               this.jobID = checkNotNull(jobID);
+               this.taskManagerLocation = checkNotNull(location);
+               this.slotNumber = slotNumber;
+               this.resourceProfile = checkNotNull(resourceProfile);
+               this.taskManagerActorGateway = checkNotNull(actorGateway);
+       }
+
+       public AllocatedSlot(AllocatedSlot other) {
+               this.slotAllocationId = other.slotAllocationId;
+               this.jobID = other.jobID;
+               this.taskManagerLocation = other.taskManagerLocation;
+               this.slotNumber = other.slotNumber;
+               this.resourceProfile = other.resourceProfile;
+               this.taskManagerActorGateway = other.taskManagerActorGateway;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the ID under which the slot is allocated, which uniquely 
identifies the slot.
+        * 
+        * @return The ID under which the slot is allocated
+        */
+       public AllocationID getSlotAllocationId() {
+               return slotAllocationId;
+       }
+
+       /**
+        * Returns the ID of the job this allocated slot belongs to.
+        *
+        * @return the ID of the job this allocated slot belongs to
+        */
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       /**
+        * Gets the number of the slot.
+        *
+        * @return The number of the slot on the TaskManager.
+        */
+       public int getSlotNumber() {
+               return slotNumber;
+       }
+
+       /**
+        * Gets the resource profile of the slot.
+        *
+        * @return The resource profile of the slot.
+        */
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       /**
+        * Gets the location info of the TaskManager that offers this slot.
+        *
+        * @return The location info of the TaskManager that offers this slot
+        */
+       public TaskManagerLocation getTaskManagerLocation() {
+               return taskManagerLocation;
+       }
+
+       /**
+        * Gets the actor gateway that can be used to send messages to the 
TaskManager.
+        * <p>
+        * This method should be removed once the new interface-based RPC 
abstraction is in place
+        *
+        * @return The actor gateway that can be used to send messages to the 
TaskManager.
+        */
+       public ActorGateway getTaskManagerActorGateway() {
+               return taskManagerActorGateway;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "AllocatedSlot " + slotAllocationId + " @ " + 
taskManagerLocation + " - " + slotNumber;
+       }
+}

Reply via email to