This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 228a3da  [FLINK-11593][tests] Port TaskManagerTest to new code base
228a3da is described below

commit 228a3da1b974640c477d61b5390805b54e9610f9
Author: Clark Yang <yangshi...@youzan.com>
AuthorDate: Wed Apr 10 18:09:50 2019 +0800

    [FLINK-11593][tests] Port TaskManagerTest to new code base
---
 .../jobmaster/TestingAbstractInvokables.java       |   36 +
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  951 +++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java     |   44 +
 .../TaskManagerServicesConfigurationTest.java      |   29 +
 .../TaskSubmissionTestEnvironment.java             |  363 ++++
 .../taskexecutor/TestTaskManagerActions.java       |  113 +
 .../flink/runtime/taskmanager/TaskManagerTest.java | 2185 --------------------
 .../flink/runtime/testingUtils/TestingUtils.scala  |  186 --
 8 files changed, 1536 insertions(+), 2371 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
index 4e8d7eb..e97dec7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.types.IntValue;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * {@link AbstractInvokable} for testing purposes.
  */
@@ -83,4 +85,38 @@ public class TestingAbstractInvokables {
                        }
                }
        }
+
+       public static final class TestInvokableRecordCancel extends 
AbstractInvokable {
+
+               private static CompletableFuture<Boolean> gotCanceledFuture = 
new CompletableFuture<>();
+
+               public TestInvokableRecordCancel(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       final Object o = new Object();
+                       RecordWriter<IntValue> recordWriter = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
+                       for (int i = 0; i < 1024; i++) {
+                               recordWriter.emit(new IntValue(42));
+                       }
+
+                       gotCanceledFuture.get();
+
+               }
+
+               @Override
+               public void cancel() {
+                       gotCanceledFuture.complete(true);
+               }
+
+               public static void resetGotCanceledFuture() {
+                       gotCanceledFuture = new CompletableFuture<>();
+               }
+
+               public static CompletableFuture<Boolean> gotCanceled() {
+                       return gotCanceledFuture;
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
new file mode 100644
index 0000000..3bb8354
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for submission logic of the {@link TaskExecutor}.
+ */
+public class TaskExecutorSubmissionTest extends TestLogger {
+
+       @Rule
+       public final TestName testName = new TestName();
+
+       private static final Time timeout = Time.milliseconds(10000L);
+
+       private JobID jobId = new JobID();
+
+       /**
+        * Tests that we can submit a task to the TaskManager given that we've 
allocated a slot there.
+        */
+       @Test(timeout = 10000L)
+       public void testTaskSubmission() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor("test task", eid, 
TaskExecutorTest.TestInvokable.class);
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+
+                       taskRunningFuture.get();
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the submit task
+        * message fails.
+        */
+       @Test(timeout = 10000L)
+       public void testSubmitTaskFailure() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor(
+                       "test task",
+                       eid,
+                       BlockingNoOpInvokable.class,
+                       0); // this will make the submission fail because the 
number of key groups must be >= 1
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+               } catch (Exception e) {
+                       assertThat(e.getCause(), 
instanceOf(IllegalArgumentException.class));
+               }
+       }
+
+       /**
+        * Tests that we can cancel the task of the TaskManager given that 
we've submitted it.
+        */
+       @Test(timeout = 10000L)
+       public void testTaskSubmissionAndCancelling() throws Exception {
+               final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+               final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd1 = 
createTestTaskDeploymentDescriptor("test task", eid1, 
BlockingNoOpInvokable.class);
+               final TaskDeploymentDescriptor tdd2 = 
createTestTaskDeploymentDescriptor("test task", eid2, 
BlockingNoOpInvokable.class);
+
+               final CompletableFuture<Void> task1RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task1CanceledFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(2)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.RUNNING, task1RunningFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.RUNNING, task2RunningFuture)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.CANCELED, task1CanceledFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd1.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd1, env.getJobMasterId(), 
timeout).get();
+                       task1RunningFuture.get();
+
+                       taskSlotTable.allocateSlot(1, jobId, 
tdd2.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd2, env.getJobMasterId(), 
timeout).get();
+                       task2RunningFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.RUNNING);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.RUNNING);
+
+                       tmGateway.cancelTask(eid1, timeout);
+                       task1CanceledFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.CANCELED);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.RUNNING);
+               }
+       }
+
+       /**
+        * Tests that we can stop the task of the TaskManager given that we've 
submitted it.
+        */
+       @Test(timeout = 10000L)
+       public void testTaskSubmissionAndStop() throws Exception {
+               final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+               final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd1 = 
createTestTaskDeploymentDescriptor("test task", eid1, StoppableInvokable.class);
+               final TaskDeploymentDescriptor tdd2 = 
createTestTaskDeploymentDescriptor("test task", eid2, 
BlockingNoOpInvokable.class);
+
+               final CompletableFuture<Void> task1RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task1FinishedFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(2)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.RUNNING, task1RunningFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.RUNNING, task2RunningFuture)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.FINISHED, task1FinishedFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd1.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd1, env.getJobMasterId(), 
timeout).get();
+                       task1RunningFuture.get();
+
+                       taskSlotTable.allocateSlot(1, jobId, 
tdd2.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd2, env.getJobMasterId(), 
timeout).get();
+                       task2RunningFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.RUNNING);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.RUNNING);
+
+                       tmGateway.stopTask(eid1, timeout);
+                       task1FinishedFuture.get();
+
+                       // task 2 does not implement StoppableTask which should 
cause the stop operation to fail
+                       CompletableFuture<Acknowledge> acknowledgeOfTask2 =     
tmGateway.stopTask(eid2, timeout);
+                       boolean hasTaskException = false;
+                       try {
+                               acknowledgeOfTask2.get();
+                       } catch (Throwable e) {
+                               hasTaskException = 
ExceptionUtils.findThrowable(e, TaskException.class).isPresent();
+                       }
+
+                       assertTrue(hasTaskException);
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.FINISHED);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.RUNNING);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the stop task
+        * message fails.
+        */
+       @Test(timeout = 10000L)
+       public void testStopTaskFailure() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor("test task", eid, 
BlockingNoOpInvokable.class);
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+                       taskRunningFuture.get();
+
+                       CompletableFuture<Acknowledge> stopFuture = 
tmGateway.stopTask(eid, timeout);
+                       try {
+                               stopFuture.get();
+                       } catch (Exception e) {
+                               assertTrue(e.getCause() instanceof 
TaskException);
+                               assertThat(e.getCause().getMessage(), 
startsWith("Cannot stop task for execution"));
+                       }
+               }
+       }
+
+       /**
+        * Tests that submitted tasks will fail when attempting to send/receive 
data if no
+        * ResultPartitions/InputGates are set up.
+        */
+       @Test(timeout = 10000L)
+       public void testGateChannelEdgeMismatch() throws Exception {
+               final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+               final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd1 =
+                       createTestTaskDeploymentDescriptor("Sender", eid1, 
TestingAbstractInvokables.Sender.class);
+               final TaskDeploymentDescriptor tdd2 =
+                       createTestTaskDeploymentDescriptor("Receiver", eid2, 
TestingAbstractInvokables.Receiver.class);
+
+               final CompletableFuture<Void> task1RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task1FailedFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2FailedFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.RUNNING, task1RunningFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.RUNNING, task2RunningFuture)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.FAILED, task1FailedFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.FAILED, task2FailedFuture)
+                               .setSlotSize(2)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd1.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd1, env.getJobMasterId(), 
timeout).get();
+                       task1RunningFuture.get();
+
+                       taskSlotTable.allocateSlot(1, jobId, 
tdd2.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd2, env.getJobMasterId(), 
timeout).get();
+                       task2RunningFuture.get();
+
+                       task1FailedFuture.get();
+                       task2FailedFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.FAILED);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.FAILED);
+               }
+       }
+
+       @Test(timeout = 10000L)
+       public void testRunJobWithForwardChannel() throws Exception {
+               final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+               final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+               IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
+
+               ResultPartitionDeploymentDescriptor 
task1ResultPartitionDescriptor =
+                       new ResultPartitionDeploymentDescriptor(new 
IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1,
+                               1, true);
+
+               InputGateDeploymentDescriptor task2InputGateDescriptor =
+                       new InputGateDeploymentDescriptor(new 
IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0,
+                               new InputChannelDeploymentDescriptor[] {
+                                       new 
InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1),
+                                               
ResultPartitionLocation.createLocal()) });
+
+               final TaskDeploymentDescriptor tdd1 =
+                       createTestTaskDeploymentDescriptor(
+                               "Sender",
+                               eid1,
+                               TestingAbstractInvokables.Sender.class, 
+                               1,
+                               
Collections.singletonList(task1ResultPartitionDescriptor),
+                               Collections.emptyList());
+               final TaskDeploymentDescriptor tdd2 =
+                       createTestTaskDeploymentDescriptor(
+                               "Receiver",
+                               eid2,
+                               TestingAbstractInvokables.Receiver.class,
+                               1,
+                               Collections.emptyList(),
+                               
Collections.singletonList(task2InputGateDescriptor));
+
+               final CompletableFuture<Void> task1RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task1FinishedFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2FinishedFuture = new 
CompletableFuture<>();
+
+               final JobMasterId jobMasterId = JobMasterId.generate();
+               TestingJobMasterGateway testingJobMasterGateway =
+                       new TestingJobMasterGatewayBuilder()
+                       .setFencingTokenSupplier(() -> jobMasterId)
+                       .setScheduleOrUpdateConsumersFunction(
+                               resultPartitionID -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                       .build();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(2)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.RUNNING, task1RunningFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.RUNNING, task2RunningFuture)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.FINISHED, task1FinishedFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.FINISHED, task2FinishedFuture)
+                               .setJobMasterId(jobMasterId)
+                               .setJobMasterGateway(testingJobMasterGateway)
+                               .setMockNetworkEnvironment(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd1.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
+                       task1RunningFuture.get();
+
+                       taskSlotTable.allocateSlot(1, jobId, 
tdd2.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
+                       task2RunningFuture.get();
+
+                       task1FinishedFuture.get();
+                       task2FinishedFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.FINISHED);
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.FINISHED);
+               }
+       }
+
+       /**
+        * This tests creates two tasks. The sender sends data but fails to 
send the
+        * state update back to the job manager.
+        * the second one blocks to be canceled
+        */
+       @Test(timeout = 10000L)
+       public void testCancellingDependentAndStateUpdateFails() throws 
Exception {
+               final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+               final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+
+               IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
+
+               ResultPartitionDeploymentDescriptor 
task1ResultPartitionDescriptor =
+                       new ResultPartitionDeploymentDescriptor(new 
IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1,
+                               1, true);
+
+               InputGateDeploymentDescriptor task2InputGateDescriptor =
+                       new InputGateDeploymentDescriptor(new 
IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0,
+                               new InputChannelDeploymentDescriptor[] {
+                                       new 
InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1),
+                                               
ResultPartitionLocation.createLocal()) });
+
+               final TaskDeploymentDescriptor tdd1 =
+                       createTestTaskDeploymentDescriptor("Sender",
+                               eid1,
+                               TestingAbstractInvokables.Sender.class, 1,
+                               
Collections.singletonList(task1ResultPartitionDescriptor),
+                               Collections.emptyList());
+               final TaskDeploymentDescriptor tdd2 =
+                       createTestTaskDeploymentDescriptor("Receiver",
+                               eid2,
+                               TestingAbstractInvokables.Receiver.class,
+                               1,
+                               Collections.emptyList(),
+                               
Collections.singletonList(task2InputGateDescriptor));
+
+               final CompletableFuture<Void> task1RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2RunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task1FailedFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> task2CanceledFuture = new 
CompletableFuture<>();
+
+               final JobMasterId jobMasterId = JobMasterId.generate();
+               TestingJobMasterGateway testingJobMasterGateway =
+                       new TestingJobMasterGatewayBuilder()
+                       .setFencingTokenSupplier(() -> jobMasterId)
+                       .setUpdateTaskExecutionStateFunction(taskExecutionState 
-> {
+                               if (taskExecutionState != null && 
taskExecutionState.getID().equals(eid1)) {
+                                       return 
FutureUtils.completedExceptionally(
+                                               new 
ExecutionGraphException("The execution attempt " + eid2 + " was not found."));
+                               } else {
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               }
+                       })
+                       .build();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(2)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.RUNNING, task1RunningFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.RUNNING, task2RunningFuture)
+                               .addTaskManagerActionListener(eid1, 
ExecutionState.FAILED, task1FailedFuture)
+                               .addTaskManagerActionListener(eid2, 
ExecutionState.CANCELED, task2CanceledFuture)
+                               .setJobMasterId(jobMasterId)
+                               .setJobMasterGateway(testingJobMasterGateway)
+                               .setMockNetworkEnvironment(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd1.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
+                       task1RunningFuture.get();
+
+                       taskSlotTable.allocateSlot(1, jobId, 
tdd2.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
+                       task2RunningFuture.get();
+
+                       task1FailedFuture.get();
+                       
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), 
ExecutionState.FAILED);
+
+                       tmGateway.cancelTask(eid2, timeout);
+
+                       task2CanceledFuture.get();
+                       
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), 
ExecutionState.CANCELED);
+               }
+       }
+
+       /**
+        * Tests that repeated remote {@link PartitionNotFoundException}s 
ultimately fail the receiver.
+        */
+       @Test(timeout = 10000L)
+       public void testRemotePartitionNotFound() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
+               final ResultPartitionID partitionId = new ResultPartitionID();
+
+               final int dataPort = NetUtils.getAvailablePort();
+               Configuration config = new Configuration();
+               config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+
+               // Remote location (on the same TM though) for the partition
+               final ResultPartitionLocation loc = ResultPartitionLocation
+                       .createRemote(new ConnectionID(
+                               new InetSocketAddress("localhost", dataPort), 
0));
+
+               final InputChannelDeploymentDescriptor[] 
inputChannelDeploymentDescriptors =
+                       new InputChannelDeploymentDescriptor[] {
+                               new 
InputChannelDeploymentDescriptor(partitionId, loc)};
+
+               final InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor =
+                       new InputGateDeploymentDescriptor(resultId, 
ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors);
+
+               final TaskDeploymentDescriptor tdd =
+                       createTestTaskDeploymentDescriptor("Receiver",
+                               eid,
+                               Tasks.AgnosticReceiver.class, 1,
+                               Collections.emptyList(),
+                               
Collections.singletonList(inputGateDeploymentDescriptor));
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> taskFailedFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(2)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.FAILED, taskFailedFuture)
+                               .setConfiguration(config)
+                               .setLocalCommunication(false)
+                               .setMockNetworkEnvironment(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+                       taskRunningFuture.get();
+
+                       taskFailedFuture.get();
+                       
assertThat(taskSlotTable.getTask(eid).getFailureCause(), 
instanceOf(PartitionNotFoundException.class));
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends proper exception back to the sender 
if the partition update fails.
+        */
+       @Test
+       public void testUpdateTaskInputPartitionsFailure() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor("test task", eid, 
BlockingNoOpInvokable.class);
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+                       taskRunningFuture.get();
+
+                       CompletableFuture<Acknowledge> updateFuture = 
tmGateway.updatePartitions(
+                               eid,
+                               Collections.singletonList(
+                                       new PartitionInfo(
+                                               new IntermediateDataSetID(),
+                                               new 
InputChannelDeploymentDescriptor(new ResultPartitionID(), 
ResultPartitionLocation.createLocal()))),
+                               timeout);
+                       try {
+                               updateFuture.get();
+                               fail();
+                       } catch (Exception e) {
+                               assertTrue(ExceptionUtils.findThrowable(e, 
PartitionException.class).isPresent());
+                       }
+               }
+       }
+
+       /**
+        *  Tests that repeated local {@link PartitionNotFoundException}s 
ultimately fail the receiver.
+        */
+       @Test(timeout = 10000L)
+       public void testLocalPartitionNotFound() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
+               final ResultPartitionID partitionId = new ResultPartitionID();
+
+               final ResultPartitionLocation loc = 
ResultPartitionLocation.createLocal();
+
+               final InputChannelDeploymentDescriptor[] 
inputChannelDeploymentDescriptors =
+                       new InputChannelDeploymentDescriptor[] {
+                               new 
InputChannelDeploymentDescriptor(partitionId, loc)};
+
+               final InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor =
+                       new InputGateDeploymentDescriptor(resultId, 
ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors);
+
+               final TaskDeploymentDescriptor tdd =
+                       createTestTaskDeploymentDescriptor("Receiver",
+                               eid,
+                               Tasks.AgnosticReceiver.class,
+                               1, Collections.emptyList(),
+                               
Collections.singletonList(inputGateDeploymentDescriptor));
+
+               Configuration config = new Configuration();
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> taskFailedFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.FAILED, taskFailedFuture)
+                               .setConfiguration(config)
+                               .setMockNetworkEnvironment(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+                       taskRunningFuture.get();
+
+                       taskFailedFuture.get();
+
+                       
assertSame(taskSlotTable.getTask(eid).getExecutionState(), 
ExecutionState.FAILED);
+                       
assertThat(taskSlotTable.getTask(eid).getFailureCause(), 
instanceOf(PartitionNotFoundException.class));
+               }
+       }
+
+       /**
+        * Test that a failing schedule or update consumers call leads to the 
failing of the respective
+        * task.
+        *
+        * <p>IMPORTANT: We have to make sure that the invokable's cancel 
method is called, because only
+        * then the future is completed. We do this by not eagerly deploying 
consumer tasks and requiring
+        * the invokable to fill one memory segment. The completed memory 
segment will trigger the
+        * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
+        * memory segment, we'll block the invokable and wait for the task 
failure due to the failed
+        * schedule or update consumers call.
+        */
+       @Test(timeout = 10000L)
+       public void testFailingScheduleOrUpdateConsumers() throws Exception {
+               final Configuration configuration = new Configuration();
+
+               // set the memory segment to the smallest size possible, 
because we have to fill one
+               // memory buffer to trigger the schedule or update consumers 
message to the downstream
+               // operators
+               configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
"4096");
+
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+               final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
+                       new IntermediateDataSetID(),
+                       new IntermediateResultPartitionID(),
+                       ResultPartitionType.PIPELINED,
+                       1,
+                       1,
+                       true);
+
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor(
+                       "test task",
+                       eid,
+                       
TestingAbstractInvokables.TestInvokableRecordCancel.class,
+                       1,
+                       
Collections.singletonList(resultPartitionDeploymentDescriptor),
+                       Collections.emptyList());
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+
+               final Exception exception = new Exception("Failed schedule or 
update consumers");
+
+               final JobMasterId jobMasterId = JobMasterId.generate();
+               TestingJobMasterGateway testingJobMasterGateway =
+                       new TestingJobMasterGatewayBuilder()
+                               .setFencingTokenSupplier(() -> jobMasterId)
+                               
.setUpdateTaskExecutionStateFunction(resultPartitionID -> 
FutureUtils.completedExceptionally(exception))
+                               .build();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .setConfiguration(configuration)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .setJobMasterId(jobMasterId)
+                               .setJobMasterGateway(testingJobMasterGateway)
+                               .setMockNetworkEnvironment(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       
TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, jobMasterId, timeout).get();
+                       taskRunningFuture.get();
+
+                       CompletableFuture<Boolean> cancelFuture = 
TestingAbstractInvokables.TestInvokableRecordCancel.gotCanceled();
+
+                       assertTrue(cancelFuture.get());
+                       
assertTrue(ExceptionUtils.findThrowableWithMessage(taskSlotTable.getTask(eid).getFailureCause(),
 exception.getMessage()).isPresent());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Stack trace sample
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Tests sampling of task stack traces.
+        */
+       @Test(timeout = 10000L)
+       @SuppressWarnings("unchecked")
+       public void testRequestStackTraceSample() throws Exception {
+               final ExecutionAttemptID eid = new ExecutionAttemptID();
+               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor("test task", eid, 
BlockingNoOpInvokable.class);
+
+               final int sampleId1 = 112223;
+               final int sampleId2 = 19230;
+               final int sampleId3 = 1337;
+               final int sampleId4 = 44;
+
+               final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
+               final CompletableFuture<Void> taskCanceledFuture = new 
CompletableFuture<>();
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setSlotSize(1)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
+                               .addTaskManagerActionListener(eid, 
ExecutionState.CANCELED, taskCanceledFuture)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+
+                       taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
+                       tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
+                       taskRunningFuture.get();
+
+                       //
+                       // 1) Trigger sample for non-existing task
+                       //
+                       ExecutionAttemptID nonExistTaskEid = new 
ExecutionAttemptID();
+
+                       CompletableFuture<StackTraceSampleResponse> 
failedSampleFuture =
+                               
tmGateway.requestStackTraceSample(nonExistTaskEid, sampleId1, 100, 
Time.seconds(60L), 0, timeout);
+                       try {
+                               failedSampleFuture.get();
+                       } catch (Exception e) {
+                               assertThat(e.getCause(), 
instanceOf(IllegalStateException.class));
+                               assertThat(e.getCause().getMessage(), 
startsWith("Cannot sample task"));
+                       }
+
+                       //
+                       // 2) Trigger sample for the blocking task
+                       //
+                       int numSamples = 5;
+
+                       CompletableFuture<StackTraceSampleResponse> 
successfulSampleFuture =
+                               tmGateway.requestStackTraceSample(eid, 
sampleId2, numSamples, Time.milliseconds(100L), 0, timeout);
+
+                       StackTraceSampleResponse response = 
successfulSampleFuture.get();
+
+                       assertEquals(response.getSampleId(), sampleId2);
+                       assertEquals(response.getExecutionAttemptID(), eid);
+
+                       List<StackTraceElement[]> traces = 
response.getSamples();
+
+                       assertEquals("Number of samples", numSamples, 
traces.size());
+
+                       for (StackTraceElement[] trace : traces) {
+                               boolean success = false;
+                               for (StackTraceElement elem : trace) {
+                                       // Look for BlockingNoOpInvokable#invoke
+                                       if (elem.getClassName().equals(
+                                               
BlockingNoOpInvokable.class.getName())) {
+
+                                               assertEquals("invoke", 
elem.getMethodName());
+
+                                               success = true;
+                                               break;
+                                       }
+                                       // The BlockingNoOpInvokable might not 
be invoked here
+                                       if 
(elem.getClassName().equals(TestTaskManagerActions.class.getName())) {
+
+                                               
assertEquals("updateTaskExecutionState", elem.getMethodName());
+
+                                               success = true;
+                                               break;
+                                       }
+                                       if 
(elem.getClassName().equals(Thread.class) && 
elem.getMethodName().equals("setContextClassLoader")) {
+                                               success = true;
+                                       }
+                               }
+
+                               assertTrue("Unexpected stack trace: " +
+                                       Arrays.toString(trace), success);
+                       }
+
+                       //
+                       // 3) Trigger sample for the blocking task with max 
depth
+                       //
+                       int maxDepth = 2;
+
+                       CompletableFuture<StackTraceSampleResponse> 
successfulSampleFutureWithMaxDepth =
+                               tmGateway.requestStackTraceSample(eid, 
sampleId3, numSamples, Time.milliseconds(100L), maxDepth, timeout);
+
+                       StackTraceSampleResponse responseWithMaxDepth = 
successfulSampleFutureWithMaxDepth.get();
+
+                       assertEquals(sampleId3, 
responseWithMaxDepth.getSampleId());
+                       assertEquals(eid, 
responseWithMaxDepth.getExecutionAttemptID());
+
+                       List<StackTraceElement[]> tracesWithMaxDepth = 
responseWithMaxDepth.getSamples();
+
+                       assertEquals("Number of samples", numSamples, 
tracesWithMaxDepth.size());
+
+                       for (StackTraceElement[] trace : tracesWithMaxDepth) {
+                               assertEquals("Max depth", maxDepth, 
trace.length);
+                       }
+
+                       //
+                       // 4) Trigger sample for the blocking task, but cancel 
it during sampling
+                       //
+                       int sleepTime = 100;
+                       numSamples = 100;
+
+                       CompletableFuture<StackTraceSampleResponse> 
canceldSampleFuture =
+                               tmGateway.requestStackTraceSample(eid, 
sampleId4, numSamples, Time.milliseconds(10L), maxDepth, timeout);
+
+                       Thread.sleep(sleepTime);
+
+                       tmGateway.cancelTask(eid, timeout);
+                       taskCanceledFuture.get();
+
+                       StackTraceSampleResponse responseAfterCancel = 
canceldSampleFuture.get();
+
+                       assertEquals(eid, 
responseAfterCancel.getExecutionAttemptID());
+                       assertEquals(sampleId4, 
responseAfterCancel.getSampleId());
+               }
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName, eid, 
abstractInvokable, 1);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable,
+               int maxNumberOfSubtasks
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName,
+                       eid,
+                       abstractInvokable,
+                       maxNumberOfSubtasks,
+                       Collections.emptyList(),
+                       Collections.emptyList());
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable,
+               int maxNumberOfSubtasks,
+               Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+               Collection<InputGateDeploymentDescriptor> inputGates
+       ) throws IOException {
+               Preconditions.checkNotNull(producedPartitions);
+               Preconditions.checkNotNull(inputGates);
+               return createTaskDeploymentDescriptor(
+                       jobId, testName.getMethodName(), eid,
+                       new SerializedValue<>(new ExecutionConfig()), taskName, 
maxNumberOfSubtasks, 0, 1, 0,
+                       new Configuration(), new Configuration(), 
abstractInvokable.getName(),
+                       producedPartitions,
+                       inputGates,
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       0);
+       }
+
+       private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+                       JobID jobId,
+                       String jobName,
+                       ExecutionAttemptID executionAttemptId,
+                       SerializedValue<ExecutionConfig> 
serializedExecutionConfig,
+                       String taskName,
+                       int maxNumberOfSubtasks,
+                       int subtaskIndex,
+                       int numberOfSubtasks,
+                       int attemptNumber,
+                       Configuration jobConfiguration,
+                       Configuration taskConfiguration,
+                       String invokableClassName,
+                       Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+                       Collection<InputGateDeploymentDescriptor> inputGates,
+                       Collection<PermanentBlobKey> requiredJarFiles,
+                       Collection<URL> requiredClasspaths,
+                       int targetSlotNumber) throws IOException {
+
+               JobInformation jobInformation = new JobInformation(
+                       jobId,
+                       jobName,
+                       serializedExecutionConfig,
+                       jobConfiguration,
+                       requiredJarFiles,
+                       requiredClasspaths);
+
+               TaskInformation taskInformation = new TaskInformation(
+                       new JobVertexID(),
+                       taskName,
+                       numberOfSubtasks,
+                       maxNumberOfSubtasks,
+                       invokableClassName,
+                       taskConfiguration);
+
+               SerializedValue<JobInformation> serializedJobInformation = new 
SerializedValue<>(jobInformation);
+               SerializedValue<TaskInformation> serializedJobVertexInformation 
= new SerializedValue<>(taskInformation);
+
+               return new TaskDeploymentDescriptor(
+                       jobId,
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
+                       executionAttemptId,
+                       new AllocationID(),
+                       subtaskIndex,
+                       attemptNumber,
+                       targetSlotNumber,
+                       null,
+                       producedPartitions,
+                       inputGates);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6087cf1..c7ae450 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -97,6 +99,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
@@ -134,11 +137,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -1754,6 +1759,45 @@ public class TaskExecutorTest extends TestLogger {
                }
        }
 
