[FLINK-5810] [flip-6] Make slot registration static

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16a5a29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16a5a29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16a5a29

Branch: refs/heads/table-retraction
Commit: d16a5a297008fc9665889dd56e24ff81472db2c6
Parents: 759f46e
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Apr 27 15:53:36 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 15:28:26 2017 +0200

----------------------------------------------------------------------
 .../slotmanager/SlotManager.java                | 76 +++++++++++---------
 .../slotmanager/TaskManagerRegistration.java    | 22 +++---
 .../flink/runtime/taskexecutor/SlotReport.java  |  4 --
 .../slotmanager/SlotManagerTest.java            | 30 ++++----
 4 files changed, 70 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 31edbf3..f09b73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -250,12 +250,33 @@ public class SlotManager implements AutoCloseable {
                checkInit();
 
                // we identify task managers by their instance id
-               if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
-                       TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
+               if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
+                       reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
+               } else {
+                       // first register the TaskManager
+                       ArrayList<SlotID> reportedSlots = new ArrayList<>();
+
+                       for (SlotStatus slotStatus : initialSlotReport) {
+                               reportedSlots.add(slotStatus.getSlotID());
+                       }
+
+                       TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection, reportedSlots);
                        taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
+
+                       // next register the new slots
+                       for (SlotStatus slotStatus : initialSlotReport) {
+                               registerSlot(
+                                       slotStatus.getSlotID(),
+                                       slotStatus.getAllocationID(),
+                                       slotStatus.getResourceProfile(),
+                                       taskExecutorConnection);
+                       }
+
+                       if (!anySlotUsed(taskManagerRegistration.getSlots())) {
+                               registerTaskManagerTimeout(taskManagerRegistration);
+                       }
                }
 
