[hotfix] Add methods defined in the gateway to the ResourceManager and 
TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbd38671
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbd38671
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbd38671

Branch: refs/heads/flip-6
Commit: fbd38671f33d2579cf2856c7f9b50ffc0335113f
Parents: b2c5c0f
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 21 14:14:05 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200

----------------------------------------------------------------------
 .../runtime/resourcemanager/ResourceManager.java     |  1 +
 .../resourcemanager/ResourceManagerGateway.java      |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java     | 15 +++++++++++++++
 .../runtime/taskexecutor/TaskExecutorGateway.java    |  6 +++---
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 29aba1a..d9a7134 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 
 import org.apache.flink.annotation.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e5c8b64..c8e3488 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param slotRequest Slot request
         * @return Future slot assignment
         */
-       Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
+       Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
        /**
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a455fe2..fadae5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -30,6 +31,8 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -201,6 +204,18 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        }
 
        /**
+        * Requests a slot from the TaskManager
+        *
+        * @param allocationID id for the request
+        * @param resourceManagerLeaderID current leader id of the 
ResourceManager
+        * @return answer to the slot request
+        */
+       @RpcMethod
+       public SlotRequestReply requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID) {
+               return new SlotRequestRegistered(allocationID);
+       }
+
+       /**
         * Starts and runs the TaskManager.
         * <p/>
         * This method first tries to select the network interface to use for 
the TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/fbd38671/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 7257436..65323a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -39,12 +39,12 @@ public interface TaskExecutorGateway extends RpcGateway {
        void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
 
        /**
-        * Send by the ResourceManager to the TaskExecutor
+        * Requests a slot from the TaskManager
+        *
         * @param allocationID id for the request
         * @param resourceManagerLeaderID current leader id of the 
ResourceManager
-        * @return SlotRequestReply Answer to the request
+        * @return answer to the slot request
         */
-
        Future<SlotRequestReply> requestSlot(
                AllocationID allocationID,
                UUID resourceManagerLeaderID,

Reply via email to