+       @Test(timeout = 10000L)
+       public void testLogNotFoundHandling() throws Throwable {
+               final int dataPort = NetUtils.getAvailablePort();
+               Configuration config = new Configuration();
+               config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+               config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
"/i/dont/exist");
+
+               try (TaskSubmissionTestEnvironment env =
+                       new TaskSubmissionTestEnvironment.Builder(jobId)
+                               .setConfiguration(config)
+                               .setLocalCommunication(false)
+                               .build()) {
+                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       try {
+                               CompletableFuture<TransientBlobKey> logFuture =
+                                       
tmGateway.requestFileUpload(FileType.LOG, timeout);
+                               logFuture.get();
+                       } catch (Exception e) {
+                               assertThat(e.getMessage(), containsString("The 
file LOG does not exist on the TaskExecutor."));
+                       }
+               }
+       }
+
+       @Test(timeout = 10000L)
+       public void testTerminationOnFatalError() throws Throwable {
+               try (TaskSubmissionTestEnvironment env = new 
TaskSubmissionTestEnvironment.Builder(jobId).build()) {
+                       String testExceptionMsg = "Test exception of fatal 
error.";
+
+                       env.getTaskExecutor().onFatalError(new 
Exception(testExceptionMsg));
+
+                       Throwable exception = 
env.getTestingFatalErrorHandler().getErrorFuture().get();
+                       env.getTestingFatalErrorHandler().clearError();
+
+                       assertThat(exception.getMessage(), 
startsWith(testExceptionMsg));
+               }
+       }
+
        private TaskExecutorLocalStateStoresManager 
createTaskExecutorLocalStateStoresManager() throws IOException {
                return new TaskExecutorLocalStateStoresManager(
                        false,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
index 01d4dd7..1deed3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.net.InetAddress;
+
 import static org.junit.Assert.*;
 
 /**
@@ -85,6 +87,33 @@ public class TaskManagerServicesConfigurationTest extends 
TestLogger {
 
        /**
         * Verifies that {@link 
TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+        * returns the correct result for new configurations via
+        * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
+        * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_MAX},
+        * {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL} and
+        * {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
+        */
+       @Test
+       public void testNetworkRequestBackoffAndBuffers() throws Exception {
+
+               // set some non-default values
+               final Configuration config = new Configuration();
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+               
config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+               
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+
+               TaskManagerServicesConfiguration tmConfig =
+                       
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLoopbackAddress(), true);
+
+               
assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
+               
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
+               
assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
+               
assertEquals(tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), 100);
+       }
+
+       /**
+        * Verifies that {@link 
TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
         * returns the correct result for mixed old/new configurations.
         */
        @SuppressWarnings("deprecation")
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
new file mode 100644
index 0000000..eb106c9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
+import java.io.File;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Simple environment setup for task executor task.
+ */
+class TaskSubmissionTestEnvironment implements AutoCloseable {
+
+       private final HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
+       private final TestingRpcService testingRpcService = new 
TestingRpcService();
+       private final BlobCacheService blobCacheService= new 
BlobCacheService(new Configuration(), new VoidBlobStore(), null);
+       private final Time timeout = Time.milliseconds(10000L);
+       private final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+       private final TimerService<AllocationID> timerService = new 
TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
+
+       private final TestingHighAvailabilityServices haServices;
+       private final TemporaryFolder temporaryFolder;
+       private final TaskSlotTable taskSlotTable;
+       private final JobMasterId jobMasterId;
+
+       private TestingTaskExecutor taskExecutor;
+
+       private TaskSubmissionTestEnvironment(
+                       JobID jobId,
+                       JobMasterId jobMasterId,
+                       int slotSize,
+                       boolean mockNetworkEnvironment,
+                       TestingJobMasterGateway testingJobMasterGateway,
+                       Configuration configuration,
+                       boolean localCommunication,
+                       List<Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>>> taskManagerActionListeners) throws Exception {
+
+               this.haServices = new TestingHighAvailabilityServices();
+               this.haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
+               this.haServices.setJobMasterLeaderRetriever(jobId, new 
SettableLeaderRetrievalService());
+
+               this.temporaryFolder = new TemporaryFolder();
+               this.temporaryFolder.create();
+
+               this.jobMasterId = jobMasterId;
+
+               if (slotSize > 0) {
+                       this.taskSlotTable = generateTaskSlotTable(slotSize);
+               } else {
+                       this.taskSlotTable = mock(TaskSlotTable.class);
+                       when(taskSlotTable.tryMarkSlotActive(eq(jobId), 
any())).thenReturn(true);
+                       
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
+               }
+
+               JobMasterGateway jobMasterGateway;
+               if (testingJobMasterGateway == null) {
+                       jobMasterGateway = new TestingJobMasterGatewayBuilder()
+                               .setFencingTokenSupplier(() -> jobMasterId)
+                               .build();
+               } else {
+                       jobMasterGateway = testingJobMasterGateway;
+               }
+
+               TaskManagerActions taskManagerActions;
+               if (taskManagerActionListeners.size() == 0) {
+                       taskManagerActions = new NoOpTaskManagerActions();
+               } else {
+                       TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(taskSlotTable, jobMasterGateway);
+                       for (Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
+                               
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
+                       }
+                       taskManagerActions = testTaskManagerActions;
+               }
+
+               final NetworkEnvironment networkEnvironment = 
createNetworkEnvironment(localCommunication, configuration, testingRpcService, 
mockNetworkEnvironment);
+
+               final JobManagerConnection jobManagerConnection = 
createJobManagerConnection(jobId, jobMasterGateway, testingRpcService, 
taskManagerActions, timeout);
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       false,
+                       new File[]{temporaryFolder.newFolder()},
+                       Executors.directExecutor());
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setNetworkEnvironment(networkEnvironment)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
+                       .build();
+
+               taskExecutor = createTaskExecutor(taskManagerServices, 
configuration);
+
+               taskExecutor.start();
+               taskExecutor.waitUntilStarted();
+       }
+
+       public TestingTaskExecutor getTaskExecutor() {
+               return taskExecutor;
+       }
+
+       public TaskExecutorGateway getTaskExecutorGateway() {
+               return taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+       }
+
+       public TaskSlotTable getTaskSlotTable() {
+               return taskSlotTable;
+       }
+
+       public JobMasterId getJobMasterId() {
+               return jobMasterId;
+       }
+
+       public TestingFatalErrorHandler getTestingFatalErrorHandler() {
+               return testingFatalErrorHandler;
+       }
+
+       private TaskSlotTable generateTaskSlotTable(int numSlot) {
+               Collection<ResourceProfile> resourceProfiles = new 
ArrayList<>();
+               for (int i = 0; i < numSlot; i++) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               return new TaskSlotTable(resourceProfiles, timerService);
+       }
+
+       @Nonnull
+       private TestingTaskExecutor createTaskExecutor(TaskManagerServices 
taskManagerServices, Configuration configuration) {
+               return new TestingTaskExecutor(
+                       testingRpcService,
+                       
TaskManagerConfiguration.fromConfiguration(configuration),
+                       haServices,
+                       taskManagerServices,
+                       heartbeatServices,
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       null,
+                       blobCacheService,
+                       testingFatalErrorHandler
+               );
+       }
+
+       private static JobManagerConnection createJobManagerConnection(JobID 
jobId, JobMasterGateway jobMasterGateway, RpcService testingRpcService, 
TaskManagerActions taskManagerActions, Time timeout) {
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+               final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+               
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(), 
any()))
+                       
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+               return new JobManagerConnection(
+                       jobId,
+                       ResourceID.generate(),
+                       jobMasterGateway,
+                       taskManagerActions,
+                       mock(CheckpointResponder.class),
+                       new TestGlobalAggregateManager(),
+                       libraryCacheManager,
+                       new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, 
testingRpcService.getExecutor(), timeout),
+                       partitionProducerStateChecker);
+       }
+
+       private static NetworkEnvironment createNetworkEnvironment(boolean 
localCommunication, Configuration configuration, RpcService testingRpcService, 
boolean mockNetworkEnvironment) throws Exception {
+               final ConnectionManager connectionManager;
+               if (!localCommunication) {
+                       NettyConfig nettyConfig = 
TaskManagerServicesConfiguration
+                               .fromConfiguration(configuration, 
InetAddress.getByName(testingRpcService.getAddress()), 
localCommunication).getNetworkConfig()
+                               .nettyConfig();
+                       connectionManager = new 
NettyConnectionManager(nettyConfig);
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               final int numAllBuffers = 10;
+               final NetworkEnvironment networkEnvironment;
+               if (mockNetworkEnvironment) {
+                       networkEnvironment = mock(NetworkEnvironment.class, 
Mockito.RETURNS_MOCKS);
+               } else {
+                       networkEnvironment = createNetworkEnvironment(
+                               numAllBuffers,
+                               128,
+                               
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+                               
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+                               2,
+                               8,
+                               true,
+                               connectionManager);
+                       networkEnvironment.start();
+               }
+
+               return networkEnvironment;
+       }
+
+       @Nonnull
+       private static NetworkEnvironment createNetworkEnvironment(
+                       int numBuffers,
+                       int memorySegmentSize,
+                       int partitionRequestInitialBackoff,
+                       int partitionRequestMaxBackoff,
+                       int networkBuffersPerChannel,
+                       int extraNetworkBuffersPerGate,
+                       boolean enableCreditBased,
+                       ConnectionManager connectionManager) {
+               return new NetworkEnvironment(
+                       new NetworkBufferPool(numBuffers, memorySegmentSize),
+                       connectionManager,
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       partitionRequestInitialBackoff,
+                       partitionRequestMaxBackoff,
+                       networkBuffersPerChannel,
+                       extraNetworkBuffersPerGate,
+                       enableCreditBased);
+       }
+
+       @Override
+       public void close() throws Exception {
+               RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+
+               timerService.stop();
+
+               blobCacheService.close();
+
+               temporaryFolder.delete();
+
+               testingFatalErrorHandler.rethrowError();
+       }
+
+       public static final class Builder {
+
+               private JobID jobId;
+               private boolean mockNetworkEnvironment = true;
+               private int slotSize;
+               private JobMasterId jobMasterId = JobMasterId.generate();
+               private TestingJobMasterGateway jobMasterGateway;
+               private boolean localCommunication = true;
+               private Configuration configuration = new Configuration();
+
+               private List<Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>>> taskManagerActionListeners = new ArrayList<>();
+
+               public Builder(JobID jobId) {
+                       this.jobId = jobId;
+               }
+
+               public Builder setMockNetworkEnvironment(boolean 
mockNetworkEnvironment) {
+                       this.mockNetworkEnvironment = mockNetworkEnvironment;
+                       return this;
+               }
+
+               public Builder setSlotSize(int slotSize) {
+                       this.slotSize = slotSize;
+                       return this;
+               }
+
+               public Builder setJobMasterId(JobMasterId jobMasterId) {
+                       this.jobMasterId = jobMasterId;
+                       return this;
+               }
+
+               public Builder setJobMasterGateway(TestingJobMasterGateway 
jobMasterGateway) {
+                       this.jobMasterGateway = jobMasterGateway;
+                       return this;
+               }
+
+               public Builder setLocalCommunication(boolean 
localCommunication) {
+                       this.localCommunication = localCommunication;
+                       return this;
+               }
+
+               public Builder setConfiguration(Configuration configuration) {
+                       this.configuration = configuration;
+                       return this;
+               }
+
+               public Builder addTaskManagerActionListener(ExecutionAttemptID 
eid, ExecutionState executionState, CompletableFuture<Void> future) {
+                       taskManagerActionListeners.add(Tuple3.of(eid, 
executionState, future));
+                       return this;
+               }
+
+               public TaskSubmissionTestEnvironment build() throws Exception {
+                       return new TaskSubmissionTestEnvironment(
+                               jobId,
+                               jobMasterId,
+                               slotSize,
+                               mockNetworkEnvironment,
+                               jobMasterGateway,
+                               configuration,
+                               localCommunication,
+                               taskManagerActionListeners);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
new file mode 100644
index 0000000..de36e23
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestTaskManagerActions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Customized {@link TaskManagerActions} that wait for ExecutionState changes.
+ */
+public class TestTaskManagerActions implements TaskManagerActions {
+
+       private final JobMasterGateway jobMasterGateway;
+       private final TaskSlotTable taskSlotTable;
+       private final TaskManagerActionListeners taskManagerActionListeners = 
new TaskManagerActionListeners();
+
+       public TestTaskManagerActions(TaskSlotTable taskSlotTable, 
JobMasterGateway jobMasterGateway) {
+               this.taskSlotTable = taskSlotTable;
+               this.jobMasterGateway = jobMasterGateway;
+       }
+
+       public void addListener(ExecutionAttemptID eid, ExecutionState 
executionState, CompletableFuture<Void> future) {
+               taskManagerActionListeners.addListener(eid, executionState, 
future);
+       }
+
+       @Override
+       public void notifyFatalError(String message, Throwable cause) {
+       }
+
+       @Override
+       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {
+               if (taskSlotTable != null) {
+                       
taskSlotTable.getTask(executionAttemptID).failExternally(cause);
+               }
+       }
+
+       @Override
+       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+               Optional<CompletableFuture<Void>> listenerFuture =
+                       
taskManagerActionListeners.getListenerFuture(taskExecutionState.getID(), 
taskExecutionState.getExecutionState());
+               if (listenerFuture.isPresent()) {
+                       listenerFuture.get().complete(null);
+               }
+               if (jobMasterGateway != null) {
+                       CompletableFuture<Acknowledge> futureAcknowledge = 
jobMasterGateway.updateTaskExecutionState(taskExecutionState);
+
+                       futureAcknowledge.whenComplete(
+                               (ack, throwable) -> {
+                                       if (throwable != null) {
+                                               
failTask(taskExecutionState.getID(), throwable);
+                                       }
+                               }
+                       );
+               }
+       }
+
+       private static class TaskManagerActionListeners {
+               private final Map<ExecutionAttemptID, 
List<Tuple2<ExecutionState, CompletableFuture<Void>>>> 
expectExecutionStateAndFutures;
+
+               private TaskManagerActionListeners() {
+                       this.expectExecutionStateAndFutures = new HashMap<>();
+               }
+
+               private void addListener(ExecutionAttemptID eid, ExecutionState 
executionState, CompletableFuture<Void> future) {
+                       final List<Tuple2<ExecutionState, 
CompletableFuture<Void>>> expectExecutionStateAndFutureList = 
expectExecutionStateAndFutures
+                               .getOrDefault(eid, new ArrayList<>());
+                       
expectExecutionStateAndFutureList.add(Tuple2.of(executionState, future));
+                       expectExecutionStateAndFutures.put(eid, 
expectExecutionStateAndFutureList);
+               }
+
+               private Optional<CompletableFuture<Void>> 
getListenerFuture(ExecutionAttemptID eid, ExecutionState executionState) {
+                       List<Tuple2<ExecutionState, CompletableFuture<Void>>> 
expectStateAndFutureList = expectExecutionStateAndFutures.get(eid);
+                       if (expectExecutionStateAndFutures == null) {
+                               return Optional.empty();
+                       }
+                       for (Tuple2<ExecutionState, CompletableFuture<Void>> 
expectStateAndFuture : expectStateAndFutureList) {
+                               if (expectStateAndFuture.f0 == executionState) {
+                                       return 
Optional.of(expectStateAndFuture.f1);
+                               }
+                       }
+                       return Optional.empty();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
deleted file mode 100644
index 937fb1e..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ /dev/null
@@ -1,2185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
-import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
-import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.StopTask;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.util.Failure;
-
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the legacy {@link TaskManager}.
- */
-@SuppressWarnings("serial")
-public class TaskManagerTest extends TestLogger {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerTest.class);
-
-       private static final FiniteDuration timeout = new FiniteDuration(1, 
TimeUnit.MINUTES);
-
-       private static final FiniteDuration d = new FiniteDuration(60, 
TimeUnit.SECONDS);
-       private static final Time timeD = Time.seconds(60L);
-
-       private static ActorSystem system;
-
-       static final UUID LEADER_SESSION_ID = UUID.randomUUID();
-
-       private TestingHighAvailabilityServices highAvailabilityServices;
-
-       @BeforeClass
-       public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       @Before
-       public void setupTest() {
-               highAvailabilityServices = new 
TestingHighAvailabilityServices();
-       }
-
-       @After
-       public void tearDownTest() throws Exception {
-               if (highAvailabilityServices != null) {
-                       highAvailabilityServices.closeAndCleanupAllData();
-
-                       highAvailabilityServices = null;
-               }
-       }
-
-       @Test
-       public void testSubmitAndExecuteTask() throws IOException {
-               new JavaTestKit(system){{
-
-                       ActorGateway taskManager = null;
-                       final ActorGateway jobManager = 
TestingUtils.createForwardingActor(
-                               system,
-                               getTestActor(),
-                               HighAvailabilityServices.DEFAULT_LEADER_ID,
-                               Option.<String>empty());
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       try {
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               false);
-
-                               final ActorGateway tm = taskManager;
-
-                               // handle the registration
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               
expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
-
-                                               final InstanceID iid = new 
InstanceID();
-                                               assertEquals(tm.actor(), 
getLastSender());
-                                               tm.tell(
-                                                               new 
RegistrationMessages.AcknowledgeRegistration(
-                                                                               
iid,
-                                                                               
12345),
-                                                               jobManager);
-                                       }
-                               };
-
-                               final JobID jid = new JobID();
-                               final JobVertexID vid = new JobVertexID();
-                               final ExecutionAttemptID eid = new 
ExecutionAttemptID();
-                               final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
-
-                               final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                                       jid, "TestJob", vid, eid, 
executionConfig,
-                                       "TestTask", 7, 2, 7, 0, new 
Configuration(), new Configuration(),
-                                       TestInvokableCorrect.class.getName(),
-                                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                       
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                       new ArrayList<PermanentBlobKey>(), 
Collections.emptyList(), 0);
-
-                               new Within(d) {
-
-                                       @Override
-                                       protected void run() {
-                                               tm.tell(new SubmitTask(tdd), 
jobManager);
-
-                                               // TaskManager should 
acknowledge the submission
-                                               // heartbeats may be interleaved
-                                               long deadline = 
System.currentTimeMillis() + 10000;
-                                               do {
-                                                       Object message = 
receiveOne(d);
-                                                       if 
(message.equals(Acknowledge.get())) {
-                                                               break;
-                                                       }
-                                               } while 
(System.currentTimeMillis() < deadline);
-
-                                               // task should have switched to 
running
-                                               Object toRunning = new 
TaskMessages.UpdateTaskExecutionState(
-                                                                               
new TaskExecutionState(jid, eid, ExecutionState.RUNNING));
-
-                                               // task should have switched to 
finished
-                                               Object toFinished = new 
TaskMessages.UpdateTaskExecutionState(
-                                                                               
new TaskExecutionState(jid, eid, ExecutionState.FINISHED));
-
-                                               deadline = 
System.currentTimeMillis() + 10000;
-                                               do {
-                                                       Object message = 
receiveOne(d);
-                                                       if 
(message.equals(toRunning)) {
-                                                               break;
-                                                       }
-                                                       else if (!(message 
instanceof TaskManagerMessages.Heartbeat)) {
-                                                               
fail("Unexpected message: " + message);
-                                                       }
-                                               } while 
(System.currentTimeMillis() < deadline);
-
-                                               deadline = 
System.currentTimeMillis() + 10000;
-                                               do {
-                                                       Object message = 
receiveOne(d);
-                                                       if 
(message.equals(toFinished)) {
-                                                               break;
-                                                       }
-                                                       else if (!(message 
instanceof TaskManagerMessages.Heartbeat)) {
-                                                               
fail("Unexpected message: " + message);
-                                                       }
-                                               } while 
(System.currentTimeMillis() < deadline);
-                                       }
-                               };
-                       }
-                       finally {
-                               // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testJobSubmissionAndCanceling() {
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-
-                       try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               true);
-
-                               final JobID jid1 = new JobID();
-                               final JobID jid2 = new JobID();
-
-                               JobVertexID vid1 = new JobVertexID();
-                               JobVertexID vid2 = new JobVertexID();
-
-                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
-                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
-
-                               final TaskDeploymentDescriptor tdd1 = 
createTaskDeploymentDescriptor(
-                                               jid1, "TestJob1", vid1, eid1,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "TestTask1", 5, 1, 5, 0,
-                                               new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.<URL>emptyList(), 0);
-
-                               final TaskDeploymentDescriptor tdd2 = 
createTaskDeploymentDescriptor(
-                                               jid2, "TestJob2", vid2, eid2,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "TestTask2", 7, 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               final ActorGateway tm = taskManager;
-
-                               new Within(d) {
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       Future<Object> 
t1Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
-                                                                       
timeout);
-                                                       Future<Object> 
t2Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
-                                                                       
timeout);
-
-                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
-                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       Await.ready(t1Running, 
d);
-                                                       Await.ready(t2Running, 
d);
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-
-                                                       Map<ExecutionAttemptID, 
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(2, 
runningTasks.size());
-                                                       Task t1 = 
runningTasks.get(eid1);
-                                                       Task t2 = 
runningTasks.get(eid2);
-                                                       assertNotNull(t1);
-                                                       assertNotNull(t2);
-
-                                                       
assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
-                                                       
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
-
-                                                       tm.tell(new 
CancelTask(eid1), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       Future<Object> response 
= tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       
timeout);
-                                                       Await.ready(response, 
d);
-
-                                                       
assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(1, 
runningTasks.size());
-
-                                                       tm.tell(new 
CancelTask(eid1), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       tm.tell(new 
CancelTask(eid2), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       response = tm.ask(new 
TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                       
timeout);
-                                                       Await.ready(response, 
d);
-
-                                                       
assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(0, 
runningTasks.size());
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testJobSubmissionAndStop() throws Exception {
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-
-                       try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               true);
-
-                               final JobID jid1 = new JobID();
-                               final JobID jid2 = new JobID();
-
-                               JobVertexID vid1 = new JobVertexID();
-                               JobVertexID vid2 = new JobVertexID();
-
-                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
-                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
-
-                               final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
-
-                               final TaskDeploymentDescriptor tdd1 = 
createTaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
-                                               "TestTask1", 5, 1, 5, 0, new 
Configuration(), new Configuration(), StoppableInvokable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               final TaskDeploymentDescriptor tdd2 = 
createTaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
-                                               "TestTask2", 7, 2, 7, 0, new 
Configuration(), new Configuration(), 
TestInvokableBlockingCancelable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               final ActorGateway tm = taskManager;
-
-                               new Within(d) {
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       Future<Object> 
t1Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
-                                                                       
timeout);
-                                                       Future<Object> 
t2Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
-                                                                       
timeout);
-
-                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
-                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       Await.ready(t1Running, 
d);
-                                                       Await.ready(t2Running, 
d);
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-
-                                                       Map<ExecutionAttemptID, 
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(2, 
runningTasks.size());
-                                                       Task t1 = 
runningTasks.get(eid1);
-                                                       Task t2 = 
runningTasks.get(eid2);
-                                                       assertNotNull(t1);
-                                                       assertNotNull(t2);
-
-                                                       
assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
-                                                       
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
-
-                                                       tm.tell(new 
StopTask(eid1), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       Future<Object> response 
= tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       
timeout);
-                                                       Await.ready(response, 
d);
-
-                                                       
assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(1, 
runningTasks.size());
-
-                                                       tm.tell(new 
StopTask(eid1), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       tm.tell(new 
StopTask(eid2), testActorGateway);
-                                                       
expectMsgClass(Status.Failure.class);
-
-                                                       
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(1, 
runningTasks.size());
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testGateChannelEdgeMismatch() {
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-
-                       try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               true);
-
-                               final ActorGateway tm = taskManager;
-
-                               final JobID jid = new JobID();
-
-                               JobVertexID vid1 = new JobVertexID();
-                               JobVertexID vid2 = new JobVertexID();
-
-                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
-                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
-
-                               final TaskDeploymentDescriptor tdd1 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Sender", 1, 0, 1, 0,
-                                               new Configuration(), new 
Configuration(), TestingAbstractInvokables.Sender.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               final TaskDeploymentDescriptor tdd2 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Receiver", 7, 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), TestingAbstractInvokables.Receiver.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               new Within(d){
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
-                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       tm.tell(new 
TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                       
testActorGateway);
-                                                       tm.tell(new 
TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                       
testActorGateway);
-
-                                                       expectMsgEquals(true);
-                                                       expectMsgEquals(true);
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       Map<ExecutionAttemptID, 
Task> tasks = expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(0, 
tasks.size());
-                                               } catch (Exception e){
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testRunJobWithForwardChannel() {
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-                       try {
-                               final JobID jid = new JobID();
-
-                               JobVertexID vid1 = new JobVertexID();
-                               JobVertexID vid2 = new JobVertexID();
-
-                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
-                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
-
-                               ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(LEADER_SESSION_ID)));
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               true);
-
-                               final ActorGateway tm = taskManager;
-
-                               IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
-
-                               List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, 1, true));
-
-                               InputGateDeploymentDescriptor ircdd =
-                                               new 
InputGateDeploymentDescriptor(
-                                                               new 
IntermediateDataSetID(), ResultPartitionType.PIPELINED,
-                                                               0, new 
InputChannelDeploymentDescriptor[]{
-                                                                               
new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), 
ResultPartitionLocation.createLocal())
-                                                               }
-                                               );
-
-                               final TaskDeploymentDescriptor tdd1 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Sender", 1, 0, 1, 0,
-                                               new Configuration(), new 
Configuration(), TestingAbstractInvokables.Sender.class.getName(),
-                                               irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<>(),
-                                               Collections.emptyList(), 0);
-
-                               final TaskDeploymentDescriptor tdd2 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Receiver", 7, 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), TestingAbstractInvokables.Receiver.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.singletonList(ircdd),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               new Within(d) {
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       Future<Object> 
t1Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
-                                                                       
timeout);
-
-                                                       Future<Object> 
t2Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
-                                                                       
timeout);
-
-                                                       // submit the sender 
task
-                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       // wait until the 
sender task is running
-                                                       Await.ready(t1Running, 
d);
-
-                                                       // only now (after the 
sender is running), submit the receiver task
-                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       // wait until the 
receiver task is running
-                                                       Await.ready(t2Running, 
d);
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       Map<ExecutionAttemptID, 
Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
-                                                                       
.class).asJava();
-
-                                                       Task t1 = 
tasks.get(eid1);
-                                                       Task t2 = 
tasks.get(eid2);
-
-                                                       // wait until the tasks 
are done. thread races may cause the tasks to be done before
-                                                       // we get to the check, 
so we need to guard the check
-                                                       if (t1 != null) {
-                                                               Future<Object> 
response = tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                               
timeout);
-                                                               
Await.ready(response, d);
-                                                       }
-
-                                                       if (t2 != null) {
-                                                               Future<Object> 
response = tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                               
timeout);
-                                                               
Await.ready(response, d);
-                                                               
assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
-                                                       }
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       tasks = 
expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
-                                                                       
.class).asJava();
-
-                                                       assertEquals(0, 
tasks.size());
-                                               }
-                                               catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testCancellingDependentAndStateUpdateFails() {
-               // this tests creates two tasks. the sender sends data, and 
fails to send the
-               // state update back to the job manager
-               // the second one blocks to be canceled
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-                       try {
-                               final JobID jid = new JobID();
-
-                               JobVertexID vid1 = new JobVertexID();
-                               JobVertexID vid2 = new JobVertexID();
-
-                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
-                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
-
-                               ActorRef jm = system.actorOf(
-                                               Props.create(
-                                                               new 
SimpleLookupFailingUpdateJobManagerCreator(
-                                                                       
LEADER_SESSION_ID,
-                                                                               
eid2)
-                                               )
-                               );
-
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               true);
-
-                               final ActorGateway tm = taskManager;
-
-                               IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
-
-                               List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, 1, true));
-
-                               InputGateDeploymentDescriptor ircdd =
-                                               new 
InputGateDeploymentDescriptor(
-                                                               new 
IntermediateDataSetID(), ResultPartitionType.PIPELINED,
-                                                               0, new 
InputChannelDeploymentDescriptor[]{
-                                                                               
new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), 
ResultPartitionLocation.createLocal())
-                                                               }
-                                               );
-
-                               final TaskDeploymentDescriptor tdd1 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Sender", 1, 0, 1, 0,
-                                               new Configuration(), new 
Configuration(), TestingAbstractInvokables.Sender.class.getName(),
-                                               irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               final TaskDeploymentDescriptor tdd2 = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Receiver", 7, 2, 7, 0,
-                                               new Configuration(), new 
Configuration(), Tasks.BlockingReceiver.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.singletonList(ircdd),
-                                               new ArrayList<>(), 
Collections.emptyList(), 0);
-
-                               new Within(d){
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       Future<Object> 
t1Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
-                                                                       
timeout);
-
-                                                       Future<Object> 
t2Running = tm.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
-                                                                       
timeout);
-
-                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
-                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       Await.ready(t1Running, 
d);
-                                                       Await.ready(t2Running, 
d);
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       Map<ExecutionAttemptID, 
Task> tasks = expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       Task t1 = 
tasks.get(eid1);
-                                                       Task t2 = 
tasks.get(eid2);
-
-                                                       tm.tell(new 
CancelTask(eid2), testActorGateway);
-                                                       
expectMsgEquals(Acknowledge.get());
-
-                                                       if (t2 != null) {
-                                                               Future<Object> 
response = tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-                                                                               
timeout);
-                                                               
Await.ready(response, d);
-                                                       }
-
-                                                       if (t1 != null) {
-                                                               if 
(t1.getExecutionState() == ExecutionState.RUNNING) {
-                                                                       
tm.tell(new CancelTask(eid1), testActorGateway);
-                                                                       
expectMsgEquals(Acknowledge.get());
-                                                               }
-                                                               Future<Object> 
response = tm.ask(
-                                                                               
new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-                                                                               
timeout);
-                                                               
Await.ready(response, d);
-                                                       }
-
-                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
-                                                       tasks = 
expectMsgClass(TestingTaskManagerMessages
-                                                                       
.ResponseRunningTasks.class).asJava();
-
-                                                       assertEquals(0, 
tasks.size());
-                                               }
-                                               catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       /**
-        * Tests that repeated remote {@link PartitionNotFoundException}s 
ultimately fail the receiver.
-        */
-       @Test
-       public void testRemotePartitionNotFound() throws Exception {
-
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-
-                       try {
-                               final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
-
-                               // Create the JM
-                               ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
-
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               final int dataPort = 
NetUtils.getAvailablePort();
-                               Configuration config = new Configuration();
-                               config.setInteger(TaskManagerOptions.DATA_PORT, 
dataPort);
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               config,
-                                               false,
-                                               true);
-
-                               // 
---------------------------------------------------------------------------------
-
-                               final ActorGateway tm = taskManager;
-
-                               final JobID jid = new JobID();
-                               final JobVertexID vid = new JobVertexID();
-                               final ExecutionAttemptID eid = new 
ExecutionAttemptID();
-
-                               final ResultPartitionID partitionId = new 
ResultPartitionID();
-
-                               // Remote location (on the same TM though) for 
the partition
-                               final ResultPartitionLocation loc = 
ResultPartitionLocation
-                                               .createRemote(new ConnectionID(
-                                                               new 
InetSocketAddress("localhost", dataPort), 0));
-
-                               final InputChannelDeploymentDescriptor[] icdd =
-                                               new 
InputChannelDeploymentDescriptor[] {
-                                                               new 
InputChannelDeploymentDescriptor(partitionId, loc)};
-
-                               final InputGateDeploymentDescriptor igdd =
-                                               new 
InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
-
-                               final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid, eid,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Receiver", 1, 0, 1, 0,
-                                               new Configuration(), new 
Configuration(),
-                                               
Tasks.AgnosticReceiver.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               Collections.singletonList(igdd),
-                                               Collections.emptyList(),
-                                               Collections.emptyList(), 0);
-
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               // Submit the task
-                                               tm.tell(new SubmitTask(tdd), 
testActorGateway);
-                                               
expectMsgClass(Acknowledge.get().getClass());
-
-                                               // Wait to be notified about 
the final execution state by the mock JM
-                                               TaskExecutionState msg = 
expectMsgClass(TaskExecutionState.class);
-
-                                               // The task should fail after 
repeated requests
-                                               
assertEquals(ExecutionState.FAILED, msg.getExecutionState());
-                                               Throwable t = 
msg.getError(ClassLoader.getSystemClassLoader());
-                                               assertEquals("Thrown exception 
was not a PartitionNotFoundException: " + t.getMessage(),
-                                                       
PartitionNotFoundException.class, t.getClass());
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testTaskManagerServicesConfiguration() throws Exception {
-
-               // set some non-default values
-               final Configuration config = new Configuration();
-               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-               
config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
-               
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
-
-               TaskManagerServicesConfiguration tmConfig =
-                       
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLoopbackAddress(), true);
-
-               
assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
-               
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
-               
assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
-               
assertEquals(tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), 100);
-       }
-
-       /**
-        *  Tests that repeated local {@link PartitionNotFoundException}s 
ultimately fail the receiver.
-        */
-       @Test
-       public void testLocalPartitionNotFound() throws Exception {
-
-               new JavaTestKit(system){{
-
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                               LEADER_SESSION_ID);
-
-                       try {
-                               final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
-
-                               // Create the JM
-                               ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
-
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               final Configuration config = new 
Configuration();
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-
-                               taskManager = TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               config,
-                                               true,
-                                               true);
-
-                               // 
---------------------------------------------------------------------------------
-
-                               final ActorGateway tm = taskManager;
-
-                               final JobID jid = new JobID();
-                               final JobVertexID vid = new JobVertexID();
-                               final ExecutionAttemptID eid = new 
ExecutionAttemptID();
-
-                               final ResultPartitionID partitionId = new 
ResultPartitionID();
-
-                               // Local location (on the same TM though) for 
the partition
-                               final ResultPartitionLocation loc = 
ResultPartitionLocation.createLocal();
-
-                               final InputChannelDeploymentDescriptor[] icdd =
-                                               new 
InputChannelDeploymentDescriptor[] {
-                                                               new 
InputChannelDeploymentDescriptor(partitionId, loc)};
-
-                               final InputGateDeploymentDescriptor igdd =
-                                               new 
InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
-
-                               final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                                               jid, "TestJob", vid, eid,
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Receiver", 1, 0, 1, 0,
-                                               new Configuration(), new 
Configuration(),
-                                               
Tasks.AgnosticReceiver.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               Collections.singletonList(igdd),
-                                               Collections.emptyList(),
-                                               Collections.emptyList(), 0);
-
-                               new Within(new FiniteDuration(120, 
TimeUnit.SECONDS)) {
-                                       @Override
-                                       protected void run() {
-                                               // Submit the task
-                                               tm.tell(new SubmitTask(tdd), 
testActorGateway);
-                                               
expectMsgClass(Acknowledge.get().getClass());
-
-                                               // Wait to be notified about 
the final execution state by the mock JM
-                                               TaskExecutionState msg = 
expectMsgClass(TaskExecutionState.class);
-
-                                               // The task should fail after 
repeated requests
-                                               
assertEquals(msg.getExecutionState(), ExecutionState.FAILED);
-
-                                               Throwable error = 
msg.getError(getClass().getClassLoader());
-                                               if (error.getClass() != 
PartitionNotFoundException.class) {
-                                                       error.printStackTrace();
-                                                       fail("Wrong exception: 
" + error.getMessage());
-                                               }
-                                       }
-                               };
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       @Test
-       public void testLogNotFoundHandling() throws Exception {
-
-               new JavaTestKit(system){{
-
-                       // we require a JobManager so that the BlobService is 
also started
-                       ActorGateway jobManager = null;
-                       ActorGateway taskManager = null;
-
-                       try {
-
-                               // Create the JM
-                               ActorRef jm = system.actorOf(Props.create(
-                                       new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
-
-                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                               final int dataPort = 
NetUtils.getAvailablePort();
-                               Configuration config = new Configuration();
-                               config.setInteger(TaskManagerOptions.DATA_PORT, 
dataPort);
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-                               
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               taskManager = TestingUtils.createTaskManager(
-                                       system,
-                                       highAvailabilityServices,
-                                       config,
-                                       false,
-                                       true);
-
-                               // 
---------------------------------------------------------------------------------
-
-                               final ActorGateway tm = taskManager;
-
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               Future<Object> logFuture = 
tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
-                                               try {
-                                                       Await.result(logFuture, 
timeout);
-                                                       Assert.fail();
-                                               } catch (Exception e) {
-                                                       
Assert.assertTrue(e.getMessage().startsWith("TaskManager log files are 
unavailable. Log file could not be found at"));
-                                               }
-                                       }
-                               };
-                       } finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       // 
------------------------------------------------------------------------
-       // Stack trace sample
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Tests sampling of task stack traces.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSampleMessage() throws Exception {
-               new JavaTestKit(system) {{
-                       ActorGateway taskManagerActorGateway = null;
-
-                       // We need this to be a JM that answers to update 
messages for
-                       // robustness on Travis (if jobs need to be resubmitted 
in (4)).
-                       ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(
-                               HighAvailabilityServices.DEFAULT_LEADER_ID)));
-                       ActorGateway jobManagerActorGateway = new 
AkkaActorGateway(
-                               jm,
-                               HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
-                                       getTestActor(),
-                                       
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                       try {
-                               final ActorGateway jobManager = 
jobManagerActorGateway;
-
-                               
highAvailabilityServices.setJobMasterLeaderRetriever(
-                                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                                       new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                               final ActorGateway taskManager = 
TestingUtils.createTaskManager(
-                                               system,
-                                               highAvailabilityServices,
-                                               new Configuration(),
-                                               true,
-                                               false);
-
-                               final JobID jobId = new JobID();
-
-                               // Single blocking task
-                               final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                                               jobId,
-                                               "Job",
-                                               new JobVertexID(),
-                                               new ExecutionAttemptID(),
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Task",
-                                               1,
-                                               0,
-                                               1,
-                                               0,
-                                               new Configuration(),
-                                               new Configuration(),
-                                               
BlockingNoOpInvokable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               Collections.emptyList(),
-                                               Collections.emptyList(),
-                                               0);
-
-                               // Submit the task
-                               new Within(d) {
-
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       // Make sure to register
-                                                       Future<?> connectFuture 
= taskManager.ask(new TestingTaskManagerMessages
-                                                                       
.NotifyWhenRegisteredAtJobManager(jobManager.actor()), remaining());
-                                                       
Await.ready(connectFuture, remaining());
-
-                                                       Future<Object> 
taskRunningFuture = taskManager.ask(
-                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
-                                                                               
        tdd.getExecutionAttemptId()), timeout);
-
-                                                       taskManager.tell(new 
SubmitTask(tdd));
-
-                                                       
Await.ready(taskRunningFuture, d);
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-
-                               //
-                               // 1) Trigger sample for non-existing task
-                               //
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ExecutionAttemptID 
taskId = new ExecutionAttemptID();
-
-                                                       taskManager.tell(new 
TriggerStackTraceSample(
-                                                                               
        112223,
-                                                                               
        taskId,
-                                                                               
        100,
-                                                                               
        timeD,
-                                                                               
        0),
-                                                                       
testActorGateway);
-
-                                                       // Receive the expected 
message (heartbeat races possible)
-                                                       Object[] msg = 
receiveN(1);
-                                                       while (!(msg[0] 
instanceof Status.Failure)) {
-                                                               msg = 
receiveN(1);
-                                                       }
-
-                                                       Status.Failure response 
= (Status.Failure) msg[0];
-
-                                                       
assertEquals(IllegalStateException.class, response.cause().getClass());
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-
-                               //
-                               // 2) Trigger sample for the blocking task
-                               //
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               boolean success = false;
-                                               Throwable lastError = null;
-
-                                               for (int i = 0; i < 100 && 
!success; i++) {
-                                                       try {
-                                                               int numSamples 
= 5;
-
-                                                               
taskManager.tell(new TriggerStackTraceSample(
-                                                                               
                19230,
-                                                                               
                tdd.getExecutionAttemptId(),
-                                                                               
                numSamples,
-                                                                               
                Time.milliseconds(100L),
-                                                                               
                0),
-                                                                               
testActorGateway);
-
-                                                               // Receive the 
expected message (heartbeat races possible)
-                                                               Object[] msg = 
receiveN(1);
-                                                               while (!(msg[0] 
instanceof StackTraceSampleResponse)) {
-                                                                       msg = 
receiveN(1);
-                                                               }
-
-                                                               
StackTraceSampleResponse response = (StackTraceSampleResponse) msg[0];
-
-                                                               // ---- Verify 
response ----
-                                                               
assertEquals(19230, response.getSampleId());
-                                                               
assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
-
-                                                               
List<StackTraceElement[]> traces = response.getSamples();
-
-                                                               
assertEquals("Number of samples", numSamples, traces.size());
-
-                                                               for 
(StackTraceElement[] trace : traces) {
-                                                                       // Look 
for BlockingNoOpInvokable#invoke
-                                                                       for 
(StackTraceElement elem : trace) {
-                                                                               
if (elem.getClassName().equals(
-                                                                               
                BlockingNoOpInvokable.class.getName())) {
-
-                                                                               
        assertEquals("invoke", elem.getMethodName());
-                                                                               
        success = true;
-                                                                               
        break;
-                                                                               
}
-                                                                       }
-
-                                                                       
assertTrue("Unexpected stack trace: " +
-                                                                               
        Arrays.toString(trace), success);
-                                                               }
-                                                       } catch (Throwable t) {
-                                                               lastError = t;
-                                                               
LOG.warn("Failed to find invokable.", t);
-                                                       }
-
-                                                       try {
-                                                               
Thread.sleep(100);
-                                                       } catch 
(InterruptedException e) {
-                                                               
LOG.error("Interrupted while sleeping before retry.", e);
-                                                               break;
-                                                       }
-                                               }
-
-                                               if (!success) {
-                                                       if (lastError == null) {
-                                                               fail("Failed to 
find invokable");
-                                                       } else {
-                                                               
fail(lastError.getMessage());
-                                                       }
-                                               }
-                                       }
-                               };
-
-                               //
-                               // 3) Trigger sample for the blocking task with 
max depth
-                               //
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       int numSamples = 5;
-                                                       int maxDepth = 2;
-
-                                                       taskManager.tell(new 
TriggerStackTraceSample(
-                                                                               
        1337,
-                                                                               
        tdd.getExecutionAttemptId(),
-                                                                               
        numSamples,
-                                                                               
        Time.milliseconds(100L),
-                                                                               
        maxDepth),
-                                                                       
testActorGateway);
-
-                                                       // Receive the expected 
message (heartbeat races possible)
-                                                       Object[] msg = 
receiveN(1);
-                                                       while (!(msg[0] 
instanceof StackTraceSampleResponse)) {
-                                                               msg = 
receiveN(1);
-                                                       }
-
-                                                       
StackTraceSampleResponse response = (StackTraceSampleResponse) msg[0];
-
-                                                       // ---- Verify response 
----
-                                                       assertEquals(1337, 
response.getSampleId());
-                                                       
assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
-
-                                                       
List<StackTraceElement[]> traces = response.getSamples();
-
-                                                       assertEquals("Number of 
samples", numSamples, traces.size());
-
-                                                       for 
(StackTraceElement[] trace : traces) {
-                                                               
assertEquals("Max depth", maxDepth, trace.length);
-                                                       }
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-
-                               //
-                               // 4) Trigger sample for the blocking task, but 
cancel it during sampling
-                               //
-                               new Within(d) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       int maxAttempts = 10;
-                                                       int sleepTime = 100;
-                                                       for (int i = 0; i < 
maxAttempts; i++, sleepTime *= 2) {
-                                                               // Trigger many 
samples in order to cancel the task
-                                                               // during a 
sample
-                                                               
taskManager.tell(
-                                                                       new 
TriggerStackTraceSample(
-                                                                               
44,
-                                                                               
tdd.getExecutionAttemptId(),
-                                                                               
Integer.MAX_VALUE,
-                                                                               
Time.milliseconds(10L),
-                                                                               
0),
-                                                                       
testActorGateway);
-
-                                                               
Thread.sleep(sleepTime);
-
-                                                               Future<?> 
removeFuture = taskManager.ask(
-                                                                               
new TestingTaskManagerMessages.NotifyWhenJobRemoved(jobId),
-                                                                               
remaining());
-
-                                                               // Cancel the 
task
-                                                               
taskManager.tell(new CancelTask(tdd.getExecutionAttemptId()));
-
-                                                               // Receive the 
expected message (heartbeat races possible)
-                                                               while (true) {
-                                                                       
Object[] msg = receiveN(1);
-                                                                       if 
(msg[0] instanceof StackTraceSampleResponse) {
-                                                                               
StackTraceSampleResponse response = (StackTraceSampleResponse) msg[0];
-
-                                                                               
assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
-                                                                               
assertEquals(44, response.getSampleId());
-
-                                                                               
// Done
-                                                                               
return;
-                                                                       } else 
if (msg[0] instanceof Failure) {
-                                                                               
// Wait for removal before resubmitting
-                                                                               
Await.ready(removeFuture, remaining());
-
-                                                                               
Future<?> taskRunningFuture = taskManager.ask(
-                                                                               
                new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
-                                                                               
                                tdd.getExecutionAttemptId()), timeout);
-
-                                                                               
// Resubmit
-                                                                               
taskManager.tell(new SubmitTask(tdd));
-
-                                                                               
Await.ready(taskRunningFuture, remaining());
-
-                                                                               
// Retry the sample message
-                                                                               
break;
-                                                                       } else {
-                                                                               
// Different message
-                                                                               
continue;
-                                                                       }
-                                                               }
-                                                       }
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       } finally {
-                               TestingUtils.stopActor(taskManagerActorGateway);
-                               TestingUtils.stopActor(jobManagerActorGateway);
-                       }
-               }};
-       }
-
-       @Test
-       public void testTerminationOnFatalError() {
-               highAvailabilityServices.setJobMasterLeaderRetriever(
-                       HighAvailabilityServices.DEFAULT_JOB_ID,
-                       new SettableLeaderRetrievalService());
-
-               new JavaTestKit(system){{
-
-                       final ActorGateway taskManager = 
TestingUtils.createTaskManager(
-                                       system,
-                                       highAvailabilityServices, // no 
jobmanager
-                                       new Configuration(),
-                                       true,
-                                       false);
-
-                       try {
-                               watch(taskManager.actor());
-                               taskManager.tell(new FatalError("test fatal 
error", new Exception("something super bad")));
-                               expectTerminated(d, taskManager.actor());
-                       }
-                       finally {
-                               taskManager.tell(Kill.getInstance());
-                       }
-               }};
-       }
-
-       /**
-        * Test that a failing schedule or update consumers call leads to the 
failing of the respective
-        * task.
-        *
-        * <p>IMPORTANT: We have to make sure that the invokable's cancel 
method is called, because only
-        * then the future is completed. We do this by not eagerly deploy 
consumer tasks and requiring
-        * the invokable to fill one memory segment. The completed memory 
segment will trigger the
-        * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
-        * memory segment, we'll block the invokable and wait for the task 
failure due to the failed
-        * schedule or update consumers call.
-        */
-       @Test(timeout = 10000L)
-       public void testFailingScheduleOrUpdateConsumersMessage() throws 
Exception {
-               new JavaTestKit(system) {{
-                       final Configuration configuration = new Configuration();
-
-                       // set the memory segment to the smallest size 
possible, because we have to fill one
-                       // memory buffer to trigger the schedule or update 
consumers message to the downstream
-                       // operators
-                       
configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
-
-                       final JobID jid = new JobID();
-                       final JobVertexID vid = new JobVertexID();
-                       final ExecutionAttemptID eid = new ExecutionAttemptID();
-                       final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
-
-                       final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
-                               new IntermediateDataSetID(),
-                               new IntermediateResultPartitionID(),
-                               ResultPartitionType.PIPELINED,
-                               1,
-                               1,
-                               true);
-
-                       final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
-                               "TestTask", 1, 0, 1, 0, new Configuration(), 
new Configuration(),
-                               TestInvokableRecordCancel.class.getName(),
-                               
Collections.singletonList(resultPartitionDeploymentDescriptor),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               new ArrayList<>(), Collections.emptyList(), 0);
-
-                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
LEADER_SESSION_ID), "jobmanager");
-                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, LEADER_SESSION_ID);
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       final ActorGateway taskManager = 
TestingUtils.createTaskManager(
-                               system,
-                               highAvailabilityServices,
-                               configuration,
-                               true,
-                               true);
-
-                       try {
-                               
TestInvokableRecordCancel.resetGotCanceledFuture();
-
-                               Future<Object> result = taskManager.ask(new 
SubmitTask(tdd), timeout);
-
-                               Await.result(result, timeout);
-
-                               CompletableFuture<Boolean> cancelFuture = 
TestInvokableRecordCancel.gotCanceled();
-
-                               assertEquals(true, cancelFuture.get());
-                       } finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
-                       }
-               }};
-       }
-
-       /**
-        * Tests that the TaskManager sends a proper exception back to the 
sender if the submit task
-        * message fails.
-        */
-       @Test
-       public void testSubmitTaskFailure() throws Exception {
-               ActorGateway jobManager = null;
-               ActorGateway taskManager = null;
-
-               try {
-
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       taskManager = TestingUtils.createTaskManager(
-                               system,
-                               highAvailabilityServices,
-                               new Configuration(),
-                               true,
-                               true);
-
-                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                               new JobID(),
-                               "test job",
-                               new JobVertexID(),
-                               new ExecutionAttemptID(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "test task",
-                               0, // this will make the submission fail 
because the number of key groups must be >= 1
-                               0,
-                               1,
-                               0,
-                               new Configuration(),
-                               new Configuration(),
-                               "Foobar",
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.emptyList(),
-                               Collections.emptyList(),
-                               0);
-
-                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
-
-                       try {
-                               Await.result(submitResponse, timeout);
-
-                               fail("The submit task message should have 
failed.");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       }
-               } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
-               }
-       }
-
-       /**
-        * Tests that the TaskManager sends a proper exception back to the 
sender if the stop task
-        * message fails.
-        */
-       @Test
-       public void testStopTaskFailure() throws Exception {
-               ActorGateway jobManager = null;
-               ActorGateway taskManager = null;
-
-               try {
-                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
-
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       taskManager = TestingUtils.createTaskManager(
-                               system,
-                               highAvailabilityServices,
-                               new Configuration(),
-                               true,
-                               true);
-
-                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                               new JobID(),
-                               "test job",
-                               new JobVertexID(),
-                               executionAttemptId,
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "test task",
-                               1,
-                               0,
-                               1,
-                               0,
-                               new Configuration(),
-                               new Configuration(),
-                               BlockingNoOpInvokable.class.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.emptyList(),
-                               Collections.emptyList(),
-                               0);
-
-                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
-
-                       Await.result(submitResponse, timeout);
-
-                       final Future<Object> taskRunning = taskManager.ask(new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(executionAttemptId), 
timeout);
-
-                       Await.result(taskRunning, timeout);
-
-                       Future<Object> stopResponse = taskManager.ask(new 
StopTask(executionAttemptId), timeout);
-
-                       try {
-                               Await.result(stopResponse, timeout);
-
-                               fail("The stop task message should have 
failed.");
-                       } catch (UnsupportedOperationException e) {
-                               // expected
-                       }
-               } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
-               }
-       }
-
-       /**
-        * Tests that the TaskManager sends a proper exception back to the 
sender if the trigger stack
-        * trace message fails.
-        */
-       @Test
-       public void testStackTraceSampleFailure() throws Exception {
-               ActorGateway jobManager = null;
-               ActorGateway taskManager = null;
-
-               try {
-
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       taskManager = TestingUtils.createTaskManager(
-                               system,
-                               highAvailabilityServices,
-                               new Configuration(),
-                               true,
-                               true);
-
-                       Future<Object> stackTraceResponse = taskManager.ask(
-                               new TriggerStackTraceSample(
-                                       0,
-                                       new ExecutionAttemptID(),
-                                       0,
-                                       Time.milliseconds(1L),
-                                       0),
-                               timeout);
-
-                       try {
-                               Await.result(stackTraceResponse, timeout);
-
-                               fail("The trigger stack trace message should 
have failed.");
-                       } catch (IllegalStateException e) {
-                               // expected
-                       }
-               } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
-               }
-       }
-
-       /**
-        * Tests that the TaskManager sends a proper exception back to the 
sender if the trigger stack
-        * trace message fails.
-        */
-       @Test
-       public void testUpdateTaskInputPartitionsFailure() throws Exception {
-               ActorGateway jobManager = null;
-               ActorGateway taskManager = null;
-
-               try {
-
-                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
-
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
-                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
-
-                       highAvailabilityServices.setJobMasterLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID,
-                               new 
StandaloneLeaderRetrievalService(jobManager.path(), 
jobManager.leaderSessionID()));
-
-                       taskManager = TestingUtils.createTaskManager(
-                               system,
-                               highAvailabilityServices,
-                               new Configuration(),
-                               true,
-                               true);
-
-                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
-                               new JobID(),
-                               "test job",
-                               new JobVertexID(),
-                               executionAttemptId,
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "test task",
-                               1,
-                               0,
-                               1,
-                               0,
-                               new Configuration(),
-                               new Configuration(),
-                               BlockingNoOpInvokable.class.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.emptyList(),
-                               Collections.emptyList(),
-                               0);
-
-                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
-
-                       Await.result(submitResponse, timeout);
-
-                       Future<Object> partitionUpdateResponse = 
taskManager.ask(
-                               new TaskMessages.UpdateTaskSinglePartitionInfo(
-                                       executionAttemptId,
-                                       new IntermediateDataSetID(),
-                                       new 
InputChannelDeploymentDescriptor(new ResultPartitionID(), 
ResultPartitionLocation.createLocal())),
-                               timeout);
-
-                       try {
-                               Await.result(partitionUpdateResponse, timeout);
-
-                               fail("The update task input partitions message 
should have failed.");
-                       } catch (Exception e) {
-                               // expected
-                       }
-               } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class SimpleJobManager extends FlinkUntypedActor {
-
-               private final UUID leaderSessionID;
-
-               public SimpleJobManager(UUID leaderSessionID) {
-                       this.leaderSessionID = leaderSessionID;
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof 
RegistrationMessages.RegisterTaskManager) {
-                               final InstanceID iid = new InstanceID();
-                               final ActorRef self = getSelf();
-                               getSender().tell(
-                                               decorateMessage(
-                                                               new 
RegistrationMessages.AcknowledgeRegistration(
-                                                                       iid,
-                                                                       12345)
-                                               ),
-                                               self);
-                       }
-                       else if (message instanceof 
TaskMessages.UpdateTaskExecutionState){
-                               getSender().tell(true, getSelf());
-                       }
-               }
-
-               @Override
-               protected UUID getLeaderSessionID() {
-                       return leaderSessionID;
-               }
-       }
-
-       public static class FailingScheduleOrUpdateConsumersJobManager extends 
SimpleJobManager {
-
-               public FailingScheduleOrUpdateConsumersJobManager(UUID 
leaderSessionId) {
-                       super(leaderSessionId);
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof ScheduleOrUpdateConsumers) {
-                               getSender().tell(
-                                       decorateMessage(
-                                               new Status.Failure(new 
Exception("Could not schedule or update consumers."))),
-                                       getSelf());
-                       } else {
-                               super.handleMessage(message);
-                       }
-               }
-       }
-
-       public static class SimpleLookupJobManager extends SimpleJobManager {
-
-               public SimpleLookupJobManager(UUID leaderSessionID) {
-                       super(leaderSessionID);
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof ScheduleOrUpdateConsumers) {
-                               getSender().tell(
-                                               
decorateMessage(Acknowledge.get()),
-                                               getSelf()
-                                               );
-                       } else {
-                               super.handleMessage(message);
-                       }
-               }
-       }
-
-       public static class SimpleLookupFailingUpdateJobManager extends 
SimpleLookupJobManager{
-
-               private final Set<ExecutionAttemptID> validIDs;
-
-               public SimpleLookupFailingUpdateJobManager(UUID 
leaderSessionID, Set<ExecutionAttemptID> ids) {
-                       super(leaderSessionID);
-                       this.validIDs = new HashSet<>(ids);
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception{
-                       if (message instanceof 
TaskMessages.UpdateTaskExecutionState) {
-                               TaskMessages.UpdateTaskExecutionState updateMsg 
=
-                                               
(TaskMessages.UpdateTaskExecutionState) message;
-
-                               if 
(validIDs.contains(updateMsg.taskExecutionState().getID())) {
-                                       getSender().tell(true, getSelf());
-                               } else {
-                                       getSender().tell(false, getSelf());
-                               }
-                       } else {
-                               super.handleMessage(message);
-                       }
-               }
-       }
-
-       public static class SimplePartitionStateLookupJobManager extends 
SimpleJobManager {
-
-               private final ActorRef testActor;
-
-               public SimplePartitionStateLookupJobManager(UUID 
leaderSessionID, ActorRef testActor) {
-                       super(leaderSessionID);
-                       this.testActor = testActor;
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof RequestPartitionProducerState) {
-                               
getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf());
-                       }
-                       else if (message instanceof 
TaskMessages.UpdateTaskExecutionState) {
-                               final TaskExecutionState msg = 
((TaskMessages.UpdateTaskExecutionState) message)
-                                               .taskExecutionState();
-
-                               if (msg.getExecutionState().isTerminal()) {
-                                       testActor.tell(msg, self());
-                               }
-                       } else {
-                               super.handleMessage(message);
-                       }
-               }
-       }
-
-       public static class SimpleLookupJobManagerCreator implements 
Creator<SimpleLookupJobManager>{
-
-               private final UUID leaderSessionID;
-
-               public SimpleLookupJobManagerCreator(UUID leaderSessionID) {
-                       this.leaderSessionID = leaderSessionID;
-               }
-
-               @Override
-               public SimpleLookupJobManager create() throws Exception {
-                       return new SimpleLookupJobManager(leaderSessionID);
-               }
-       }
-
-       public static class SimpleLookupFailingUpdateJobManagerCreator 
implements Creator<SimpleLookupFailingUpdateJobManager>{
-
-               private final UUID leaderSessionID;
-
-               private final Set<ExecutionAttemptID> validIDs;
-
-               public SimpleLookupFailingUpdateJobManagerCreator(UUID 
leaderSessionID, ExecutionAttemptID ... ids) {
-                       this.leaderSessionID = leaderSessionID;
-
-                       validIDs = new HashSet<ExecutionAttemptID>();
-
-                       for (ExecutionAttemptID id : ids) {
-                               this.validIDs.add(id);
-                       }
-               }
-
-               @Override
-               public SimpleLookupFailingUpdateJobManager create() throws 
Exception {
-                       return new 
SimpleLookupFailingUpdateJobManager(leaderSessionID, validIDs);
-               }
-       }
-
-       public static class SimplePartitionStateLookupJobManagerCreator 
implements Creator<SimplePartitionStateLookupJobManager>{
-
-               private final UUID leaderSessionID;
-
-               private final ActorRef testActor;
-
-               public SimplePartitionStateLookupJobManagerCreator(UUID 
leaderSessionID, ActorRef testActor) {
-                       this.leaderSessionID = leaderSessionID;
-
-                       this.testActor = testActor;
-               }
-
-               @Override
-               public SimplePartitionStateLookupJobManager create() throws 
Exception {
-                       return new 
SimplePartitionStateLookupJobManager(leaderSessionID, testActor);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static final class TestInvokableCorrect extends 
AbstractInvokable {
-
-               public TestInvokableCorrect(Environment environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() {}
-       }
-
-       public static class TestInvokableBlockingCancelable extends 
AbstractInvokable {
-
-               public TestInvokableBlockingCancelable(Environment environment) 
{
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       final Object o = new Object();
-                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                       synchronized (o) {
-                               //noinspection InfiniteLoopStatement
-                               while (true) {
-                                       o.wait();
-                               }
-                       }
-               }
-       }
-
-       public static final class TestInvokableRecordCancel extends 
AbstractInvokable {
-
-               private static final Object lock = new Object();
-               private static CompletableFuture<Boolean> gotCanceledFuture = 
new CompletableFuture<>();
-
-               public TestInvokableRecordCancel(Environment environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       final Object o = new Object();
-                       RecordWriter<IntValue> recordWriter = new 
RecordWriterBuilder().build(getEnvironment().getWriter(0));
-
-                       for (int i = 0; i < 1024; i++) {
-                               recordWriter.emit(new IntValue(42));
-                       }
-
-                       synchronized (o) {
-                               //noinspection InfiniteLoopStatement
-                               while (true) {
-                                       o.wait();
-                               }
-                       }
-
-               }
-
-               @Override
-               public void cancel() {
-                       synchronized (lock) {
-                               gotCanceledFuture.complete(true);
-                       }
-               }
-
-               public static void resetGotCanceledFuture() {
-                       synchronized (lock) {
-                               gotCanceledFuture = new CompletableFuture<>();
-                       }
-               }
-
-               public static CompletableFuture<Boolean> gotCanceled() {
-                       synchronized (lock) {
-                               return gotCanceledFuture;
-                       }
-               }
-       }
-
-       private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
-               JobID jobId,
-               String jobName,
-               JobVertexID jobVertexId,
-               ExecutionAttemptID executionAttemptId,
-               SerializedValue<ExecutionConfig> serializedExecutionConfig,
-               String taskName,
-               int numberOfKeyGroups,
-               int subtaskIndex,
-               int parallelism,
-               int attemptNumber,
-               Configuration jobConfiguration,
-               Configuration taskConfiguration,
-               String invokableClassName,
-               Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions,
-               Collection<InputGateDeploymentDescriptor> inputGates,
-               Collection<PermanentBlobKey> requiredJarFiles,
-               Collection<URL> requiredClasspaths,
-               int targetSlotNumber) throws IOException {
-
-               JobInformation jobInformation = new JobInformation(
-                       jobId,
-                       jobName,
-                       serializedExecutionConfig,
-                       jobConfiguration,
-                       requiredJarFiles,
-                       requiredClasspaths);
-
-               TaskInformation taskInformation = new TaskInformation(
-                       jobVertexId,
-                       taskName,
-                       parallelism,
-                       numberOfKeyGroups,
-                       invokableClassName,
-                       taskConfiguration);
-
-               SerializedValue<JobInformation> serializedJobInformation = new 
SerializedValue<>(jobInformation);
-               SerializedValue<TaskInformation> serializedJobVertexInformation 
= new SerializedValue<>(taskInformation);
-
-               return new TaskDeploymentDescriptor(
-                       jobId,
-                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-                       executionAttemptId,
-                       new AllocationID(),
-                       subtaskIndex,
-                       attemptNumber,
-                       targetSlotNumber,
-                       null,
-                       producedPartitions,
-                       inputGates);
-
-       }
-}
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 1bce221..c68b45e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -200,81 +200,6 @@ object TestingUtils {
     }
   }
 
