asfgit closed pull request #6834: [FLINK-10508] [tests] Port JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once such a PR has been merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 30ac84f4b66..a399bff77de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -20,68 +20,546 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.Tasks.AgnosticBinaryReceiver;
+import org.apache.flink.runtime.jobmanager.Tasks.AgnosticReceiver;
+import org.apache.flink.runtime.jobmanager.Tasks.AgnosticTertiaryReceiver;
+import org.apache.flink.runtime.jobmanager.Tasks.ExceptionReceiver;
+import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender;
+import org.apache.flink.runtime.jobmanager.Tasks.Forwarder;
+import org.apache.flink.runtime.jobmanager.Tasks.InstantiationErrorSender;
+import org.apache.flink.runtime.jobmanager.Tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.Tasks.Sender;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Integration test cases for the {@link MiniCluster}.
  */
 public class MiniClusterITCase extends TestLogger {
 
-       private static Configuration configuration;
+       /**
+        * Executes {@link #getSimpleJob(int)} with parallelism 123 on a single TaskManager of a
+        * MiniCluster configured with {@link RpcServiceSharing#SHARED}.
+        */
+       @Test
+       public void runJobWithSingleRpcService() throws Exception {
+               final int parallelism = 123;
 
-       @BeforeClass
-       public static void setup() {
-               configuration = new Configuration();
-               configuration.setInteger(WebOptions.PORT, 0);
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.SHARED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       
miniCluster.executeJobBlocking(getSimpleJob(parallelism));
+               }
        }
 
-       // 
------------------------------------------------------------------------
-       //  Simple Job Running Tests
-       // 
------------------------------------------------------------------------
+       /**
+        * Executes {@link #getSimpleJob(int)} with parallelism 123 on a single TaskManager of a
+        * MiniCluster configured with {@link RpcServiceSharing#DEDICATED}.
+        */
+       @Test
+       public void runJobWithMultipleRpcServices() throws Exception {
+               final int parallelism = 123;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       
miniCluster.executeJobBlocking(getSimpleJob(parallelism));
+               }
+       }
 
+       /**
+        * Submits an EAGER job whose single vertex needs two slots to a cluster offering one slot
+        * and expects a {@link NoResourceAvailableException} reporting
+        * "Slots required: 2, slots allocated: 1". The slot request timeout is lowered to 1000,
+        * presumably so that the failure surfaces quickly.
+        */
        @Test
-       public void runJobWithSingleRpcService() throws Exception {
-               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
-                       .setRpcServiceSharing(RpcServiceSharing.SHARED)
+       public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+               final Configuration configuration = getDefaultConfiguration();
+               configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
1000L);
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(1)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                        .setConfiguration(configuration)
                        .build();
 
-               MiniCluster miniCluster = new MiniCluster(cfg);
-               try {
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
                        miniCluster.start();
-                       executeJob(miniCluster);
+
+                       final JobVertex vertex = new JobVertex("Test Vertex");
+                       vertex.setParallelism(2);
+                       vertex.setMaxParallelism(2);
+                       vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+                       final JobGraph jobGraph = new JobGraph("Test Job", 
vertex);
+                       jobGraph.setAllowQueuedScheduling(true);
+                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowableWithMessage(e, "Job 
execution failed.").isPresent());
+
+                               assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Slots 
required: 2, slots allocated: 1").isPresent());
+                       }
                }
