[FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related 
code


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/759f46ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/759f46ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/759f46ea

Branch: refs/heads/table-retraction
Commit: 759f46ea6716e2f9da98002ed881be2fe6d7ab39
Parents: 59aefb5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 18:53:21 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 15:28:25 2017 +0200

----------------------------------------------------------------------
 .../types/ResourceIDRetrievable.java            |  4 +++-
 .../runtime/clusterframework/types/SlotID.java  | 20 ++++++--------------
 .../runtime/concurrent/ScheduledExecutor.java   |  2 +-
 .../flink/runtime/messages/Acknowledge.java     |  3 ++-
 .../registration/TaskExecutorConnection.java    |  7 ++++---
 .../slotmanager/PendingSlotRequest.java         | 10 +++++++++-
 .../taskexecutor/TaskExecutorGateway.java       |  2 --
 .../slotmanager/SlotProtocolTest.java           | 16 ++++++++--------
 8 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
index b45d53c..e65840d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.clusterframework.types;
  */
 public interface ResourceIDRetrievable {
 
+       /**
+        * Gets the ResourceID of the object.
+        */
        ResourceID getResourceID();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d6409b6..83c28b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -24,7 +24,11 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Unique identifier for a slot which located in TaskManager.
+ * Unique identifier for a slot on a TaskManager. This ID is constant across 
the
+ * life time of the TaskManager.
+ * 
+ * <p>In contrast, the {@link AllocationID} represents the a slot allocation 
and changes
+ * every time the slot is allocated by a JobManager.
  */
 public class SlotID implements ResourceIDRetrievable, Serializable {
 
@@ -66,10 +70,7 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
 
                SlotID slotID = (SlotID) o;
 
-               if (slotNumber != slotID.slotNumber) {
-                       return false;
-               }
-               return resourceId.equals(slotID.resourceId);
+               return slotNumber == slotID.slotNumber && 
resourceId.equals(slotID.resourceId);
        }
 
        @Override
@@ -83,13 +84,4 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
        public String toString() {
                return resourceId + "_" + slotNumber;
        }
-
-       /**
-        * Generate a random slot id.
-        *
-        * @return A random slot id.
-        */
-       public static SlotID generate() {
-               return new SlotID(ResourceID.generate(), 0);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
index c1b47e2..d09cfc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -82,7 +82,7 @@ public interface ScheduledExecutor extends Executor {
         * @param delay the time between the end of the current and the start 
of the next execution
         * @param unit the time unit of the initial delay and the delay 
parameter
         * @return a ScheduledFuture representing the repeatedly executed task. 
This future never
-        * completes unless th exectuion of the given task fails or if the 
future is cancelled
+        * completes unless the execution of the given task fails or if the 
future is cancelled
         */
        ScheduledFuture<?> scheduleWithFixedDelay(
                Runnable command,

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
index 4bbc50a..2c08ec9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
@@ -62,8 +62,9 @@ public class Acknowledge implements Serializable {
 
        /**
         * Read resolve to preserve the singleton object property.
+        * (per best practices, this should have visibility 'protected')
         */
-       private Object readResolve() throws java.io.ObjectStreamException {
+       protected Object readResolve() throws java.io.ObjectStreamException {
                return INSTANCE;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index e4522f2..babe5b9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager.registration;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * This class is responsible for grouping the TaskExecutorGateway and the 
InstanceID
  * of a registered task executor.
@@ -29,11 +31,11 @@ public class TaskExecutorConnection {
 
        private final InstanceID instanceID;
 
-       private TaskExecutorGateway taskExecutorGateway;
+       private final TaskExecutorGateway taskExecutorGateway;
 
        public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
                this.instanceID = new InstanceID();
-               this.taskExecutorGateway = taskExecutorGateway;
+               this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
        }
 
        public InstanceID getInstanceID() {
@@ -43,5 +45,4 @@ public class TaskExecutorConnection {
        public TaskExecutorGateway getTaskExecutorGateway() {
                return taskExecutorGateway;
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 894f146..1195791 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
@@ -33,16 +34,21 @@ public class PendingSlotRequest {
 
        private final SlotRequest slotRequest;
 
+       @Nullable
        private CompletableFuture<Acknowledge> requestFuture;
 
+       @Nullable
        private UUID timeoutIdentifier;
 
+       @Nullable
        private ScheduledFuture<?> timeoutFuture;
 
        public PendingSlotRequest(SlotRequest slotRequest) {
                this.slotRequest = Preconditions.checkNotNull(slotRequest);
        }
 
+       // 
------------------------------------------------------------------------
+
        public AllocationID getAllocationId() {
                return slotRequest.getAllocationId();
        }
@@ -51,6 +57,7 @@ public class PendingSlotRequest {
                return slotRequest.getResourceProfile();
        }
 
+       @Nullable
        public UUID getTimeoutIdentifier() {
                return timeoutIdentifier;
        }
@@ -67,10 +74,11 @@ public class PendingSlotRequest {
                return null != requestFuture;
        }
 
-       public void setRequestFuture(CompletableFuture<Acknowledge> 
requestFuture) {
+       public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> 
requestFuture) {
                this.requestFuture = requestFuture;
        }
 
+       @Nullable
        public CompletableFuture<Acknowledge> getRequestFuture() {
                return requestFuture;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index bedf8ec..d4afdbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.UUID;
@@ -47,7 +46,6 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @param slotId slot id for the request
         * @param allocationId id for the request
         * @param resourceManagerLeaderId current leader id of the 
ResourceManager
-        * @throws SlotAllocationException if the slot allocation fails
         * @return answer to the slot request
         */
        Future<Acknowledge> requestSlot(

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a72969e..c09316c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -33,8 +34,8 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -54,13 +55,12 @@ public class SlotProtocolTest extends TestLogger {
 
        private static final long timeout = 10000L;
 
+       private static final ScheduledExecutorService scheduledExecutorService 
= 
+                       new ScheduledThreadPoolExecutor(1);
 
-       private static ScheduledExecutorService scheduledExecutorService;
 
-       @BeforeClass
-       public static void beforeClass() {
-               scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-       }
+       private static final ScheduledExecutor scheduledExecutor = 
+                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService);
 
        @AfterClass
        public static void afterClass() {
@@ -81,7 +81,7 @@ public class SlotProtocolTest extends TestLogger {
                final UUID rmLeaderID = UUID.randomUUID();
 
                try (SlotManager slotManager = new SlotManager(
-                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService),
+                       scheduledExecutor,
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
                        .thenReturn(mock(FlinkFuture.class));
 
                try (SlotManager slotManager = new SlotManager(
-                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService),
+                       scheduledExecutor,
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime())) {

Reply via email to