-  /** Creates a local TaskManager in the given ActorSystem. It is given a
-    * [[StandaloneLeaderRetrievalService]] which returns the given 
jobManagerURL. After creating
-    * the TaskManager, waitForRegistration specifies whether one waits until 
the TaskManager has
-    * registered at the JobManager. An ActorGateway to the TaskManager is 
returned.
-    *
-    * @param actorSystem ActorSystem in which the TaskManager shall be started
-    * @param highAvailabilityServices Service factory for high availability
-    * @param configuration Configuration
-    * @param useLocalCommunication true if the network stack shall use 
exclusively local
-    *                              communication
-    * @param waitForRegistration true if the method will wait until the 
TaskManager has connected to
-    *                            the JobManager
-    * @return ActorGateway of the created TaskManager
-    */
-  def createTaskManager(
-      actorSystem: ActorSystem,
-      highAvailabilityServices: HighAvailabilityServices,
-      configuration: Configuration,
-      useLocalCommunication: Boolean,
-      waitForRegistration: Boolean)
-    : ActorGateway = {
-    createTaskManager(
-      actorSystem,
-      highAvailabilityServices,
-      configuration,
-      useLocalCommunication,
-      waitForRegistration,
-      classOf[TestingTaskManager]
-    )
-  }
-
-  def createTaskManager(
-      actorSystem: ActorSystem,
-      highAvailabilityServices: HighAvailabilityServices,
-      configuration: Configuration,
-      useLocalCommunication: Boolean,
-      waitForRegistration: Boolean,
-      taskManagerClass: Class[_ <: TaskManager])
-    : ActorGateway = {
-
-    val resultingConfiguration = new Configuration()
-
-    resultingConfiguration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"10m")
-
-    resultingConfiguration.addAll(configuration)
-
-    val metricRegistry = new MetricRegistryImpl(
-      MetricRegistryConfiguration.fromConfiguration(configuration))
-
-    val taskManagerResourceId = ResourceID.generate()
-
-    val taskManager = TaskManager.startTaskManagerComponentsAndActor(
-      resultingConfiguration,
-      taskManagerResourceId,
-      actorSystem,
-      highAvailabilityServices,
-      metricRegistry,
-      "localhost",
-      None,
-      useLocalCommunication,
-      taskManagerClass
-    )
-
-    val leaderId = if (waitForRegistration) {
-      val notificationResult = (taskManager ? 
NotifyWhenRegisteredAtJobManager)(TESTING_DURATION)
-        .mapTo[RegisteredAtJobManager]
-
-      Await.result(notificationResult, TESTING_DURATION).leaderId
-    } else {
-      HighAvailabilityServices.DEFAULT_LEADER_ID
-    }
-
-    new AkkaActorGateway(taskManager, leaderId)
-  }
-
   /** Stops the given actor by sending it a Kill message
     *
     * @param actor
@@ -284,115 +209,4 @@ object TestingUtils {
       actor ! Kill
     }
   }
-
-  /** Stops the given actor by sending it a Kill message
-    *
-    * @param actorGateway
-    */
-  def stopActor(actorGateway: ActorGateway): Unit = {
-    if (actorGateway != null) {
-      stopActor(actorGateway.actor())
-    }
-  }
-
-  def stopActorGracefully(actor: ActorRef): Unit = {
-    val gracefulStopFuture = Patterns.gracefulStop(actor, 
TestingUtils.TESTING_TIMEOUT)
-
-    Await.result(gracefulStopFuture, TestingUtils.TESTING_TIMEOUT)
-  }
-
-  def stopActorGracefully(actorGateway: ActorGateway): Unit = {
-    stopActorGracefully(actorGateway.actor())
-  }
-
-  def stopActorsGracefully(actors: ActorRef*): Unit = {
-    val gracefulStopFutures = actors.flatMap{
-      actor =>
-        Option(actor) match {
-          case Some(actorRef) => Some(Patterns.gracefulStop(actorRef, 
TestingUtils.TESTING_TIMEOUT))
-          case None => None
-        }
-    }
-
-    implicit val executionContext = defaultExecutionContext
-
-    val globalStopFuture = 
scala.concurrent.Future.sequence(gracefulStopFutures)
-
-    Await.result(globalStopFuture, TestingUtils.TESTING_TIMEOUT)
-  }
-
-  def stopActorsGracefully(actors: java.util.List[ActorRef]): Unit = {
-    import scala.collection.JavaConverters._
-
-    stopActorsGracefully(actors.asScala: _*)
-  }
-
-  def stopActorGatewaysGracefully(actorGateways: ActorGateway*): Unit = {
-    val actors = actorGateways.flatMap {
-      actorGateway =>
-        Option(actorGateway) match {
-          case Some(actorGateway) => Some(actorGateway.actor())
-          case None => None
-        }
-    }
-
-    stopActorsGracefully(actors: _*)
-  }
-
-  def stopActorGatewaysGracefully(actorGateways: 
java.util.List[ActorGateway]): Unit = {
-    import scala.collection.JavaConverters._
-
-    stopActorGatewaysGracefully(actorGateways.asScala: _*)
-  }
-
-  /** Creates a forwarding JobManager which sends all received message to the 
forwarding target.
-    *
-    * @param actorSystem The actor system to start the actor in.
-    * @param forwardingTarget Target to forward to.
-    * @param leaderId leader id for the actor gateway
-    * @param actorName Name for forwarding Actor
-    * @return
-    */
-  def createForwardingActor(
-      actorSystem: ActorSystem,
-      forwardingTarget: ActorRef,
-      leaderId: UUID,
-      actorName: Option[String] = None)
-    : ActorGateway = {
-
-    val actor = actorName match {
-      case Some(name) =>
-        actorSystem.actorOf(
-          Props(
-            classOf[ForwardingActor],
-            forwardingTarget,
-            Option(leaderId)),
-          name
-        )
-      case None =>
-        actorSystem.actorOf(
-          Props(
-            classOf[ForwardingActor],
-            forwardingTarget,
-            Option(leaderId))
-        )
-    }
-
-    new AkkaActorGateway(actor, leaderId)
-  }
-
-  class ForwardingActor(val target: ActorRef, val leaderSessionID: 
Option[UUID])
-    extends FlinkActor with LeaderSessionMessageFilter with LogMessages {
-
-    /** Handle incoming messages
-      *
-      * @return
-      */
-    override def handleMessage: Receive = {
-      case msg => target.forward(msg)
-    }
-
-    override val log: Logger = Logger(getClass)
-  }
-
 }

Reply via email to