-               finally {
-                       miniCluster.close();
+       }
+
+       /**
+        * Runs a Sender -> Receiver job (parallelism 31, POINTWISE, PIPELINED) and expects it to
+        * finish successfully.
+        */
+       @Test
+       public void testForwardJob() throws Exception {
+               final int parallelism = 31;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(Sender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(Receiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       miniCluster.executeJobBlocking(jobGraph);
                }
        }
 
+       /**
+        * Runs a Sender -> AgnosticReceiver job (parallelism 31, PIPELINED) and expects it to
+        * finish successfully.
+        */
        @Test
-       public void runJobWithMultipleRpcServices() throws Exception {
-               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+       public void testBipartiteJob() throws Exception {
+               final int parallelism = 31;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
                        .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                       .setConfiguration(configuration)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(Sender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(AgnosticReceiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       // NOTE(review): despite the "Bipartite" name and the AgnosticReceiver, this edge is
+                       // POINTWISE, which makes the job equivalent to testForwardJob apart from the receiver
+                       // class; confirm whether ALL_TO_ALL was intended (and that enough network buffers exist).
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Bipartite Job", 
sender, receiver);
+
+                       miniCluster.executeJobBlocking(jobGraph);
+               }
+       }
+
+       /**
+        * Connects two inputs (one POINTWISE, one ALL_TO_ALL) to an {@link AgnosticTertiaryReceiver}
+        * and expects the job to fail with an {@link ArrayIndexOutOfBoundsException} whose message
+        * contains "2".
+        */
+       @Test
+       public void testTwoInputJobFailingEdgeMismatch() throws Exception {
+               final int parallelism = 1;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(6 * parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
                        .build();
 
-               MiniCluster miniCluster = new MiniCluster(cfg);
-               try {
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
                        miniCluster.start();
-                       executeJob(miniCluster);
+
+                       final JobVertex sender1 = new JobVertex("Sender1");
+                       sender1.setInvokableClass(Sender.class);
+                       sender1.setParallelism(parallelism);
+
+                       final JobVertex sender2 = new JobVertex("Sender2");
+                       sender2.setInvokableClass(Sender.class);
+                       sender2.setParallelism(2 * parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       
receiver.setInvokableClass(AgnosticTertiaryReceiver.class);
+                       receiver.setParallelism(3 * parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender1, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+                       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Bipartite Job", 
sender1, receiver, sender2);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, 
ArrayIndexOutOfBoundsException.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, 
"2").isPresent());
+                       }
                }
-               finally {
-                       miniCluster.close();
+       }
+
+       /**
+        * Runs two senders (parallelism p and 2p, p = 11) into an {@link AgnosticBinaryReceiver}
+        * (parallelism 3p) over one POINTWISE and one ALL_TO_ALL edge and expects success.
+        */
+       @Test
+       public void testTwoInputJob() throws Exception {
+               final int parallelism = 11;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(6 * parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender1 = new JobVertex("Sender1");
+                       sender1.setInvokableClass(Sender.class);
+                       sender1.setParallelism(parallelism);
+
+                       final JobVertex sender2 = new JobVertex("Sender2");
+                       sender2.setInvokableClass(Sender.class);
+                       sender2.setParallelism(2 * parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       
receiver.setInvokableClass(AgnosticBinaryReceiver.class);
+                       receiver.setParallelism(3 * parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender1, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+                       receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Bipartite Job", 
sender1, receiver, sender2);
+
+                       miniCluster.executeJobBlocking(jobGraph);
+               }
+       }
+
+       /**
+        * Runs a Sender -> Forwarder -> AgnosticReceiver job with ALL_TO_ALL edges and EAGER
+        * scheduling. All three vertices share one {@link SlotSharingGroup}, and the cluster offers
+        * exactly {@code parallelism} slots.
+        */
+       @Test
+       public void testSchedulingAllAtOnce() throws Exception {
+               final int parallelism = 16;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(Sender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex forwarder = new JobVertex("Forwarder");
+                       forwarder.setInvokableClass(Forwarder.class);
+                       forwarder.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(AgnosticReceiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       final SlotSharingGroup sharingGroup = new 
SlotSharingGroup(sender.getID(), receiver.getID());
+                       sender.setSlotSharingGroup(sharingGroup);
+                       forwarder.setSlotSharingGroup(sharingGroup);
+                       receiver.setSlotSharingGroup(sharingGroup);
+
+                       forwarder.connectNewDataSetAsInput(sender, 
DistributionPattern.ALL_TO_ALL,
+                               ResultPartitionType.PIPELINED);
+                       receiver.connectNewDataSetAsInput(forwarder, 
DistributionPattern.ALL_TO_ALL,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Forwarding 
Job", sender, forwarder, receiver);
+
+                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+                       miniCluster.executeJobBlocking(jobGraph);
+               }
+       }
+
+       /**
+        * Uses {@link ExceptionSender} for every sender subtask and expects the job to fail with
+        * a throwable carrying the message "Test exception".
+        */
+       @Test
+       public void testJobWithAFailingSenderVertex() throws Exception {
+               final int parallelism = 100;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(ExceptionSender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(Receiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, 
Exception.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Test 
exception").isPresent());
+                       }
+               }
+       }
+
+       /**
+        * Uses {@link SometimesExceptionSender}, whose failing subtasks are chosen by
+        * {@link SometimesExceptionSender#configFailingSenders(int)}, and expects the job to fail
+        * with a throwable carrying the message "Test exception".
+        */
+       @Test
+       public void testJobWithAnOccasionallyFailingSenderVertex() throws 
Exception {
+               final int parallelism = 100;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       
sender.setInvokableClass(SometimesExceptionSender.class);
+                       sender.setParallelism(parallelism);
+
+                       // set failing senders
+                       
SometimesExceptionSender.configFailingSenders(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(Receiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, 
Exception.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Test 
exception").isPresent());
+                       }
+               }
+       }
+
+       /**
+        * Runs a Sender -> {@link ExceptionReceiver} job and expects it to fail with a throwable
+        * carrying the message "Test exception".
+        */
+       @Test
+       public void testJobWithAFailingReceiverVertex() throws Exception {
+               final int parallelism = 200;
+
+               final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       // The sender is the plain Sender (not SometimesExceptionSender), so there is no
+                       // failing-sender configuration to set up; the failure comes from the receiver.
+                       final JobVertex sender = new JobVertex("Sender");
+                       sender.setInvokableClass(Sender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(ExceptionReceiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, Exception.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Test exception").isPresent());
+                       }
+               }
+       }
+
+       /**
+        * Uses {@link InstantiationErrorSender} for the sender vertex and expects the job to fail
+        * with a throwable carrying the message "Test exception in constructor".
+        */
+       @Test
+       public void testJobWithAllVerticesFailingDuringInstantiation() throws 
Exception {
+               final int parallelism = 200;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       
sender.setInvokableClass(InstantiationErrorSender.class);
+                       sender.setParallelism(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(Receiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, 
Exception.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Test 
exception in constructor").isPresent());
+                       }
+               }
+       }
+
+       /**
+        * Uses {@code SometimesInstantiationErrorSender}, configured through its
+        * {@code configFailingSenders(parallelism)}, and expects the job to fail with a throwable
+        * carrying the message "Test exception in constructor".
+        */
+       @Test
+       public void testJobWithSomeVerticesFailingDuringInstantiation() throws 
Exception {
+               final int parallelism = 200;
+
+               final MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex sender = new JobVertex("Sender");
+                       
sender.setInvokableClass(SometimesInstantiationErrorSender.class);
+                       sender.setParallelism(parallelism);
+
+                       // set failing senders
+                       
SometimesInstantiationErrorSender.configFailingSenders(parallelism);
+
+                       final JobVertex receiver = new JobVertex("Receiver");
+                       receiver.setInvokableClass(Receiver.class);
+                       receiver.setParallelism(parallelism);
+
+                       receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("Pointwise Job", 
sender, receiver);
+
+                       try {
+                               miniCluster.executeJobBlocking(jobGraph);
+
+                               fail("Job should fail.");
+                       } catch (JobExecutionException e) {
+                               assertTrue(findThrowable(e, 
Exception.class).isPresent());
+                               assertTrue(findThrowableWithMessage(e, "Test 
exception in constructor").isPresent());
+                       }
+               }
+       }
+
+       /**
+        * Runs a Source -> Sink job whose sink vertex triggers a latch from within
+        * {@code finalizeOnMaster}, and checks that this has happened by the time the job result is
+        * available.
+        */
+       @Test
+       public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
+               final int parallelism = 31;
+
+               final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(parallelism)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .setConfiguration(getDefaultConfiguration())
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+                       miniCluster.start();
+
+                       final JobVertex source = new JobVertex("Source");
+                       source.setInvokableClass(WaitingNoOpInvokable.class);
+                       source.setParallelism(parallelism);
+
+                       final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 500L);
+                       sink.setInvokableClass(NoOpInvokable.class);
+                       sink.setParallelism(parallelism);
+
+                       sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED);
+
+                       final JobGraph jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink);
+
+                       // the latch is only triggered from within finalizeOnMaster, so it must still be open here
+                       assertFalse(sink.latch.isTriggered());
+
+                       final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
+
+                       final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
+                               (JobSubmissionResult ignored) -> miniCluster.requestJobResult(jobGraph.getJobID()));
+
+                       // Inspect the latch only once the job result is available. Asserting "job not done yet"
+                       // right after the latch fires races with the rest of finalizeOnMaster and with job
+                       // completion, which made the previous version of this test flaky.
+                       final JobResult jobResult = jobResultFuture.get();
+
+                       // surfaces the actual failure cause if the job did not succeed
+                       jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+                       assertTrue(sink.latch.isTriggered());
                }
        }
 
