[FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to 
communicate with the TaskManager.

Replaces AkkaUtils.globalExecutionContext with instance dependent 
ExecutionContext.

This closes #893


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ccb5fdb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ccb5fdb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ccb5fdb

Branch: refs/heads/master
Commit: 2ccb5fdb47aa0e3766fd7fbd17a41feaca29fcbc
Parents: aa5e5b3
Author: Till Rohrmann <[email protected]>
Authored: Tue Jul 7 11:41:44 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 17:54:31 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  89 ++-
 .../runtime/executiongraph/ExecutionGraph.java  |  55 +-
 .../runtime/executiongraph/ExecutionVertex.java |  40 +-
 .../runtime/instance/AkkaInstanceGateway.java   | 111 ++++
 .../apache/flink/runtime/instance/Instance.java |  34 +-
 .../flink/runtime/instance/InstanceGateway.java |  82 +++
 .../flink/runtime/instance/InstanceManager.java |   3 +-
 .../runtime/io/network/NetworkEnvironment.java  |  34 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  11 +-
 .../jobmanager/web/SetupInfoServlet.java        |   3 +-
 .../web-docs-infoserver/js/taskmanager.js       |   2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   2 -
 .../flink/runtime/jobmanager/JobManager.scala   |  95 +++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  20 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  34 +-
 .../executiongraph/AllVerticesIteratorTest.java |   2 +
 .../ExecutionGraphConstructionTest.java         |  54 +-
 .../ExecutionGraphDeploymentTest.java           |  59 +-
 .../executiongraph/ExecutionGraphTestUtils.java | 121 ++--
 .../ExecutionStateProgressTest.java             |  31 +-
 .../ExecutionVertexCancelTest.java              | 563 ++++++++-----------
 .../ExecutionVertexDeploymentTest.java          | 115 +---
 .../ExecutionVertexSchedulingTest.java          |  35 +-
 .../executiongraph/LocalInputSplitsTest.java    |  53 +-
 .../executiongraph/PointwisePatternTest.java    |  50 +-
 .../TerminalStateDeadlockTest.java              |  10 +-
 .../VertexLocationConstraintTest.java           |  91 +--
 .../executiongraph/VertexSlotSharingTest.java   |   7 +-
 .../instance/BaseTestingInstanceGateway.java    |  94 ++++
 .../runtime/instance/DummyInstanceGateway.java  |  57 ++
 .../flink/runtime/instance/InstanceTest.java    |   7 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   4 +-
 .../io/network/NetworkEnvironmentTest.java      |   6 +-
 .../ScheduleWithCoLocationHintTest.java         |  37 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  31 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  51 +-
 .../scheduler/SchedulerTestUtils.java           |   4 +-
 ...askManagerComponentsStartupShutdownTest.java |   6 +-
 .../ExecutionGraphRestartTest.scala             |  40 +-
 .../TaskManagerLossFailsTasksTest.scala         |  33 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   8 +-
 .../runtime/testingUtils/TestingCluster.scala   |  30 +-
 .../testingUtils/TestingJobManager.scala        |  15 +-
 .../TestingJobManagerMessages.scala             |   3 +-
 .../runtime/testingUtils/TestingUtils.scala     |  62 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   6 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  36 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  12 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  30 +-
 .../flink/yarn/ApplicationMasterActor.scala     |   2 +-
 50 files changed, 1399 insertions(+), 981 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 76a58e8..af67c3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,13 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -32,6 +28,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -50,6 +47,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -131,9 +129,20 @@ public class Execution implements Serializable {
        
        private SerializedValue<StateHandle<?>> operatorState;
 
+       /** The execution context which is used to execute futures. */
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       private ExecutionContext executionContext;
+
        // 
--------------------------------------------------------------------------------------------
        
-       public Execution(ExecutionVertex vertex, int attemptNumber, long 
startTimestamp, FiniteDuration timeout) {
+       public Execution(
+                       ExecutionContext executionContext,
+                       ExecutionVertex vertex,
+                       int attemptNumber,
+                       long startTimestamp,
+                       FiniteDuration timeout) {
+               this.executionContext = checkNotNull(executionContext);
+
                this.vertex = checkNotNull(vertex);
                this.attemptId = new ExecutionAttemptID();
                
@@ -200,6 +209,8 @@ public class Execution implements Serializable {
                }
                assignedResource = null;
 
+               executionContext = null;
+
                partialInputChannelDeploymentDescriptors.clear();
                partialInputChannelDeploymentDescriptors = null;
        }
@@ -338,8 +349,9 @@ public class Execution implements Serializable {
                        vertex.getExecutionGraph().registerExecution(this);
 
                        final Instance instance = slot.getInstance();
-                       final Future<Object> deployAction = 
Patterns.ask(instance.getTaskManager(),
-                                       new SubmitTask(deployment), new 
Timeout(timeout));
+                       final InstanceGateway gateway = 
instance.getInstanceGateway();
+
+                       final Future<Object> deployAction = gateway.ask(new 
SubmitTask(deployment), timeout);
 
                        deployAction.onComplete(new OnComplete<Object>(){
 
@@ -366,7 +378,7 @@ public class Execution implements Serializable {
                                                }
                                        }
                                }
-                       }, AkkaUtils.globalExecutionContext());
+                       }, executionContext);
                }
                catch (Throwable t) {
                        markFailed(t);
@@ -402,7 +414,7 @@ public class Execution implements Serializable {
                        else if (current == FINISHED || current == FAILED) {
                                // nothing to do any more. finished failed 
before it could be cancelled.
                                // in any case, the task is removed from the 
TaskManager already
-                               sendFailIntermediateResultPartitionsRPCCall();
+                               sendFailIntermediateResultPartitionsRpcCall();
 
                                return;
                        }
@@ -485,7 +497,7 @@ public class Execution implements Serializable {
 
                                                return true;
                                        }
-                               }, AkkaUtils.globalExecutionContext());
+                               }, executionContext);
 
                                // double check to resolve race conditions
                                if(consumerVertex.getExecutionState() == 
RUNNING){
@@ -533,7 +545,7 @@ public class Execution implements Serializable {
                                        final UpdatePartitionInfo 
updateTaskMessage = new UpdateTaskSinglePartitionInfo(
                                                        
consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
 
-                                       sendUpdateTaskRpcCall(consumerSlot, 
updateTaskMessage);
+                                       
sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
                                }
                                // 
----------------------------------------------------------------
                                // Consumer is scheduled or deploying => cache 
input channel
@@ -689,11 +701,12 @@ public class Execution implements Serializable {
                                
inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
                        }
 
-                       UpdatePartitionInfo updateTaskMessage =
-                                       
createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
-                                                       
inputChannelDeploymentDescriptors);
+                       UpdatePartitionInfo updateTaskMessage = 
createUpdateTaskMultiplePartitionInfos(
+                               attemptId,
+                               resultIDs,
+                               inputChannelDeploymentDescriptors);
 
-                       sendUpdateTaskRpcCall(assignedResource, 
updateTaskMessage);
+                       sendUpdatePartitionInfoRpcCall(assignedResource, 
updateTaskMessage);
                }
        }
 
