[hotfix] Add methods defined in the gateway to the ResourceManager and TaskExecutor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbd38671 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbd38671 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbd38671 Branch: refs/heads/flip-6 Commit: fbd38671f33d2579cf2856c7f9b50ffc0335113f Parents: b2c5c0f Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Sep 21 14:14:05 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:40 2016 +0200 ---------------------------------------------------------------------- .../runtime/resourcemanager/ResourceManager.java | 1 + .../resourcemanager/ResourceManagerGateway.java | 2 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 15 +++++++++++++++ .../runtime/taskexecutor/TaskExecutorGateway.java | 6 +++--- 4 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 29aba1a..d9a7134 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.resourcemanager; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import org.apache.flink.annotation.VisibleForTesting; http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index e5c8b64..c8e3488 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway { * @param slotRequest Slot request * @return Future slot assignment */ - Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest); + Future<SlotRequestReply> requestSlot(SlotRequest slotRequest); /** * http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index a455fe2..fadae5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -30,6 +31,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -201,6 +204,18 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } /** + * Requests a slot from the TaskManager + * + * @param allocationID id for the request + * @param resourceManagerLeaderID current leader id of the ResourceManager + * @return answer to the slot request + */ + @RpcMethod + public SlotRequestReply requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID) { + return new SlotRequestRegistered(allocationID); + } + + /** * Starts and runs the TaskManager. * <p/> * This method first tries to select the network interface to use for the TaskManager http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 7257436..65323a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -39,12 +39,12 @@ public interface TaskExecutorGateway extends RpcGateway { void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); /** - * Send by the ResourceManager to the TaskExecutor + * Requests a slot from the TaskManager + * * @param allocationID id for the request * @param resourceManagerLeaderID current leader id of the ResourceManager - * @return SlotRequestReply Answer to the request + * @return answer to the slot request */ - Future<SlotRequestReply> requestSlot( AllocationID allocationID, UUID resourceManagerLeaderID,