-               reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        }
 
        /**
@@ -296,31 +317,22 @@ public class SlotManager implements AutoCloseable {
                TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
 
                if (null != taskManagerRegistration) {
-                       ArrayList<SlotID> slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots());
                        boolean idle = true;
 
                        for (SlotStatus slotStatus : slotReport) {
-                               if (slotsToRemove.remove(slotStatus.getSlotID())) {
-                                       // slot which was already registered
-                                       updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+
+                               // We assume that the slots of a TaskManager don't change over its lifetime and they are registered
+                               // once when the TaskManager is registered
+                               if (taskManagerRegistration.containsSlot(slotStatus.getSlotID()) && updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID())) {
+                                       TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
+                                       idle &= slot.isFree();
                                } else {
-                                       // new slot
-                                       registerSlot(
-                                               taskManagerRegistration,
-                                               slotStatus.getSlotID(),
-                                               slotStatus.getAllocationID(),
-                                               slotStatus.getResourceProfile(),
-                                               taskManagerRegistration.getTaskManagerConnection());
+                                       // sanity check to guarantee that slots of a TaskManager don't change
+                                       throw new IllegalStateException("Reported a slot status for slot " +  slotStatus.getSlotID() +
+                                               " which has not been registered.");
                                }
-
-                               TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
-
-                               idle &= slot.isFree();
                        }
 
-                       // remove the slots for which we haven't received a slot status message
-                       removeSlots(slotsToRemove);
-
                        if (idle) {
                                // no slot of this task manager is being used --> register timer to free this resource
                                registerTaskManagerTimeout(taskManagerRegistration);
@@ -439,20 +451,21 @@ public class SlotManager implements AutoCloseable {
         * the given slot id. The given resource profile defines the available resources for the slot.
         * The task manager connection can be used to communicate with the task manager.
         *
-        * @param taskManagerRegistration Task manager for which to register the given slot
         * @param slotId identifying the slot on the task manager
         * @param allocationId which is currently deployed in the slot
         * @param resourceProfile of the slot
         * @param taskManagerConnection to communicate with the remote task manager
         */
        private void registerSlot(
-                       TaskManagerRegistration taskManagerRegistration,
                        SlotID slotId,
                        AllocationID allocationId,
                        ResourceProfile resourceProfile,
                        TaskExecutorConnection taskManagerConnection) {
 
-               Preconditions.checkNotNull(taskManagerRegistration);
+               if (slots.containsKey(slotId)) {
+                       // remove the old slot first
+                       removeSlot(slotId);
+               }
 
                TaskManagerSlot slot = new TaskManagerSlot(
                        slotId,
@@ -462,8 +475,6 @@ public class SlotManager implements AutoCloseable {
 
                slots.put(slotId, slot);
 
-               taskManagerRegistration.addSlot(slotId);
-
                if (slot.isFree()) {
                        handleFreeSlot(slot);
                }
@@ -478,8 +489,9 @@ public class SlotManager implements AutoCloseable {
         *
         * @param slotId to update
         * @param allocationId specifying the current allocation of the slot
+        * @return True if the slot could be updated; otherwise false
         */
-       private void updateSlot(SlotID slotId, AllocationID allocationId) {
+       private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
                TaskManagerSlot slot = slots.get(slotId);
 
                if (null != slot) {
@@ -516,8 +528,12 @@ public class SlotManager implements AutoCloseable {
                                        taskManagerRegistration.cancelTimeout();
                                }
                        }
+
+                       return true;
                } else {
                        LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
+
+                       return false;
                }
        }
 
@@ -670,12 +686,6 @@ public class SlotManager implements AutoCloseable {
                        AllocationID oldAllocationId = slot.getAllocationId();
 
                        fulfilledSlotRequests.remove(oldAllocationId);
-
-                       TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
-
-                       if (null != taskManagerRegistration) {
-                               taskManagerRegistration.removeSlot(slotId);
-                       }
                } else {
                        LOG.debug("There was no slot registered with slot id {}.", slotId);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 8e23dbb..3a15cb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
@@ -38,10 +38,14 @@ public class TaskManagerRegistration {
 
        private ScheduledFuture<?> timeoutFuture;
 
-       public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) {
-               this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection);
+       public TaskManagerRegistration(
+               TaskExecutorConnection taskManagerConnection,
+               Collection<SlotID> slots) {
 
-               slots = new HashSet<>(4);
+               this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection, "taskManagerConnection");
+               Preconditions.checkNotNull(slots, "slots");
+
+               this.slots = new HashSet<>(slots);
 
                timeoutIdentifier = null;
                timeoutFuture = null;
@@ -59,16 +63,12 @@ public class TaskManagerRegistration {
                return timeoutIdentifier;
        }
 
-       public Set<SlotID> getSlots() {
+       public Iterable<SlotID> getSlots() {
                return slots;
        }
 
-       public boolean removeSlot(SlotID slotId) {
-               return slots.remove(slotId);
-       }
-
-       public void addSlot(SlotID slotId) {
-               slots.add(slotId);
+       public boolean containsSlot(SlotID slotId) {
+               return slots.contains(slotId);
        }
 
        public void cancelTimeout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index 7a9da28..94ecaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -49,10 +49,6 @@ public class SlotReport implements Serializable, Iterable<SlotStatus> {
                this.slotsStatus = checkNotNull(slotsStatus);
        }
 
-       public Collection<SlotStatus> getSlotsStatus() {
-               return slotsStatus;
-       }
-
        @Override
        public Iterator<SlotStatus> iterator() {
                return slotsStatus.iterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index b0b5d32..fff2829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -587,8 +587,7 @@ public class SlotManagerTest extends TestLogger {
 
        /**
         * Tests that slots are updated with respect to the latest incoming slot report. This means that
-        * slot for which not report has been received will be removed and those for which a report was
-        * received are updated accordingly.
+        * slots for which a report was received are updated accordingly.
         */
        @Test
        public void testUpdateSlotReport() throws Exception {
@@ -601,7 +600,6 @@ public class SlotManagerTest extends TestLogger {
                final ResourceID resourceId = ResourceID.generate();
                final SlotID slotId1 = new SlotID(resourceId, 0);
                final SlotID slotId2 = new SlotID(resourceId, 1);
-               final SlotID slotId3 = new SlotID(resourceId, 2);
 
 
                final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
@@ -609,10 +607,9 @@ public class SlotManagerTest extends TestLogger {
                final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 
                final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId);
-               final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile);
 
                final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
-               final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus3));
+               final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
 
                final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
                final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -623,20 +620,21 @@ public class SlotManagerTest extends TestLogger {
 
                        slotManager.registerTaskManager(taskManagerConnection, slotReport1);
 
-                       TaskManagerSlot slot = slotManager.getSlot(slotId2);
+                       TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
+                       TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
 
                        assertTrue(2 == slotManager.getNumberRegisteredSlots());
 
-                       assertTrue(slot.isFree());
+                       assertTrue(slot1.isFree());
+                       assertTrue(slot2.isFree());
 
                        assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
 
                        assertTrue(2 == slotManager.getNumberRegisteredSlots());
 
                        // the slot manager should have removed slotId1
-                       assertNull(slotManager.getSlot(slotId1));
-
-                       assertNotNull(slotManager.getSlot(slotId3));
+                       assertNotNull(slotManager.getSlot(slotId1));
+                       assertNotNull(slotManager.getSlot(slotId2));
 
                        // slotId2 should have been allocated for allocationId
                        assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId());
@@ -840,7 +838,6 @@ public class SlotManagerTest extends TestLogger {
                final ResourceID resourceId = ResourceID.generate();
                final SlotID slotId1 = new SlotID(resourceId, 0);
                final SlotID slotId2 = new SlotID(resourceId, 1);
-               final SlotID slotId3 = new SlotID(resourceId, 2);
                final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
                final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
                final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
@@ -870,11 +867,16 @@ public class SlotManagerTest extends TestLogger {
                                eq(leaderId),
                                any(Time.class));
 
+                       final SlotID requestedSlotdId = slotIdCaptor.getValue();
+                       final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? slotId2 : slotId1;
+
+                       assertTrue(slotManager.getSlot(freeSlotId).isFree());
+
                        final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID());
-                       final SlotStatus newSlotStatus2 = new SlotStatus(slotId3, resourceProfile);
+                       final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
                        final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-                       // this should remove the unused slot, replacing it with slotId3 and retry the pending slot request
+                       // this should update the slot with the pending slot request triggering the reassignment of it
                        slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
 
                        ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -899,7 +901,7 @@ public class SlotManagerTest extends TestLogger {
 
                        final SlotID requestedSlotId = slotIdCaptor.getValue();
 
-                       assertEquals(slotId3, requestedSlotId);
+                       assertEquals(slotId2, requestedSlotId);
 
                        TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);
 

Reply via email to