[FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/759f46ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/759f46ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/759f46ea Branch: refs/heads/master Commit: 759f46ea6716e2f9da98002ed881be2fe6d7ab39 Parents: 59aefb5 Author: Stephan Ewen <se...@apache.org> Authored: Thu Mar 16 18:53:21 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Apr 28 15:28:25 2017 +0200 ---------------------------------------------------------------------- .../types/ResourceIDRetrievable.java | 4 +++- .../runtime/clusterframework/types/SlotID.java | 20 ++++++-------------- .../runtime/concurrent/ScheduledExecutor.java | 2 +- .../flink/runtime/messages/Acknowledge.java | 3 ++- .../registration/TaskExecutorConnection.java | 7 ++++--- .../slotmanager/PendingSlotRequest.java | 10 +++++++++- .../taskexecutor/TaskExecutorGateway.java | 2 -- .../slotmanager/SlotProtocolTest.java | 16 ++++++++-------- 8 files changed, 33 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java index b45d53c..e65840d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java @@ -22,6 +22,8 @@ package org.apache.flink.runtime.clusterframework.types; */ public interface ResourceIDRetrievable { + /** + * Gets the 
ResourceID of the object. + */ ResourceID getResourceID(); - } http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index d6409b6..83c28b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -24,7 +24,11 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Unique identifier for a slot which located in TaskManager. + * Unique identifier for a slot on a TaskManager. This ID is constant across the + * life time of the TaskManager. + * + * <p>In contrast, the {@link AllocationID} represents a slot allocation and changes + * every time the slot is allocated by a JobManager. */ public class SlotID implements ResourceIDRetrievable, Serializable { @@ -66,10 +70,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable { SlotID slotID = (SlotID) o; - if (slotNumber != slotID.slotNumber) { - return false; - } - return resourceId.equals(slotID.resourceId); + return slotNumber == slotID.slotNumber && resourceId.equals(slotID.resourceId); } @Override @@ -83,13 +84,4 @@ public class SlotID implements ResourceIDRetrievable, Serializable { public String toString() { return resourceId + "_" + slotNumber; } - - /** - * Generate a random slot id. - * - * @return A random slot id. 
- */ - public static SlotID generate() { - return new SlotID(ResourceID.generate(), 0); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java index c1b47e2..d09cfc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java @@ -82,7 +82,7 @@ public interface ScheduledExecutor extends Executor { * @param delay the time between the end of the current and the start of the next execution * @param unit the time unit of the initial delay and the delay parameter * @return a ScheduledFuture representing the repeatedly executed task. This future never - * completes unless th exectuion of the given task fails or if the future is cancelled + * completes unless the execution of the given task fails or if the future is cancelled */ ScheduledFuture<?> scheduleWithFixedDelay( Runnable command, http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java index 4bbc50a..2c08ec9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java @@ -62,8 +62,9 @@ public class Acknowledge implements Serializable { /** * Read resolve to preserve the singleton object property. 
+ * (per best practices, this should have visibility 'protected') */ - private Object readResolve() throws java.io.ObjectStreamException { + protected Object readResolve() throws java.io.ObjectStreamException { return INSTANCE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java index e4522f2..babe5b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager.registration; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * This class is responsible for grouping the TaskExecutorGateway and the InstanceID * of a registered task executor. 
@@ -29,11 +31,11 @@ public class TaskExecutorConnection { private final InstanceID instanceID; - private TaskExecutorGateway taskExecutorGateway; + private final TaskExecutorGateway taskExecutorGateway; public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) { this.instanceID = new InstanceID(); - this.taskExecutorGateway = taskExecutorGateway; + this.taskExecutorGateway = checkNotNull(taskExecutorGateway); } public InstanceID getInstanceID() { @@ -43,5 +45,4 @@ public class TaskExecutorConnection { public TaskExecutorGateway getTaskExecutorGateway() { return taskExecutorGateway; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index 894f146..1195791 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; import java.util.UUID; import java.util.concurrent.ScheduledFuture; @@ -33,16 +34,21 @@ public class PendingSlotRequest { private final SlotRequest slotRequest; + @Nullable private CompletableFuture<Acknowledge> requestFuture; + @Nullable private UUID timeoutIdentifier; + @Nullable private ScheduledFuture<?> timeoutFuture; public PendingSlotRequest(SlotRequest slotRequest) { this.slotRequest = Preconditions.checkNotNull(slotRequest); } + // 
------------------------------------------------------------------------ + public AllocationID getAllocationId() { return slotRequest.getAllocationId(); } @@ -51,6 +57,7 @@ public class PendingSlotRequest { return slotRequest.getResourceProfile(); } + @Nullable public UUID getTimeoutIdentifier() { return timeoutIdentifier; } @@ -67,10 +74,11 @@ public class PendingSlotRequest { return null != requestFuture; } - public void setRequestFuture(CompletableFuture<Acknowledge> requestFuture) { + public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> requestFuture) { this.requestFuture = requestFuture; } + @Nullable public CompletableFuture<Acknowledge> getRequestFuture() { return requestFuture; } http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index bedf8ec..d4afdbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; -import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskmanager.Task; import java.util.UUID; @@ -47,7 +46,6 @@ public interface TaskExecutorGateway extends RpcGateway { * @param slotId slot id for the request * @param allocationId id for the request * @param resourceManagerLeaderId current leader id of the ResourceManager - * @throws 
SlotAllocationException if the slot allocation fails * @return answer to the slot request */ Future<Acknowledge> requestSlot( http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index a72969e..c09316c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.resourcemanager.SlotRequest; @@ -33,8 +34,8 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -54,13 +55,12 @@ public class SlotProtocolTest extends TestLogger { private static final long timeout = 10000L; + private static final ScheduledExecutorService scheduledExecutorService = + new ScheduledThreadPoolExecutor(1); - private static ScheduledExecutorService 
scheduledExecutorService; - @BeforeClass - public static void beforeClass() { - scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - } + private static final ScheduledExecutor scheduledExecutor = + new ScheduledExecutorServiceAdapter(scheduledExecutorService); @AfterClass public static void afterClass() { @@ -81,7 +81,7 @@ public class SlotProtocolTest extends TestLogger { final UUID rmLeaderID = UUID.randomUUID(); try (SlotManager slotManager = new SlotManager( - new ScheduledExecutorServiceAdapter(scheduledExecutorService), + scheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { @@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger { .thenReturn(mock(FlinkFuture.class)); try (SlotManager slotManager = new SlotManager( - new ScheduledExecutorServiceAdapter(scheduledExecutorService), + scheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) {