@@ -804,14 +817,23 @@ public class Execution implements Serializable {
                }
        }
 
+       /**
+        * This method sends a CancelTask message to the instance of the 
assigned slot.
+        *
+        * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
+        */
        private void sendCancelRpcCall() {
                final SimpleSlot slot = this.assignedResource;
 
                if (slot != null) {
 
-                       Future<Object> cancelResult = 
AkkaUtils.retry(slot.getInstance().getTaskManager(), new
-                                                       CancelTask(attemptId), 
NUM_CANCEL_CALL_TRIES,
-                                       AkkaUtils.globalExecutionContext(), 
timeout);
+                       final InstanceGateway gateway = 
slot.getInstance().getInstanceGateway();
+
+                       Future<Object> cancelResult = gateway.retry(
+                               new CancelTask(attemptId),
+                               NUM_CANCEL_CALL_TRIES,
+                               timeout,
+                               executionContext);
 
                        cancelResult.onComplete(new OnComplete<Object>() {
 
@@ -827,35 +849,40 @@ public class Execution implements Serializable {
                                                }
                                        }
                                }
-                       }, AkkaUtils.globalExecutionContext());
+                       }, executionContext);
                }
        }
 
-       private void sendFailIntermediateResultPartitionsRPCCall() {
+       private void sendFailIntermediateResultPartitionsRpcCall() {
                final SimpleSlot slot = this.assignedResource;
 
                if (slot != null) {
                        final Instance instance = slot.getInstance();
 
                        if (instance.isAlive()) {
-                               try {
-                                       // TODO For some tests this could be a 
problem when querying too early if all resources were released
-                                       instance.getTaskManager().tell(new 
FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
-                               } catch (Throwable t) {
-                                       fail(new Exception("Intermediate result 
partition could not be failed.", t));
-                               }
+                               final InstanceGateway gateway = 
instance.getInstanceGateway();
+
+                               // TODO For some tests this could be a problem 
when querying too early if all resources were released
+                               gateway.tell(new 
FailIntermediateResultPartitions(attemptId));
                        }
                }
        }
 
-       private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
-                                                                               
final UpdatePartitionInfo updateTaskMsg) {
+       /**
+        * Sends an UpdatePartitionInfo message to the instance of the 
consumerSlot.
+        *
+        * @param consumerSlot Slot to whose instance the message will be sent
+        * @param updatePartitionInfo UpdatePartitionInfo message
+        */
+       private void sendUpdatePartitionInfoRpcCall(
+                       final SimpleSlot consumerSlot,
+                       final UpdatePartitionInfo updatePartitionInfo) {
 
                if (consumerSlot != null) {
                        final Instance instance = consumerSlot.getInstance();
+                       final InstanceGateway gateway = 
instance.getInstanceGateway();
 
-                       Future<Object> futureUpdate = 
Patterns.ask(instance.getTaskManager(), updateTaskMsg,
-                                       new Timeout(timeout));
+                       Future<Object> futureUpdate = 
gateway.ask(updatePartitionInfo, timeout);
 
                        futureUpdate.onFailure(new OnFailure() {
                                @Override
@@ -863,7 +890,7 @@ public class Execution implements Serializable {
                                        fail(new IllegalStateException("Update 
task on instance " + instance +
                                                        " failed due to:", 
failure));
                                }
-                       }, AkkaUtils.globalExecutionContext());
+                       }, executionContext);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 84cbab7..47b7ae2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -24,7 +24,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -45,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -197,6 +197,10 @@ public class ExecutionGraph implements Serializable {
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointCoordinator checkpointCoordinator;
 
+       /** The execution context which is used to execute futures. */
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       private ExecutionContext executionContext;
+
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private ExecutionConfig executionConfig;
 
@@ -207,17 +211,38 @@ public class ExecutionGraph implements Serializable {
        /**
         * This constructor is for tests only, because it does not include 
class loading information.
         */
-       ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, 
FiniteDuration timeout) {
-               this(jobId, jobName, jobConfig, timeout, new 
ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
-       }
-
-       public ExecutionGraph(JobID jobId, String jobName, Configuration 
jobConfig, FiniteDuration timeout,
-                       List<BlobKey> requiredJarFiles, ClassLoader 
userClassLoader) {
-
-               if (jobId == null || jobName == null || jobConfig == null || 
userClassLoader == null) {
+       ExecutionGraph(
+                       ExecutionContext executionContext,
+                       JobID jobId,
+                       String jobName,
+                       Configuration jobConfig,
+                       FiniteDuration timeout) {
+               this(
+                       executionContext,
+                       jobId,
+                       jobName,
+                       jobConfig,
+                       timeout,
+                       new ArrayList<BlobKey>(),
+                       ExecutionGraph.class.getClassLoader()
+               );
+       }
+
+       public ExecutionGraph(
+                       ExecutionContext executionContext,
+                       JobID jobId,
+                       String jobName,
+                       Configuration jobConfig,
+                       FiniteDuration timeout,
+                       List<BlobKey> requiredJarFiles,
+                       ClassLoader userClassLoader) {
+
+               if (executionContext == null || jobId == null || jobName == 
null || jobConfig == null || userClassLoader == null) {
                        throw new NullPointerException();
                }
 
+               this.executionContext = executionContext;
+
                this.jobID = jobId;
                this.jobName = jobName;
                this.jobConfiguration = jobConfig;
@@ -451,6 +476,15 @@ public class ExecutionGraph implements Serializable {
                return this.stateTimestamps[status.ordinal()];
        }
 
+       /**
+        * Returns the ExecutionContext associated with this ExecutionGraph.
+        *
+        * @return ExecutionContext associated with this ExecutionGraph
+        */
+       public ExecutionContext getExecutionContext() {
+               return executionContext;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
@@ -629,6 +663,7 @@ public class ExecutionGraph implements Serializable {
                userClassLoader = null;
                scheduler = null;
                checkpointCoordinator = null;
+               executionContext = null;
 
                for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                        vertex.prepareForArchiving();
@@ -719,7 +754,7 @@ public class ExecutionGraph implements Serializable {
                                                                        
restart();
                                                                        return 
null;
                                                                }
-                                                       }, 
AkkaUtils.globalExecutionContext());
+                                                       }, executionContext);
                                                        break;
                                                }
                                                else if (numberOfRetriesLeft <= 
0 && transitionState(current, JobStatus.FAILED, failureCause)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a70fa7d..f9001cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -30,6 +28,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -101,14 +100,20 @@ public class ExecutionVertex implements Serializable {
 
        // 
--------------------------------------------------------------------------------------------
 
-       public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
-                                               IntermediateResult[] 
producedDataSets, FiniteDuration timeout) {
+       public ExecutionVertex(
+                       ExecutionJobVertex jobVertex,
+                       int subTaskIndex,
+                       IntermediateResult[] producedDataSets,
+                       FiniteDuration timeout) {
                this(jobVertex, subTaskIndex, producedDataSets, timeout, 
System.currentTimeMillis());
        }
 
-       public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
-                                               IntermediateResult[] 
producedDataSets, FiniteDuration timeout,
-                                               long createTimestamp) {
+       public ExecutionVertex(
+                       ExecutionJobVertex jobVertex,
+                       int subTaskIndex,
+                       IntermediateResult[] producedDataSets,
+                       FiniteDuration timeout,
+                       long createTimestamp) {
                this.jobVertex = jobVertex;
                this.subTaskIndex = subTaskIndex;
 
@@ -125,7 +130,12 @@ public class ExecutionVertex implements Serializable {
 
                this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
-               this.currentExecution = new Execution(this, 0, createTimestamp, 
timeout);
+               this.currentExecution = new Execution(
+                       getExecutionGraph().getExecutionContext(),
+                       this,
+                       0,
+                       createTimestamp,
+                       timeout);
 
                // create a co-location scheduling hint, if necessary
                CoLocationGroup clg = jobVertex.getCoLocationGroup();
@@ -416,8 +426,12 @@ public class ExecutionVertex implements Serializable {
 
                        if (state == FINISHED || state == CANCELED || state == 
FAILED) {
                                priorExecutions.add(execution);
-                               currentExecution = new Execution(this, 
execution.getAttemptNumber()+1,
-                                               System.currentTimeMillis(), 
timeout);
+                               currentExecution = new Execution(
+                                       
getExecutionGraph().getExecutionContext(),
+                                       this,
+                                       execution.getAttemptNumber()+1,
+                                       System.currentTimeMillis(),
+                                       timeout);
 
                                CoLocationGroup grp = 
jobVertex.getCoLocationGroup();
                                if (grp != null) {
@@ -455,9 +469,9 @@ public class ExecutionVertex implements Serializable {
                        
                        // send only if we actually have a target
                        if (slot != null) {
-                               ActorRef taskManager = 
slot.getInstance().getTaskManager();
-                               if (taskManager != null) {
-                                       taskManager.tell(message, 
ActorRef.noSender());
+                               InstanceGateway gateway = 
slot.getInstance().getInstanceGateway();
+                               if (gateway != null) {
+                                       gateway.tell(message);
                                }
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
new file mode 100644
index 0000000..b7d60c5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * InstanceGateway implementation which uses Akka to communicate with remote 
instances.
+ */
+public class AkkaInstanceGateway implements InstanceGateway {
+
+       /** ActorRef of the remote instance */
+       private final ActorRef taskManager;
+
+       public AkkaInstanceGateway(ActorRef taskManager) {
+               this.taskManager = taskManager;
+       }
+
+       /**
+        * Sends a message asynchronously and returns its response. The 
response to the message is
+        * returned as a future.
+        *
+        * @param message Message to be sent
+        * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
+        * @return Future which contains the response to the sent message
+        */
+       @Override
+       public Future<Object> ask(Object message, FiniteDuration timeout) {
+               return Patterns.ask(taskManager, message, new Timeout(timeout));
+       }
+
+       /**
+        * Sends a message asynchronously without a result.
+        *
+        * @param message Message to be sent
+        */
+       @Override
+       public void tell(Object message) {
+               taskManager.tell(message, ActorRef.noSender());
+       }
+
+       /**
+        * Forwards a message. For the receiver of this message it looks as if 
sender has sent the
+        * message.
+        *
+        * @param message Message to be sent
+        * @param sender Sender of the forwarded message
+        */
+       @Override
+       public void forward(Object message, ActorRef sender) {
+               taskManager.tell(message, sender);
+       }
+
+       /**
+        * Retries to send asynchronously a message up to numberRetries times. 
The response to this
+        * message is returned as a future. The message is re-sent if the 
number of retries is not yet
+        * exceeded and if an exception occurred while sending it.
+        *
+        * @param message Message to be sent
+        * @param numberRetries Number of times to retry sending the message
+        * @param timeout Timeout for each sending attempt
+        * @param executionContext ExecutionContext which is used to send the 
message multiple times
+        * @return Future of the response to the sent message
+        */
+       @Override
+       public Future<Object> retry(
+                       Object message,
+                       int numberRetries,
+                       FiniteDuration timeout,
+                       ExecutionContext executionContext) {
+
+               return AkkaUtils.retry(
+                       taskManager,
+                       message,
+                       numberRetries,
+                       executionContext,
+                       timeout);
+       }
+
+       /**
+        * Returns the ActorPath of the remote instance.
+        *
+        * @return ActorPath of the remote instance.
+        */
+       @Override
+       public String path() {
+               return taskManager.path().toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 39caf08..1c44b5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.slf4j.Logger;
@@ -43,8 +41,8 @@ public class Instance {
        /** The lock on which to synchronize allocations and failure state 
changes */
        private final Object instanceLock = new Object();
 
-       /** The actor ref to the task manager represented by this taskManager. 
*/
-       private final ActorRef taskManager;
+       /** The instance gateway to communicate with the instance */
+       private final InstanceGateway instanceGateway;
 
        /** The instance connection information for the data transfer. */
        private final InstanceConnectionInfo connectionInfo;
@@ -81,15 +79,19 @@ public class Instance {
        /**
         * Constructs an instance reflecting a registered TaskManager.
         *
-        * @param taskManager The actor reference of the represented task 
manager.
+        * @param instanceGateway The instance gateway to communicate with the 
remote instance
         * @param connectionInfo The remote connection where the task manager 
receives requests.
         * @param id The id under which the taskManager is registered.
         * @param resources The resources available on the machine.
         * @param numberOfSlots The number of task slots offered by this 
taskManager.
         */
-       public Instance(ActorRef taskManager, InstanceConnectionInfo 
connectionInfo, InstanceID id,
-                                       HardwareDescription resources, int 
numberOfSlots) {
-               this.taskManager = taskManager;
+       public Instance(
+                       InstanceGateway instanceGateway,
+                       InstanceConnectionInfo connectionInfo,
+                       InstanceID id,
+                       HardwareDescription resources,
+                       int numberOfSlots) {
+               this.instanceGateway = instanceGateway;
                this.connectionInfo = connectionInfo;
                this.instanceId = id;
                this.resources = resources;
@@ -327,12 +329,14 @@ public class Instance {
                }
        }
 
-       public ActorRef getTaskManager() {
-               return taskManager;
-       }
-
-       public String getPath(){
-               return taskManager.path().toString();
+       /**
+        * Returns the InstanceGateway of this Instance. This gateway can be 
used to communicate with
+        * it.
+        *
+        * @return InstanceGateway associated with this instance
+        */
+       public InstanceGateway getInstanceGateway() {
+               return instanceGateway;
        }
 
        public InstanceConnectionInfo getInstanceConnectionInfo() {
@@ -386,6 +390,6 @@ public class Instance {
        @Override
        public String toString() {
                return String.format("%s @ %s - %d slots - URL: %s", 
instanceId, connectionInfo.getHostname(),
-                               numberOfSlots, (taskManager != null ? 
taskManager.path() : "ActorRef.noSender"));
+                               numberOfSlots, (instanceGateway != null ? 
instanceGateway.path() : "No instance gateway"));
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
new file mode 100644
index 0000000..a30b2f6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Interface to abstract the communication with an Instance.
+ *
+ * It allows to avoid direct interaction with an ActorRef.
+ */
+public interface InstanceGateway {
+
+       /**
+        * Sends a message asynchronously and returns its response. The 
response to the message is
+        * returned as a future.
+        *
+        * @param message Message to be sent
+        * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
+        * @return Future which contains the response to the sent message
+        */
+       Future<Object> ask(Object message, FiniteDuration timeout);
+
+       /**
+        * Sends a message asynchronously without a result.
+        *
+        * @param message Message to be sent
+        */
+       void tell(Object message);
+
+       /**
+        * Forwards a message. For the receiver of this message it looks as if 
sender has sent the
+        * message.
+        *
+        * @param message Message to be sent
+        * @param sender Sender of the forwarded message
+        */
+       void forward(Object message, ActorRef sender);
+
+       /**
+        * Retries sending a message asynchronously up to numberRetries times. 
The response to this
+        * message is returned as a future. The message is re-sent if the 
number of retries is not yet
+        * exceeded and if an exception occurred while sending it.
+        *
+        * @param message Message to be sent
+        * @param numberRetries Number of times to retry sending the message
+        * @param timeout Timeout for each sending attempt
+        * @param executionContext ExecutionContext which is used to send the 
message multiple times
+        * @return Future of the response to the sent message
+        */
+       Future<Object> retry(
+                       Object message,
+                       int numberRetries,
+                       FiniteDuration timeout,
+                       ExecutionContext executionContext);
+
+       /**
+        * Returns the path of the remote instance.
+        *
+        * @return Path of the remote instance.
+        */
+       String path();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index c1800bd..4f6c7a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -149,8 +149,9 @@ public class InstanceManager {
                                id = new InstanceID();
                        } while (registeredHostsById.containsKey(id));
 
+                       InstanceGateway instanceGateway = new 
AkkaInstanceGateway(taskManager);
 
-                       Instance host = new Instance(taskManager, 
connectionInfo, id, resources, numberOfSlots);
+                       Instance host = new Instance(instanceGateway, 
connectionInfo, id, resources, numberOfSlots);
 
                        registeredHostsById.put(id, host);
                        registeredHostsByConnection.put(taskManager, host);

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index c082c6a..0ffc889 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -23,7 +23,6 @@ import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -89,11 +89,20 @@ public class NetworkEnvironment {
        private boolean isShutdown;
 
        /**
+        * ExecutionContext which is used to execute remote calls with the
+        * {@link JobManagerResultPartitionConsumableNotifier}
+        */
+       private final ExecutionContext executionContext;
+
+       /**
         * Initializes all network I/O components.
         */
-       public NetworkEnvironment(FiniteDuration jobManagerTimeout,
-                                                               
NetworkEnvironmentConfiguration config) throws IOException {
+       public NetworkEnvironment(
+               ExecutionContext executionContext,
+               FiniteDuration jobManagerTimeout,
+               NetworkEnvironmentConfiguration config) throws IOException {
 
+               this.executionContext = executionContext;
                this.configuration = checkNotNull(config);
                this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
 
@@ -182,7 +191,10 @@ public class NetworkEnvironment {
                                this.partitionManager = new 
ResultPartitionManager();
                                this.taskEventDispatcher = new 
TaskEventDispatcher();
                                this.partitionConsumableNotifier = new 
JobManagerResultPartitionConsumableNotifier(
-                                                                               
                        jobManagerRef, taskManagerRef, new 
Timeout(jobManagerTimeout));
+                                       executionContext,
+                                       jobManagerRef,
+                                       taskManagerRef,
+                                       new Timeout(jobManagerTimeout));
 
                                this.partitionStateChecker = new 
JobManagerPartitionStateChecker(
                                                jobManagerRef, taskManagerRef);
@@ -414,6 +426,12 @@ public class NetworkEnvironment {
         */
        private static class JobManagerResultPartitionConsumableNotifier 
implements ResultPartitionConsumableNotifier {
 
+               /**
+                * {@link ExecutionContext} which is used for the failure 
handler of {@link ScheduleOrUpdateConsumers}
+                * messages.
+                */
+               private final ExecutionContext executionContext;
+
                private final ActorRef jobManager;
 
                private final ActorRef taskManager;
@@ -421,8 +439,12 @@ public class NetworkEnvironment {
                private final Timeout jobManagerMessageTimeout;
 
                public JobManagerResultPartitionConsumableNotifier(
-                               ActorRef jobManager, ActorRef taskManager, 
Timeout jobManagerMessageTimeout) {
+                       ExecutionContext executionContext,
+                       ActorRef jobManager,
+                       ActorRef taskManager,
+                       Timeout jobManagerMessageTimeout) {
 
+                       this.executionContext = executionContext;
                        this.jobManager = jobManager;
                        this.taskManager = taskManager;
                        this.jobManagerMessageTimeout = 
jobManagerMessageTimeout;
@@ -448,7 +470,7 @@ public class NetworkEnvironment {
 
                                        taskManager.tell(failMsg, 
ActorRef.noSender());
                                }
-                       }, AkkaUtils.globalExecutionContext());
+                       }, executionContext);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 940082e..cb99e52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -37,7 +37,6 @@ import akka.dispatch.Futures;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -50,6 +49,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among 
instances and slots.
@@ -95,12 +95,17 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        /** The number of slot allocations where locality could not be 
respected */
        private int nonLocalizedAssignments;
 
+       /** The ExecutionContext which is used to execute newSlotAvailable 
futures. */
+       private final ExecutionContext executionContext;
+
        // 
------------------------------------------------------------------------
 
        /**
         * Creates a new scheduler.
         */
-       public Scheduler() {}
+       public Scheduler(ExecutionContext executionContext) {
+               this.executionContext = executionContext;
+       }
        
        /**
         * Shuts the scheduler down. After shut down no more tasks can be added 
to the scheduler.
@@ -519,7 +524,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                handleNewSlot();
                                return null;
                        }
-               }, AkkaUtils.globalExecutionContext());
+               }, executionContext);
        }
        
        private void handleNewSlot() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 4e028d4..567d15a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -146,8 +146,7 @@ public class SetupInfoServlet extends HttpServlet {
                                long time = new Date().getTime() - 
instance.getLastHeartBeat();
 
                                try {
-                                       objInner.put("inetAdress", 
instance.getInstanceConnectionInfo().getInetAdress());
-                                       objInner.put("ipcPort", 
instance.getTaskManager().path().address().hostPort());
+                                       objInner.put("path", 
instance.getInstanceGateway().path());
                                        objInner.put("dataPort", 
instance.getInstanceConnectionInfo().dataPort());
                                        objInner.put("timeSinceLastHeartbeat", 
time / 1000);
                                        objInner.put("slotsNumber", 
instance.getTotalNumberOfSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js 
b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
index 1ea9a41..68e4278 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
@@ -224,7 +224,7 @@ function processTMdata(json) {
                                "</div>";
 
             var content = "<tr id=\""+tmRowIdCssName+"\">" +
-                               "<td style=\"width:20%\">"+tm.inetAdress+" <br> 
IPC Port: "+tm.ipcPort+", Data Port: "+tm.dataPort+"</td>" + // first row: 
TaskManager
+                               "<td style=\"width:20%\">"+tm.path+" <br> Data 
Port: "+tm.dataPort+"</td>" + // first row: TaskManager
                                "<td 
id=\""+tmRowIdCssName+"-memory\">"+tmMemoryBox+"</td>" + // second row: memory 
statistics
                                "<td id=\""+tmRowIdCssName+"-info\"><i>Loading 
Information</i></td>" + // Information
                                "</tr>";

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7ffaddd..d38e503 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -41,8 +41,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT = 21474835 seconds
 
-  var globalExecutionContext: ExecutionContext = ExecutionContext.global
-
   /**
    * Creates a local actor system without remoting.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index dc1599a..3b4ce15 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -60,6 +60,7 @@ import org.apache.flink.util.{ExceptionUtils, 
InstantiationUtil}
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**
@@ -89,16 +90,18 @@ import scala.language.postfixOps
  * - [[JobStatusChanged]] indicates that the status of job (RUNNING, 
CANCELING, FINISHED, etc.) has
  * changed. This message is sent by the ExecutionGraph.
  */
-class JobManager(protected val flinkConfiguration: Configuration,
-                 protected val instanceManager: InstanceManager,
-                 protected val scheduler: FlinkScheduler,
-                 protected val libraryCacheManager: BlobLibraryCacheManager,
-                 protected val archive: ActorRef,
-                 protected val accumulatorManager: AccumulatorManager,
-                 protected val defaultExecutionRetries: Int,
-                 protected val delayBetweenRetries: Long,
-                 protected val timeout: FiniteDuration,
-                 protected val mode: StreamingMode)
+class JobManager(
+    protected val flinkConfiguration: Configuration,
+    protected val executionContext: ExecutionContext,
+    protected val instanceManager: InstanceManager,
+    protected val scheduler: FlinkScheduler,
+    protected val libraryCacheManager: BlobLibraryCacheManager,
+    protected val archive: ActorRef,
+    protected val accumulatorManager: AccumulatorManager,
+    protected val defaultExecutionRetries: Int,
+    protected val delayBetweenRetries: Long,
+    protected val timeout: FiniteDuration,
+    protected val mode: StreamingMode)
   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of current jobs running jobs */
@@ -117,7 +120,7 @@ class JobManager(protected val flinkConfiguration: 
Configuration,
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
-      _.getTaskManager ! Disconnect("JobManager is shutting down")
+      _.getInstanceGateway().tell(Disconnect("JobManager is shutting down"))
     }
 
     archive ! PoisonPill
@@ -136,7 +139,6 @@ class JobManager(protected val flinkConfiguration: 
Configuration,
     }
 
     log.debug(s"Job manager ${self.path} is completely stopped.")
-
   }
 
   /**
@@ -411,8 +413,8 @@ class JobManager(protected val flinkConfiguration: 
Configuration,
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 
     case RequestStackTrace(instanceID) =>
-      val taskManager = 
instanceManager.getRegisteredInstanceById(instanceID).getTaskManager
-      taskManager forward SendStackTrace
+      val gateway = 
instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
+      gateway.forward(SendStackTrace, sender)
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
@@ -480,10 +482,18 @@ class JobManager(protected val flinkConfiguration: 
Configuration,
         }
 
         // see if there already exists an ExecutionGraph for the corresponding 
job ID
-        executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
-          (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-            jobGraph.getJobConfiguration, timeout, 
jobGraph.getUserJarBlobKeys, userCodeLoader),
-            JobInfo(sender(), System.currentTimeMillis())))._1
+        executionGraph = currentJobs.getOrElseUpdate(
+          jobGraph.getJobID,
+          (new ExecutionGraph(
+            executionContext,
+            jobGraph.getJobID,
+            jobGraph.getName,
+            jobGraph.getJobConfiguration,
+            timeout,
+            jobGraph.getUserJarBlobKeys,
+            userCodeLoader),
+            JobInfo(sender(), System.currentTimeMillis()))
+        )._1
 
         // configure the execution graph
         val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
@@ -1046,8 +1056,8 @@ object JobManager {
    * @param configuration The configuration from which to parse the config 
values.
    * @return The members for a default JobManager.
    */
-  def createJobManagerComponents(configuration: Configuration) :
-    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+  def createJobManagerComponents(configuration: Configuration)
+    : (ExecutionContext, InstanceManager, FlinkScheduler, 
BlobLibraryCacheManager,
       Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1083,6 +1093,8 @@ object JobManager {
 
     val accumulatorManager: AccumulatorManager = new 
AccumulatorManager(Math.min(1, archiveCount))
 
+    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
@@ -1091,7 +1103,7 @@ object JobManager {
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler()
+      scheduler = new FlinkScheduler(executionContext)
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -1114,8 +1126,16 @@ object JobManager {
       }
     }
 
-    (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
-      executionRetries, delayBetweenRetries, timeout, archiveCount)
+    (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiveProps,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      archiveCount)
   }
 
   /**
@@ -1154,9 +1174,16 @@ object JobManager {
                             archiverActorName: Option[String],
                             streamingMode: StreamingMode): (ActorRef, 
ActorRef) = {
 
-    val (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
-      executionRetries, delayBetweenRetries,
-      timeout, _) = createJobManagerComponents(configuration)
+    val (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiveProps,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      _) = createJobManagerComponents(configuration)
 
     // start the archiver with the given name, or without (avoid name 
conflicts)
     val archiver: ActorRef = archiverActorName match {
@@ -1164,9 +1191,19 @@ object JobManager {
       case None => actorSystem.actorOf(archiveProps)
     }
 
-    val jobManagerProps = Props(classOf[JobManager], configuration, 
instanceManager, scheduler,
-        libraryCacheManager, archiver, accumulatorManager, executionRetries,
-        delayBetweenRetries, timeout, streamingMode)
+    val jobManagerProps = Props(
+      classOf[JobManager],
+      configuration,
+      executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiver,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      streamingMode)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 73a37de..49c701e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{ExecutionContext, Future, Await}
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -48,9 +48,10 @@ import scala.concurrent.{Future, Await}
  * @param streamingMode True, if the system should be started in streaming 
mode, false if
  *                      in pure batch mode.
  */
-abstract class FlinkMiniCluster(val userConfiguration: Configuration,
-                                val singleActorSystem: Boolean,
-                                val streamingMode: StreamingMode) {
+abstract class FlinkMiniCluster(
+    val userConfiguration: Configuration,
+    val singleActorSystem: Boolean,
+    val streamingMode: StreamingMode) {
 
   def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
          = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
@@ -157,7 +158,7 @@ abstract class FlinkMiniCluster(val userConfiguration: 
Configuration,
 
     val future = gracefulStop(jobManagerActor, timeout)
 
-    implicit val executionContext = AkkaUtils.globalExecutionContext
+    implicit val executionContext = ExecutionContext.global
 
     Await.ready(Future.sequence(future +: futures), timeout)
 
@@ -179,7 +180,7 @@ abstract class FlinkMiniCluster(val userConfiguration: 
Configuration,
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {
-    implicit val executionContext = AkkaUtils.globalExecutionContext
+    implicit val executionContext = ExecutionContext.global
 
     val futures = taskManagerActors map {
       taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
@@ -196,8 +197,11 @@ abstract class FlinkMiniCluster(val userConfiguration: 
Configuration,
   }
   
   @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: 
FiniteDuration)
-                                                                 : 
SerializedJobExecutionResult = {
+  def submitJobAndWait(
+      jobGraph: JobGraph,
+      printUpdates: Boolean,
+      timeout: FiniteDuration)
+    : SerializedJobExecutionResult = {
 
     val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
     else JobClient.startJobClientActorSystem(configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 520decd..44a0b04 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.util.{ZooKeeperUtil, 
MathUtils, EnvironmentInfor
 
 import scala.concurrent._
 import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success}
 import scala.collection.JavaConverters._
 
@@ -1363,14 +1364,16 @@ object TaskManager {
   @throws(classOf[IllegalConfigurationException])
   @throws(classOf[IOException])
   @throws(classOf[Exception])
-  def startTaskManagerComponentsAndActor(configuration: Configuration,
-                                         actorSystem: ActorSystem,
-                                         taskManagerHostname: String,
-                                         taskManagerActorName: Option[String],
-                                         jobManagerPath: Option[String],
-                                         localTaskManagerCommunication: 
Boolean,
-                                         streamingMode: StreamingMode,
-                                         taskManagerClass: Class[_ <: 
TaskManager]): ActorRef = {
+  def startTaskManagerComponentsAndActor(
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      taskManagerHostname: String,
+      taskManagerActorName: Option[String],
+      jobManagerPath: Option[String],
+      localTaskManagerCommunication: Boolean,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : ActorRef = {
 
     // get and check the JobManager config
     val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
@@ -1380,17 +1383,20 @@ object TaskManager {
     }
 
     val (taskManagerConfig : TaskManagerConfiguration,
-         netConfig: NetworkEnvironmentConfiguration,
-         connectionInfo: InstanceConnectionInfo)
-
-         = parseTaskManagerConfiguration(configuration, taskManagerHostname,
-                                         localTaskManagerCommunication)
+      netConfig: NetworkEnvironmentConfiguration,
+      connectionInfo: InstanceConnectionInfo
+    ) = parseTaskManagerConfiguration(
+      configuration,
+      taskManagerHostname,
+      localTaskManagerCommunication)
 
     // pre-start checks
     checkTempDirs(taskManagerConfig.tmpDirPaths)
 
+    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
     // we start the network first, to make sure it can allocate its buffers 
first
-    val network = new NetworkEnvironment(taskManagerConfig.timeout, netConfig)
+    val network = new NetworkEnvironment(executionContext, 
taskManagerConfig.timeout, netConfig)
 
     // computing the amount of memory to use depends on how much memory is 
available
     // it strictly needs to happen AFTER the network stack has been initialized

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 693e014..1e66d81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -43,6 +44,7 @@ public class AllVerticesIteratorTest {
                        v4.setParallelism(2);
                        
                        ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+                       
Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
                                        
                        ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, 
v1, 1,
                                        AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index a4bd03c..a47ea77 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -99,7 +100,12 @@ public class ExecutionGraphConstructionTest {
                
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -137,7 +143,12 @@ public class ExecutionGraphConstructionTest {
                
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -198,7 +209,12 @@ public class ExecutionGraphConstructionTest {
                
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -446,7 +462,12 @@ public class ExecutionGraphConstructionTest {
                
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -496,7 +517,12 @@ public class ExecutionGraphConstructionTest {
                
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -551,7 +577,11 @@ public class ExecutionGraphConstructionTest {
                        
                        List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-                       ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobId,
+                                       jobName,
+                                       cfg,
                                        AkkaUtils.getDefaultTimeout());
                        try {
                                eg.attachJobGraph(ordered);
@@ -591,7 +621,11 @@ public class ExecutionGraphConstructionTest {
                        
                        List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-                       ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobId,
+                                       jobName,
+                                       cfg,
                                        AkkaUtils.getDefaultTimeout());
 
                        try {
@@ -657,7 +691,11 @@ public class ExecutionGraphConstructionTest {
                        
                        JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, 
v4, v5, v6, v7, v8);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jobId, jobName, 
cfg,
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobId,
+                                       jobName,
+                                       cfg,
                                        AkkaUtils.getDefaultTimeout());
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 03a41b4..cff7146 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,12 +30,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -51,29 +45,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionGraphDeploymentTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup() {
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-
        @Test
        public void testBuildDeploymentDescriptor() {
                try {
-                       TestingUtils.setCallingThreadDispatcher(system);
                        final JobID jobId = new JobID();
 
                        final JobVertexID jid1 = new JobVertexID();
@@ -100,7 +78,11 @@ public class ExecutionGraphDeploymentTest {
                        v3.connectNewDataSetAsInput(v2, 
DistributionPattern.ALL_TO_ALL);
                        v4.connectNewDataSetAsInput(v2, 
DistributionPattern.ALL_TO_ALL);
 
-                       ExecutionGraph eg = new ExecutionGraph(jobId, "some 
job", new Configuration(),
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobId,
+                                       "some job",
+                                       new Configuration(),
                                        AkkaUtils.getDefaultTimeout());
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
@@ -110,15 +92,9 @@ public class ExecutionGraphDeploymentTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
                        ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
-                       // create synchronous task manager
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       Props.create(ExecutionGraphTestUtils
-                                                       
.SimpleAcknowledgingTaskManager.class));
-
-                       ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager 
tm = (ExecutionGraphTestUtils
-                                       .SimpleAcknowledgingTaskManager) 
simpleTaskManager.underlyingActor();
+                       ExecutionGraphTestUtils.SimpleInstanceGateway 
instanceGateway = new 
ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
 
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(instanceGateway);
 
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(jobId);
 
@@ -128,7 +104,7 @@ public class ExecutionGraphDeploymentTest {
 
                        assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
 
-                       TaskDeploymentDescriptor descr = tm.lastTDD;
+                       TaskDeploymentDescriptor descr = 
instanceGateway.lastTDD;
                        assertNotNull(descr);
 
                        assertEquals(jobId, descr.getJobID());
@@ -152,9 +128,6 @@ public class ExecutionGraphDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 
        @Test
@@ -307,19 +280,23 @@ public class ExecutionGraphDeploymentTest {
                v2.setInvokableClass(RegularPactTask.class);
 
                // execution graph that executes actions synchronously
-               ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new 
Configuration(),
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.directExecutionContext(),
+                               jobId,
+                               "some job",
+                               new Configuration(),
                                AkkaUtils.getDefaultTimeout());
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
                eg.attachJobGraph(ordered);
 
-               // create a mock taskmanager that accepts deployment calls
-               ActorRef tm = 
system.actorOf(Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-
-               Scheduler scheduler = new Scheduler();
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                for (int i = 0; i < dop1 + dop2; i++) {
-                       
scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(tm));
+                       scheduler.newInstanceAvailable(
+                                       ExecutionGraphTestUtils.getInstance(
+                                                       new 
ExecutionGraphTestUtils.SimpleInstanceGateway(
+                                                                       
TestingUtils.directExecutionContext())));
                }
                assertEquals(dop1 + dop2, 
scheduler.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a77a09e..8a63060 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -26,17 +26,16 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.LinkedList;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -49,9 +48,11 @@ import 
org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import scala.concurrent.ExecutionContext;
 
 public class ExecutionGraphTestUtils {
 
@@ -100,103 +101,95 @@ public class ExecutionGraphTestUtils {
        //  utility mocking methods
        // 
--------------------------------------------------------------------------------------------
 
-       public static Instance getInstance(final ActorRef taskManager) throws
-                       Exception {
-               return getInstance(taskManager, 1);
+       public static Instance getInstance(final InstanceGateway gateway) 
throws Exception {
+               return getInstance(gateway, 1);
        }
 
-       public static Instance getInstance(final ActorRef taskManager, final 
int numberOfSlots) throws Exception {
+       public static Instance getInstance(final InstanceGateway gateway, final 
int numberOfSlots) throws Exception {
                HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
-               
-               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, numberOfSlots);
+
+               return new Instance(gateway, connection, new InstanceID(), 
hardwareDescription, numberOfSlots);
        }
 
-       public static class SimpleAcknowledgingTaskManager extends UntypedActor 
{
+       public static class SimpleInstanceGateway extends 
BaseTestingInstanceGateway {
                public TaskDeploymentDescriptor lastTDD;
+
+               public SimpleInstanceGateway(ExecutionContext executionContext){
+                       super(executionContext);
+               }
+
                @Override
-               public void onReceive(Object msg) throws Exception {
-                       if (msg instanceof SubmitTask) {
-                               SubmitTask submitTask = (SubmitTask) msg;
+               public Object handleMessage(Object message) {
+                       Object result = null;
+                       if(message instanceof SubmitTask) {
+                               SubmitTask submitTask = (SubmitTask) message;
                                lastTDD = submitTask.tasks();
 
-                               getSender().tell(Messages.getAcknowledge(), 
getSelf());
-                       } else if (msg instanceof CancelTask) {
-                               CancelTask cancelTask = (CancelTask) msg;
-                               getSender().tell(new 
TaskOperationResult(cancelTask.attemptID(), true), getSelf());
-                       }
-                       else if (msg instanceof 
FailIntermediateResultPartitions) {
-                               getSender().tell(new Object(), getSelf());
+                               result = Messages.getAcknowledge();
+                       } else if(message instanceof CancelTask) {
+                               CancelTask cancelTask = (CancelTask) message;
+
+                               result = new 
TaskOperationResult(cancelTask.attemptID(), true);
+                       } else if(message instanceof 
FailIntermediateResultPartitions) {
+                               result = new Object();
                        }
+
+                       return result;
                }
        }
 
-       public static final String ERROR_MESSAGE = "test_failure_error_message";
+       public static class SimpleFailingInstanceGateway extends 
BaseTestingInstanceGateway {
+               public SimpleFailingInstanceGateway(ExecutionContext 
executionContext) {
+                       super(executionContext);
+               }
 
-       public static class SimpleFailingTaskManager extends UntypedActor {
                @Override
-               public void onReceive(Object msg) throws Exception {
-                       if (msg instanceof SubmitTask) {
-                               getSender().tell(new Status.Failure(new 
Exception(ERROR_MESSAGE)),      getSelf());
-                       } else if (msg instanceof CancelTask) {
-                               CancelTask cancelTask = (CancelTask) msg;
-                               getSender().tell(new 
TaskOperationResult(cancelTask.attemptID(), true), getSelf());
+               public Object handleMessage(Object message) throws Exception {
+                       if(message instanceof SubmitTask) {
+                               throw new Exception(ERROR_MESSAGE);
+                       } else if (message instanceof CancelTask) {
+                               CancelTask cancelTask = (CancelTask) message;
+
+                               return new 
TaskOperationResult(cancelTask.attemptID(), true);
+                       } else {
+                               return null;
                        }
                }
        }
-       
-       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) 
throws JobException {
+
+       public static final String ERROR_MESSAGE = "test_failure_error_message";
+
+       public static ExecutionJobVertex getExecutionVertex(JobVertexID id, 
ExecutionContext executionContext) throws JobException {
                JobVertex ajv = new JobVertex("TestVertex", id);
                ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-               
-               ExecutionGraph graph = new ExecutionGraph(new JobID(), "test 
job", new Configuration(),
+
+               ExecutionGraph graph = new ExecutionGraph(
+                               executionContext,
+                               new JobID(),
+                               "test job",
+                               new Configuration(),
                                AkkaUtils.getDefaultTimeout());
-               
+
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
                                AkkaUtils.getDefaultTimeout()));
-               
+
                Answer<Void> noop = new Answer<Void>() {
                        @Override
                        public Void answer(InvocationOnMock invocation) {
                                return null;
                        }
                };
-               
+
                doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
                doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), 
Matchers.any(Throwable.class));
                doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-               
+
                return ejv;
        }
        
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class ActionQueue {
-               
-               private final LinkedList<Runnable> runnables = new 
LinkedList<Runnable>();
-               
-               public void triggerNextAction() {
-                       Runnable r = runnables.remove();
-                       r.run();
-               }
-
-               public void triggerLatestAction(){
-                       Runnable r = runnables.removeLast();
-                       r.run();
-               }
-               
-               public Runnable popNextAction() {
-                       Runnable r = runnables.remove();
-                       return r;
-               }
-
-               public void queueAction(Runnable r) {
-                       this.runnables.add(r);
-               }
-
-               public boolean isEmpty(){
-                       return runnables.isEmpty();
-               }
+       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) 
throws JobException {
+               return getExecutionVertex(id, 
TestingUtils.defaultExecutionContext());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 7787ab4..f47e92c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -24,10 +24,6 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,24 +33,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionStateProgressTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown(){
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
        @Test
        public void testAccumulatedStateFinished() {
                try {
@@ -65,8 +47,13 @@ public class ExecutionStateProgressTest {
                        ajv.setParallelism(3);
                        
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
-                       ExecutionGraph graph = new ExecutionGraph(jid, "test 
job", new Configuration(),
+                       ExecutionGraph graph = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jid,
+                                       "test job",
+                                       new Configuration(),
                                        AkkaUtils.getDefaultTimeout());
+
                        graph.attachJobGraph(Arrays.asList(ajv));
 
                        setGraphStatus(graph, JobStatus.RUNNING);
@@ -74,9 +61,11 @@ public class ExecutionStateProgressTest {
                        ExecutionJobVertex ejv = graph.getJobVertex(vid);
 
                        // mock resources and mock taskmanager
-                       ActorRef taskManager = 
system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
                        for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               SimpleSlot slot = 
getInstance(taskManager).allocateSimpleSlot(jid);
+                               SimpleSlot slot = getInstance(
+                                               new SimpleInstanceGateway(
+                                                               
TestingUtils.defaultExecutionContext())
+                               ).allocateSimpleSlot(jid);
                                ee.deployToSlot(slot);
                        }
 

Reply via email to