@@ -89,25 +567,47 @@ public void runJobWithMultipleRpcServices() throws Exception {
        //  Utilities
        // ------------------------------------------------------------------------
 
-       private static void executeJob(MiniCluster miniCluster) throws 
Exception {
-               JobGraph job = getSimpleJob();
-               miniCluster.executeJobBlocking(job);
+       /**
+        * Returns a new {@link Configuration} with {@link RestOptions#PORT} set to 0, presumably so
+        * that each MiniCluster's REST endpoint picks a free port.
+        */
+       private Configuration getDefaultConfiguration() {
+               final Configuration configuration = new Configuration();
+               configuration.setInteger(RestOptions.PORT, 0);
+
+               return configuration;
        }
 
-       private static JobGraph getSimpleJob() throws IOException {
-               JobVertex task = new JobVertex("Test task");
-               task.setParallelism(1);
-               task.setMaxParallelism(1);
+       /**
+        * Builds a single-vertex {@link NoOpInvokable} job that uses EAGER scheduling, allows queued
+        * scheduling, and uses a fixed-delay restart strategy ({@code Integer.MAX_VALUE} attempts,
+        * delay 1000 ms).
+        *
+        * @param parallelism parallelism and max parallelism of the single vertex
+        * @return the job graph
+        * @throws IOException presumably propagated from {@code setExecutionConfig} — confirm
+        */
+       private static JobGraph getSimpleJob(int parallelism) throws 
IOException {
+               final JobVertex task = new JobVertex("Test task");
+               task.setParallelism(parallelism);
+               task.setMaxParallelism(parallelism);
                task.setInvokableClass(NoOpInvokable.class);
 
-               JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
+               final JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
                jg.setAllowQueuedScheduling(true);
                jg.setScheduleMode(ScheduleMode.EAGER);
 
-               ExecutionConfig executionConfig = new ExecutionConfig();
                
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 1000));
                jg.setExecutionConfig(executionConfig);
 
                return jg;
        }
+
+       /**
+        * {@link JobVertex} whose {@code finalizeOnMaster} triggers {@link #latch} on entry and then
+        * sleeps for {@code waitingTime} milliseconds before returning.
+        */
+       private static class WaitOnFinalizeJobVertex extends JobVertex {
+
+               private static final long serialVersionUID = 1L;
+
+               // NOTE(review): OneShotLatch does not appear to be Serializable; this vertex presumably
+               // relies on the JobGraph never being serialized within this test — confirm.
+               private final OneShotLatch latch;
+
+               private final long waitingTime;
+
+               WaitOnFinalizeJobVertex(String name, long waitingTime) {
+                       super(name);
+
+                       this.latch = new OneShotLatch();
+                       this.waitingTime = waitingTime;
+               }
+
+               /**
+                * Triggers {@link #latch} first and only then sleeps, so that an observer released by
+                * the latch gets a window of {@code waitingTime} ms during which this method is still
+                * running (triggering after the sleep left no such window).
+                */
+               @Override
+               public void finalizeOnMaster(ClassLoader loader) throws Exception {
+                       latch.trigger();
+                       Thread.sleep(waitingTime);
+               }
+       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java
new file mode 100644
index 00000000000..6ed3c4df304
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that 
sometimes fails.
+ */
+public class SometimesExceptionSender extends AbstractInvokable {
+
+       private static Set<Integer> failingSenders;
+
+       private static final Random RANDOM = new Random();
+
+       public SometimesExceptionSender(Environment environment) {
+               super(environment);
+       }
+
+       static void configFailingSenders(int numOfTasks) {
+               failingSenders = new HashSet<>();
+
+               while (failingSenders.size() < 10) {
+                       failingSenders.add(RANDOM.nextInt(numOfTasks));
+               }
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               if (failingSenders.contains(this.getIndexInSubtaskGroup())) {
+                       throw new Exception("Test exception");
+               } else {
+                       final Object o = new Object();
+                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (o) {
+                               //noinspection InfiniteLoopStatement
+                               while (true) {
+                                       o.wait();
+                               }
+                       }
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java
new file mode 100644
index 00000000000..d632da3a5c6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that 
sometimes fails on constructor.
+ */
+public class SometimesInstantiationErrorSender extends BlockingNoOpInvokable {
+
+       private static Set<Integer> failingSenders;
+
+       private static final Random RANDOM = new Random();
+
+       public SometimesInstantiationErrorSender(Environment environment) {
+               super(environment);
+
+               if (failingSenders.contains(this.getIndexInSubtaskGroup())) {
+                       throw new RuntimeException("Test exception in 
constructor");
+               }
+       }
+
+       static void configFailingSenders(int numOfTasks) {
+               failingSenders = new HashSet<>();
+
+               while (failingSenders.size() < 10) {
+                       failingSenders.add(RANDOM.nextInt(numOfTasks));
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3ffe7701682..d13fe0b5cc1 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -66,578 +66,6 @@ class JobManagerITCase(_system: ActorSystem)
 
   "The JobManager actor" must {
 
-    "handle jobs when not enough slots" in {
-      val vertex = new JobVertex("Test Vertex")
-      vertex.setParallelism(2)
-      vertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-
-      val jobGraph = new JobGraph("Test Job", vertex)
-
-      val cluster = TestingUtils.startTestingCluster(1)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        val response = jmGateway.ask(RequestTotalNumberOfSlots, 
timeout.duration).mapTo[Int]
-
-        val availableSlots = Await.result(response, duration)
-
-        availableSlots should equal(1)
-
-        within(2 second) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
-        }
-
-        within(2 second) {
-          val response = expectMsgType[JobResultFailure]
-          val exception = 
response.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-              new NoResourceAvailableException(1,1,0) should 
equal(e.getCause())
-            case e => fail(s"Received wrong exception of type $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      }
-      finally {
-        cluster.stop()
-      }
-    }
-
-    "support immediate scheduling of a single vertex" in {
-      val num_tasks = 133
-      val vertex = new JobVertex("Test Vertex")
-      vertex.setParallelism(num_tasks)
-      vertex.setInvokableClass(classOf[NoOpInvokable])
-
-      val jobGraph = new JobGraph("Test Job", vertex)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        val response = jmGateway.ask(RequestTotalNumberOfSlots, 
timeout.duration).mapTo[Int]
-
-        val availableSlots = Await.result(response, duration)
-
-        availableSlots should equal(num_tasks)
-
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-          
-          val result = expectMsgType[JobResultSuccess]
-          result.result.getJobId() should equal(jobGraph.getJobID)
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support queued scheduling of a single vertex" in {
-      val num_tasks = 111
-
-      val vertex = new JobVertex("Test Vertex")
-      vertex.setParallelism(num_tasks)
-      vertex.setInvokableClass(classOf[NoOpInvokable])
-
-      val jobGraph = new JobGraph("Test job", vertex)
-      jobGraph.setAllowQueuedScheduling(true)
-
-      val cluster = TestingUtils.startTestingCluster(10)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val result = expectMsgType[JobResultSuccess]
-
-          result.result.getJobId() should equal(jobGraph.getJobID)
-        }
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support forward jobs" in {
-      val num_tasks = 31
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val result = expectMsgType[JobResultSuccess]
-
-          result.result.getJobId() should equal(jobGraph.getJobID)
-        }
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support bipartite job" in {
-      val num_tasks = 31
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          expectMsgType[JobResultSuccess]
-        }
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support two input job failing edge mismatch" in {
-      val num_tasks = 1
-      val sender1 = new JobVertex("Sender1")
-      val sender2 = new JobVertex("Sender2")
-      val receiver = new JobVertex("Receiver")
-
-      sender1.setInvokableClass(classOf[Sender])
-      sender2.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[AgnosticTertiaryReceiver])
-
-      sender1.setParallelism(num_tasks)
-      sender2.setParallelism(2 * num_tasks)
-      receiver.setParallelism(3 * num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-      receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
-
-      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support two input job" in {
-      val num_tasks = 11
-      val sender1 = new JobVertex("Sender1")
-      val sender2 = new JobVertex("Sender2")
-      val receiver = new JobVertex("Receiver")
-
-      sender1.setInvokableClass(classOf[Sender])
-      sender2.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
-
-      sender1.setParallelism(num_tasks)
-      sender2.setParallelism(2 * num_tasks)
-      receiver.setParallelism(3 * num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-      receiver.connectNewDataSetAsInput(sender2, 
DistributionPattern.ALL_TO_ALL,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
-
-      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          expectMsgType[JobResultSuccess]
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "support scheduling all at once" in {
-      val num_tasks = 16
-      val sender = new JobVertex("Sender")
-      val forwarder = new JobVertex("Forwarder")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      forwarder.setInvokableClass(classOf[Forwarder])
-      receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-      sender.setParallelism(num_tasks)
-      forwarder.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
-      sender.setSlotSharingGroup(sharingGroup)
-      forwarder.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      forwarder.connectNewDataSetAsInput(sender, 
DistributionPattern.ALL_TO_ALL,
-        ResultPartitionType.PIPELINED)
-      receiver.connectNewDataSetAsInput(forwarder, 
DistributionPattern.ALL_TO_ALL,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, 
receiver)
-
-      jobGraph.setScheduleMode(ScheduleMode.EAGER)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          expectMsgType[JobResultSuccess]
-
-          jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-          expectMsg(true)
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle job with a failing sender vertex" in {
-      val num_tasks = 100
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[ExceptionSender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestTotalNumberOfSlots, self)
-          expectMsg(num_tasks)
-        }
-
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle job with an occasionally failing sender vertex" in {
-      val num_tasks = 100
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[SometimesExceptionSender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      // set failing senders
-      SometimesExceptionSender.failingSenders = 
Seq.fill(10)(Random.nextInt(num_tasks)).toSet
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestTotalNumberOfSlots, self)
-          expectMsg(num_tasks)
-        }
-
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle job with a failing receiver vertex" in {
-      val num_tasks = 200
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[ExceptionReceiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle job with all vertices failing during instantiation" in {
-      val num_tasks = 200
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[InstantiationErrorSender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestTotalNumberOfSlots, self)
-          expectMsg(num_tasks)
-
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle job with some vertices failing during instantiation" in {
-      val num_tasks = 200
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      // set the failing sender tasks
-      SometimesInstantiationErrorSender.failingSenders =
-        Seq.fill(10)(Random.nextInt(num_tasks)).toSet
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestTotalNumberOfSlots, self)
-          expectMsg(num_tasks)
-
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = 
failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "check that all job vertices have completed the call to finalizeOnMaster 
before the job " +
-      "completes" in {
-      val num_tasks = 31
-
-      val source = new JobVertex("Source")
-      val sink = new WaitingOnFinalizeJobVertex("Sink", 500)
-
-      source.setInvokableClass(classOf[WaitingNoOpInvokable])
-      sink.setInvokableClass(classOf[NoOpInvokable])
-
-      source.setParallelism(num_tasks)
-      sink.setParallelism(num_tasks)
-
-      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, 
sink)
-
-      val cluster = TestingUtils.startTestingCluster(2*num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try{
-        within(TestingUtils.TESTING_DURATION){
-          jmGateway.tell(SubmitJob(jobGraph, 
ListeningBehaviour.EXECUTION_RESULT), self)
-
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultSuccess]
-        }
-
-        sink.finished should equal(true)
-
-        jmGateway.tell(NotifyWhenJobRemoved(jobGraph.getJobID), self)
-        expectMsg(true)
-      } finally{
-        cluster.stop()
-      }
-    }
-
     "remove execution graphs when the client ends the session explicitly" in {
       val vertex = new JobVertex("Test Vertex")
       vertex.setInvokableClass(classOf[NoOpInvokable])
@@ -756,70 +184,6 @@ class JobManagerITCase(_system: ActorSystem)
     // Savepoint messages
     // ------------------------------------------------------------------------
 
-    "handle trigger savepoint response for non-existing job" in {
-      val deadline = TestingUtils.TESTING_DURATION.fromNow
-
-      val flinkCluster = TestingUtils.startTestingCluster(0, 0)
-
-      try {
-        within(deadline.timeLeft) {
-          val jobManager = flinkCluster
-            .getLeaderGateway(deadline.timeLeft)
-
-          // we have to make sure that the job manager knows also that he is 
the leader
-          // in case of standalone leader retrieval this can happen after the 
getLeaderGateway call
-          val leaderFuture = jobManager.ask(NotifyWhenLeader, timeout.duration)
-          Await.ready(leaderFuture, timeout.duration)
-
-          val jobId = new JobID()
-
-          // Trigger savepoint for non-existing job
-          jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), 
testActor)
-          val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
-
-          // Verify the response
-          response.jobId should equal(jobId)
-          response.cause.getClass should 
equal(classOf[IllegalArgumentException])
-        }
-      }
-      finally {
-        flinkCluster.stop()
-      }
-    }
-
-    "handle trigger savepoint response for job with disabled checkpointing" in 
{
-      val deadline = TestingUtils.TESTING_DURATION.fromNow
-
-      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
-
-      try {
-        within(deadline.timeLeft) {
-          val jobManager = flinkCluster
-            .getLeaderGateway(deadline.timeLeft)
-
-          val jobVertex = new JobVertex("Blocking vertex")
-          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
-
-          // Submit job w/o checkpointing configured
-          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
-
-          // Trigger savepoint for job with disabled checkpointing
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
-          val response = 
expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
-
-          // Verify the response
-          response.jobId should equal(jobGraph.getJobID())
-          response.cause.getClass should equal(classOf[IllegalStateException])
-          response.cause.getMessage should include("not a streaming job")
-        }
-      }
-      finally {
-        flinkCluster.stop()
-      }
-    }
-
     "handle trigger savepoint response after trigger savepoint failure" in {
       val deadline = TestingUtils.TESTING_DURATION.fromNow
 
@@ -949,78 +313,6 @@ class JobManagerITCase(_system: ActorSystem)
         flinkCluster.stop()
       }
     }
-
-    "handle trigger savepoint response after succeeded savepoint future" in {
-      val deadline = TestingUtils.TESTING_DURATION.fromNow
-
-      val flinkCluster = TestingUtils.startTestingCluster(1, 1)
-
-      try {
-        within(deadline.timeLeft) {
-          val jobManager = flinkCluster
-            .getLeaderGateway(deadline.timeLeft)
-
-          val jobVertex = new JobVertex("Blocking vertex")
-          jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
-          jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
-            java.util.Collections.emptyList(),
-            java.util.Collections.emptyList(),
-            java.util.Collections.emptyList(),
-            new CheckpointCoordinatorConfiguration(
-              60000,
-              60000,
-              60000,
-              1,
-              CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-              true),
-            null))
-
-          // Submit job...
-          jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
testActor)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
-
-          // Mock the checkpoint coordinator
-          val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
-          doThrow(new IllegalStateException("Expected Test Exception"))
-            .when(checkpointCoordinator)
-            .triggerSavepoint(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyString())
-
-          val savepointPromise = new CompletableFuture[CompletedCheckpoint]()
-          doReturn(savepointPromise, Nil: _*)
-            .when(checkpointCoordinator)
-            .triggerSavepoint(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyString())
-
-          // Request the execution graph and set a checkpoint coordinator mock
-          jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor)
-          val executionGraph = expectMsgType[ExecutionGraphFound](
-            deadline.timeLeft).executionGraph
-
-          // Update the savepoint coordinator field
-          val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator")
-          field.setAccessible(true)
-          field.set(executionGraph, checkpointCoordinator)
-
-          // Trigger savepoint for job
-          jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
-
-          val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
-          when(checkpoint.getExternalPointer).thenReturn("Expected test 
savepoint path")
-
-          // Succeed the promise
-          savepointPromise.complete(checkpoint)
-
-          val response = 
expectMsgType[TriggerSavepointSuccess](deadline.timeLeft)
-
-          // Verify the response
-          response.jobId should equal(jobGraph.getJobID())
-          response.savepointPath should equal("Expected test savepoint path")
-        }
-      }
-      finally {
-        flinkCluster.stop()
-      }
-    }
   }
 
   class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) 
extends JobVertex(name){
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index a5d90826e30..02458b83c9f 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.types.IntValue
 
-
 object Tasks {
 
   class Sender(environment: Environment)
@@ -197,24 +196,6 @@ object Tasks {
     }
   }
 
-  class SometimesExceptionSender(environment: Environment)
-    extends AbstractInvokable(environment) {
-
-    override def invoke(): Unit = {
-      // this only works if the TaskManager runs in the same JVM as the test case
-      if(SometimesExceptionSender.failingSenders.contains(this.getIndexInSubtaskGroup)){
-        throw new Exception("Test exception")
-      }else{
-        val o = new Object()
-        o.synchronized(o.wait())
-      }
-    }
-  }
-
-  object SometimesExceptionSender {
-    var failingSenders = Set[Int](0)
-  }
-
   class ExceptionReceiver(environment: Environment)
     extends AbstractInvokable(environment) {
 
@@ -231,24 +212,6 @@ object Tasks {
     }
   }
 
-  class SometimesInstantiationErrorSender(environment: Environment)
-    extends AbstractInvokable(environment) {
-
-    // this only works if the TaskManager runs in the same JVM as the test case
-    if(SometimesInstantiationErrorSender.failingSenders.contains(this.getIndexInSubtaskGroup)){
-      throw new RuntimeException("Test exception in constructor")
-    }
-
-    override def invoke(): Unit = {
-      val o = new Object()
-      o.synchronized(o.wait())
-    }
-  }
-
-  object SometimesInstantiationErrorSender {
-    var failingSenders = Set[Int](0)
-  }
-
   class BlockingReceiver(environment: Environment)
     extends AbstractInvokable(environment) {
     override def invoke(): Unit = {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index fa8fa34a629..a806f026986 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -38,8 +38,11 @@
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -71,6 +74,7 @@
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -214,6 +218,80 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF
                }
        }
 
+       @Test
+       public void testTriggerSavepointForNonExistingJob() throws Exception {
+               // Config
+               final int numTaskManagers = 1;
+               final int numSlotsPerTaskManager = 1;
+
+               final File tmpDir = folder.newFolder();
+               final File savepointDir = new File(tmpDir, "savepoints");
+
+               final Configuration config = new Configuration();
+               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+               final MiniClusterResource cluster = new MiniClusterResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(config)
+                               .setNumberTaskManagers(numTaskManagers)
+                               .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+                               .build());
+               cluster.before();
+               final ClusterClient<?> client = cluster.getClusterClient();
+
+               final JobID jobID = new JobID();
+
+               try {
+                       client.triggerSavepoint(jobID, null).get();
+
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
+                       assertTrue(ExceptionUtils.findThrowableWithMessage(e, jobID.toString()).isPresent());
+               } finally {
+                       cluster.after();
+               }
+       }
+
+       @Test
+       public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
+               // Config
+               final int numTaskManagers = 1;
+               final int numSlotsPerTaskManager = 1;
+
+               final Configuration config = new Configuration();
+
+               final MiniClusterResource cluster = new MiniClusterResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(config)
+                               .setNumberTaskManagers(numTaskManagers)
+                               .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+                               .build());
+               cluster.before();
+               final ClusterClient<?> client = cluster.getClusterClient();
+
+               final JobVertex vertex = new JobVertex("Blocking vertex");
+               vertex.setInvokableClass(BlockingNoOpInvokable.class);
+               vertex.setParallelism(1);
+
+               final JobGraph graph = new JobGraph(vertex);
+
+               try {
+                       client.setDetached(true);
+                       client.submitJob(graph, SavepointITCase.class.getClassLoader());
+
+                       client.triggerSavepoint(graph.getJobID(), null).get();
+
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(ExceptionUtils.findThrowable(e, IllegalStateException.class).isPresent());
+                       assertTrue(ExceptionUtils.findThrowableWithMessage(e, graph.getJobID().toString()).isPresent());
+                       assertTrue(ExceptionUtils.findThrowableWithMessage(e, "is not a streaming job").isPresent());
+               } finally {
+                       cluster.after();
+               }
+       }
+
        @Test
        public void testSubmitWithUnknownSavepointPath() throws Exception {
                // Config


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to