This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c78550cb96 [FLINK-27119][tests] Use try-with-resources for JobMasters
0c78550cb96 is described below

commit 0c78550cb9698d64680969987486bac4dfcd9001
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Apr 6 17:09:26 2022 +0200

    [FLINK-27119][tests] Use try-with-resources for JobMasters
---
 ...asterExecutionDeploymentReconciliationTest.java | 171 ++++++------
 .../jobmaster/JobMasterQueryableStateTest.java     | 122 ++++-----
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |  47 ++--
 .../flink/runtime/jobmaster/JobMasterTest.java     | 290 +++++++++------------
 4 files changed, 288 insertions(+), 342 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
index 3abd26ec16d..6c867c37028 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
@@ -103,49 +103,52 @@ public class 
JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
         TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper =
                 new TestingExecutionDeploymentTrackerWrapper();
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
-        JobMaster jobMaster =
-                createAndStartJobMaster(onCompletionActions, 
deploymentTrackerWrapper, jobGraph);
-        JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
-        RPC_SERVICE_RESOURCE
-                .getTestingRpcService()
-                .registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
-
-        final CompletableFuture<ExecutionAttemptID> taskCancellationFuture =
-                new CompletableFuture<>();
-        TaskExecutorGateway taskExecutorGateway = 
createTaskExecutorGateway(taskCancellationFuture);
-        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation =
-                new LocalUnresolvedTaskManagerLocation();
-
-        registerTaskExecutorAndOfferSlots(
-                jobMasterGateway,
-                jobGraph.getJobID(),
-                taskExecutorGateway,
-                localUnresolvedTaskManagerLocation);
-
-        ExecutionAttemptID deployedExecution =
-                deploymentTrackerWrapper.getTaskDeploymentFuture().get();
-        assertFalse(taskCancellationFuture.isDone());
-
-        ExecutionAttemptID unknownDeployment = new ExecutionAttemptID();
-        //  the deployment report is missing the just deployed task, but 
contains the ID of some
-        // other unknown deployment
-        //  the job master should cancel the unknown deployment, and fail the 
job
-        jobMasterGateway.heartbeatFromTaskManager(
-                localUnresolvedTaskManagerLocation.getResourceID(),
-                new TaskExecutorToJobManagerHeartbeatPayload(
-                        new AccumulatorReport(Collections.emptyList()),
-                        new 
ExecutionDeploymentReport(Collections.singleton(unknownDeployment))));
-
-        assertThat(taskCancellationFuture.get(), is(unknownDeployment));
-        assertThat(deploymentTrackerWrapper.getStopFuture().get(), 
is(deployedExecution));
-
-        assertThat(
-                onCompletionActions
-                        .getJobReachedGloballyTerminalStateFuture()
-                        .get()
-                        .getArchivedExecutionGraph()
-                        .getState(),
-                is(JobStatus.FAILED));
+        try (JobMaster jobMaster =
+                createAndStartJobMaster(onCompletionActions, 
deploymentTrackerWrapper, jobGraph)) {
+            JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            RPC_SERVICE_RESOURCE
+                    .getTestingRpcService()
+                    .registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
+
+            final CompletableFuture<ExecutionAttemptID> taskCancellationFuture 
=
+                    new CompletableFuture<>();
+            TaskExecutorGateway taskExecutorGateway =
+                    createTaskExecutorGateway(taskCancellationFuture);
+            LocalUnresolvedTaskManagerLocation 
localUnresolvedTaskManagerLocation =
+                    new LocalUnresolvedTaskManagerLocation();
+
+            registerTaskExecutorAndOfferSlots(
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway,
+                    localUnresolvedTaskManagerLocation);
+
+            ExecutionAttemptID deployedExecution =
+                    deploymentTrackerWrapper.getTaskDeploymentFuture().get();
+            assertFalse(taskCancellationFuture.isDone());
+
+            ExecutionAttemptID unknownDeployment = new ExecutionAttemptID();
+            //  the deployment report is missing the just deployed task, but 
contains the ID of some
+            // other unknown deployment
+            //  the job master should cancel the unknown deployment, and fail 
the job
+            jobMasterGateway.heartbeatFromTaskManager(
+                    localUnresolvedTaskManagerLocation.getResourceID(),
+                    new TaskExecutorToJobManagerHeartbeatPayload(
+                            new AccumulatorReport(Collections.emptyList()),
+                            new ExecutionDeploymentReport(
+                                    
Collections.singleton(unknownDeployment))));
+
+            assertThat(taskCancellationFuture.get(), is(unknownDeployment));
+            assertThat(deploymentTrackerWrapper.getStopFuture().get(), 
is(deployedExecution));
+
+            assertThat(
+                    onCompletionActions
+                            .getJobReachedGloballyTerminalStateFuture()
+                            .get()
+                            .getArchivedExecutionGraph()
+                            .getState(),
+                    is(JobStatus.FAILED));
+        }
     }
 
     /**
@@ -157,46 +160,48 @@ public class 
JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
         TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper =
                 new TestingExecutionDeploymentTrackerWrapper();
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
-        JobMaster jobMaster = 
createAndStartJobMaster(deploymentTrackerWrapper, jobGraph);
-        JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
-        RPC_SERVICE_RESOURCE
-                .getTestingRpcService()
-                .registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
-
-        final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
-                new CompletableFuture<>();
-        final CompletableFuture<ExecutionAttemptID> taskCancellationFuture =
-                new CompletableFuture<>();
-        final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture =
-                new CompletableFuture<>();
-        TaskExecutorGateway taskExecutorGateway =
-                createTaskExecutorGateway(
-                        taskCancellationFuture,
-                        taskSubmissionFuture,
-                        taskSubmissionAcknowledgeFuture);
-        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation =
-                new LocalUnresolvedTaskManagerLocation();
-
-        registerTaskExecutorAndOfferSlots(
-                jobMasterGateway,
-                jobGraph.getJobID(),
-                taskExecutorGateway,
-                localUnresolvedTaskManagerLocation);
-
-        ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
-
-        // the execution has not been acknowledged yet by the TaskExecutor, 
but we already allow the
-        // ID to be in the heartbeat payload
-        jobMasterGateway.heartbeatFromTaskManager(
-                localUnresolvedTaskManagerLocation.getResourceID(),
-                new TaskExecutorToJobManagerHeartbeatPayload(
-                        new AccumulatorReport(Collections.emptyList()),
-                        new 
ExecutionDeploymentReport(Collections.singleton(pendingExecutionId))));
-
-        taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
-
-        deploymentTrackerWrapper.getTaskDeploymentFuture().get();
-        assertFalse(taskCancellationFuture.isDone());
+        try (JobMaster jobMaster = 
createAndStartJobMaster(deploymentTrackerWrapper, jobGraph)) {
+            JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            RPC_SERVICE_RESOURCE
+                    .getTestingRpcService()
+                    .registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
+
+            final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
+                    new CompletableFuture<>();
+            final CompletableFuture<ExecutionAttemptID> taskCancellationFuture 
=
+                    new CompletableFuture<>();
+            final CompletableFuture<Acknowledge> 
taskSubmissionAcknowledgeFuture =
+                    new CompletableFuture<>();
+            TaskExecutorGateway taskExecutorGateway =
+                    createTaskExecutorGateway(
+                            taskCancellationFuture,
+                            taskSubmissionFuture,
+                            taskSubmissionAcknowledgeFuture);
+            LocalUnresolvedTaskManagerLocation 
localUnresolvedTaskManagerLocation =
+                    new LocalUnresolvedTaskManagerLocation();
+
+            registerTaskExecutorAndOfferSlots(
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway,
+                    localUnresolvedTaskManagerLocation);
+
+            ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
+
+            // the execution has not been acknowledged yet by the 
TaskExecutor, but we already allow
+            // the ID to be in the heartbeat payload
+            jobMasterGateway.heartbeatFromTaskManager(
+                    localUnresolvedTaskManagerLocation.getResourceID(),
+                    new TaskExecutorToJobManagerHeartbeatPayload(
+                            new AccumulatorReport(Collections.emptyList()),
+                            new ExecutionDeploymentReport(
+                                    
Collections.singleton(pendingExecutionId))));
+
+            taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
+
+            deploymentTrackerWrapper.getTaskDeploymentFuture().get();
+            assertFalse(taskCancellationFuture.isDone());
+        }
     }
 
     private JobMaster createAndStartJobMaster(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
index 0d7420d7291..8d829c28186 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -55,6 +55,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -115,85 +116,80 @@ public class JobMasterQueryableStateTest extends 
TestLogger {
 
     @Test
     public void testRequestKvStateWithoutRegistration() throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) 
{
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
-            // lookup location
-            try {
-                jobMasterGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), 
"unknown").get();
-                fail("Expected to fail with UnknownKvStateLocation");
-            } catch (Exception e) {
-                assertTrue(
-                        ExceptionUtils.findThrowable(e, 
UnknownKvStateLocation.class).isPresent());
-            }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+            assertThatThrownBy(
+                            () ->
+                                    jobMasterGateway
+                                            
.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown")
+                                            .get())
+                    
.satisfies(FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class));
         }
     }
 
     @Test
     public void testRequestKvStateOfWrongJob() throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) 
{
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
-            // lookup location
-            try {
-                jobMasterGateway.requestKvStateLocation(new JobID(), 
"unknown").get();
-                fail("Expected to fail with FlinkJobNotFoundException");
-            } catch (Exception e) {
-                assertThat(e, containsCause(FlinkJobNotFoundException.class));
-            }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+            assertThatThrownBy(
+                            () ->
+                                    jobMasterGateway
+                                            .requestKvStateLocation(new 
JobID(), "unknown")
+                                            .get())
+                    
.satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
         }
     }
 
     @Test
     public void testRequestKvStateWithIrrelevantRegistration() throws 
Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) 
{
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
             // register an irrelevant KvState
-            try {
-                registerKvState(jobMasterGateway, new JobID(), new 
JobVertexID(), "any-name");
-                fail("Expected to fail with FlinkJobNotFoundException.");
-            } catch (Exception e) {
-                assertThat(e, containsCause(FlinkJobNotFoundException.class));
-            }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+            assertThatThrownBy(
+                            () ->
+                                    registerKvState(
+                                            jobMasterGateway,
+                                            new JobID(),
+                                            new JobVertexID(),
+                                            "any-name"))
+                    
.satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
         }
     }
 
     @Test
     public void testRegisterKvState() throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
-
-        jobMaster.start();
+        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster()) {
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
             final String registrationName = "register-me";
             final KvStateID kvStateID = new KvStateID();
             final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
@@ -223,22 +219,21 @@ public class JobMasterQueryableStateTest extends 
TestLogger {
             assertEquals(kvStateID, 
location.getKvStateID(keyGroupRange.getStartKeyGroup()));
             assertEquals(
                     address, 
location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     @Test
     public void testUnregisterKvState() throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) 
{
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
             final String registrationName = "register-me";
             final KvStateID kvStateID = new KvStateID();
             final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
@@ -271,22 +266,21 @@ public class JobMasterQueryableStateTest extends 
TestLogger {
             } catch (Exception e) {
                 assertThat(e, containsCause(UnknownKvStateLocation.class));
             }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     @Test
     public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) 
{
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
+            registerSlotsRequiredForJobExecution(jobMasterGateway, 
JOB_GRAPH.getJobID());
 
-        try {
             // duplicate registration fails task
 
             final String registrationName = "duplicate-me";
@@ -309,8 +303,6 @@ public class JobMasterQueryableStateTest extends TestLogger 
{
                         
jobMasterGateway.requestJobStatus(testingTimeout).get(),
                         
either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
             }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 9dc17dff3dd..07fc6ed2e3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -65,29 +65,36 @@ public class JobMasterSchedulerTest extends TestLogger {
         final SchedulerNGFactory schedulerFactory = new 
FailingSchedulerFactory();
         final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
                 new JobMasterBuilder.TestingOnCompletionActions();
-        final JobMaster jobMaster =
-                new JobMasterBuilder(
-                                JobGraphTestUtils.emptyJobGraph(),
-                                
TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
-                        .withSlotPoolServiceSchedulerFactory(
-                                DefaultSlotPoolServiceSchedulerFactory.create(
-                                        
TestingSlotPoolServiceBuilder.newBuilder(),
-                                        schedulerFactory))
-                        .withOnCompletionActions(onCompletionActions)
-                        .createJobMaster();
+        final JobManagerSharedServices jobManagerSharedServices =
+                new TestingJobManagerSharedServicesBuilder().build();
+        try {
+            final JobMaster jobMaster =
+                    new JobMasterBuilder(
+                                    JobGraphTestUtils.emptyJobGraph(),
+                                    
TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
+                            .withSlotPoolServiceSchedulerFactory(
+                                    
DefaultSlotPoolServiceSchedulerFactory.create(
+                                            
TestingSlotPoolServiceBuilder.newBuilder(),
+                                            schedulerFactory))
+                            .withOnCompletionActions(onCompletionActions)
+                            
.withJobManagerSharedServices(jobManagerSharedServices)
+                            .createJobMaster();
 
-        jobMaster.start();
+            jobMaster.start();
 
-        assertThat(
-                onCompletionActions.getJobMasterFailedFuture().join(),
-                is(instanceOf(JobMasterException.class)));
+            assertThat(
+                    onCompletionActions.getJobMasterFailedFuture().join(),
+                    is(instanceOf(JobMasterException.class)));
 
-        // close the jobMaster to remove it from the testing rpc service so 
that it can shut down
-        // cleanly
-        try {
-            jobMaster.close();
-        } catch (Exception expected) {
-            // expected
+            // close the jobMaster to remove it from the testing rpc service 
so that it can shut
+            // down cleanly
+            try {
+                jobMaster.close();
+            } catch (Exception expected) {
+                // expected
+            }
+        } finally {
+            jobManagerSharedServices.shutdown();
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f61e6690f48..1ac904e49b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -265,17 +265,16 @@ public class JobMasterTest extends TestLogger {
 
         rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withResourceId(jmResourceId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(new HeartbeatServices(1L, 
10000L))
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             final JobMasterGateway jobMasterGateway =
                     jobMaster.getSelfGateway(JobMasterGateway.class);
 
@@ -294,8 +293,6 @@ public class JobMasterTest extends TestLogger {
             registrationResponse.get();
 
             assertThat(heartbeatResourceIdFuture.join(), anyOf(nullValue(), 
equalTo(jmResourceId)));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -323,17 +320,16 @@ public class JobMasterTest extends TestLogger {
 
         rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withResourceId(jmResourceId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             final JobMasterGateway jobMasterGateway =
                     jobMaster.getSelfGateway(JobMasterGateway.class);
 
@@ -356,8 +352,6 @@ public class JobMasterTest extends TestLogger {
                             testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
             assertThat(disconnectedJobManager, equalTo(jobGraph.getJobID()));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -416,22 +410,18 @@ public class JobMasterTest extends TestLogger {
 
         rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
 
-        final JobManagerSharedServices jobManagerSharedServices =
-                new TestingJobManagerSharedServicesBuilder().build();
-
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withHeartbeatServices(new HeartbeatServices(5L, 
1000L))
                         .withSlotPoolServiceSchedulerFactory(
                                 DefaultSlotPoolServiceSchedulerFactory.create(
                                         new 
TestingSlotPoolFactory(hasReceivedSlotOffers),
                                         new DefaultSchedulerFactory()))
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             final JobMasterGateway jobMasterGateway =
                     jobMaster.getSelfGateway(JobMasterGateway.class);
 
@@ -463,9 +453,6 @@ public class JobMasterTest extends TestLogger {
 
             // make sure that no assertion has been violated
             assertionFuture.get();
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
-            jobManagerSharedServices.shutdown();
         }
     }
 
@@ -677,18 +664,17 @@ public class JobMasterTest extends TestLogger {
 
         rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
                         .withResourceId(jmResourceId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(fastHeartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             // define a leader and see that a registration happens
             rmLeaderRetrievalService.notifyListener(
                     resourceManagerAddress, resourceManagerId.toUUID());
@@ -711,8 +697,6 @@ public class JobMasterTest extends TestLogger {
 
             // the JobMaster should try to reconnect to the RM
             registrationAttempts.await();
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -752,18 +736,17 @@ public class JobMasterTest extends TestLogger {
 
         rpcService.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
                         .withResourceId(jmResourceId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             // define a leader and see that a registration happens
             rmLeaderRetrievalService.notifyListener(
                     resourceManagerAddress, resourceManagerId.toUUID());
@@ -783,8 +766,6 @@ public class JobMasterTest extends TestLogger {
 
             // the JobMaster should try to reconnect to the RM
             registrationAttempts.await();
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -811,12 +792,11 @@ public class JobMasterTest extends TestLogger {
                         maxCheckpoints -> completedCheckpointStore);
         
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withHighAvailabilityServices(haServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             // we need to start and register the required slots to let the 
adaptive scheduler
             // restore from the savepoint
             jobMaster.start();
@@ -845,8 +825,6 @@ public class JobMasterTest extends TestLogger {
             assertThat(savepointCheckpoint, Matchers.notNullValue());
 
             assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -887,9 +865,9 @@ public class JobMasterTest extends TestLogger {
                         maxCheckpoints -> completedCheckpointStore);
         
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
-        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
-        try {
             // starting the JobMaster should have read the savepoint
             final CompletedCheckpoint savepointCheckpoint =
                     completedCheckpointStore.getLatestCheckpoint();
@@ -897,21 +875,18 @@ public class JobMasterTest extends TestLogger {
             assertThat(savepointCheckpoint, Matchers.notNullValue());
 
             assertThat(savepointCheckpoint.getCheckpointID(), 
is(checkpointId));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     /** Tests that we can close an unestablished ResourceManager connection. */
     @Test
     public void testCloseUnestablishedResourceManagerConnection() throws 
Exception {
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             jobMaster.start();
 
             final TestingResourceManagerGateway firstResourceManagerGateway =
@@ -946,27 +921,25 @@ public class JobMasterTest extends TestLogger {
 
             // check that we start registering at the second RM
             secondJobManagerRegistration.await();
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     /** Tests that we continue reconnecting to the latest known RM after a 
disconnection message. */
     @Test
     public void testReconnectionAfterDisconnect() throws Exception {
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        try {
             final TestingResourceManagerGateway testingResourceManagerGateway =
                     createAndRegisterTestingResourceManagerGateway();
             final BlockingQueue<JobMasterId> registrationsQueue = new 
ArrayBlockingQueue<>(1);
@@ -993,23 +966,20 @@ public class JobMasterTest extends TestLogger {
 
             // wait for the second registration attempt after the disconnect 
call
             assertThat(registrationsQueue.take(), equalTo(jobMasterId));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     /** Tests that the a JM connects to the leading RM after regaining 
leadership. */
     @Test
     public void testResourceManagerConnectionAfterStart() throws Exception {
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             final TestingResourceManagerGateway testingResourceManagerGateway =
                     createAndRegisterTestingResourceManagerGateway();
 
@@ -1028,8 +998,6 @@ public class JobMasterTest extends TestLogger {
             final JobMasterId firstRegistrationAttempt = 
registrationQueue.take();
 
             assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1094,16 +1062,15 @@ public class JobMasterTest extends TestLogger {
                         .setExecutionConfig(executionConfig)
                         .build();
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(inputSplitJobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             final JobMasterGateway jobMasterGateway =
                     jobMaster.getSelfGateway(JobMasterGateway.class);
 
@@ -1154,8 +1121,6 @@ public class JobMasterTest extends TestLogger {
                             expectedRemainingInputSplits
                                     .apply(inputSplitsPerTask)
                                     .toArray(EMPTY_TESTING_INPUT_SPLITS)));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1349,16 +1314,15 @@ public class JobMasterTest extends TestLogger {
     @Test
     public void testRequestPartitionState() throws Exception {
         final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(producerConsumerJobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        try {
             final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new 
CompletableFuture<>();
             final TestingTaskExecutorGateway testingTaskExecutorGateway =
                     new TestingTaskExecutorGatewayBuilder()
@@ -1453,8 +1417,6 @@ public class JobMasterTest extends TestLogger {
                                 .isPresent(),
                         is(true));
             }
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1477,16 +1439,15 @@ public class JobMasterTest extends TestLogger {
                                 (ignoredA, ignoredB, formatType) -> new 
CompletableFuture<>())
                         .build();
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withFatalErrorHandler(testingFatalErrorHandler)
                         .withSlotPoolServiceSchedulerFactory(
                                 DefaultSlotPoolServiceSchedulerFactory.create(
                                         
TestingSlotPoolServiceBuilder.newBuilder(),
                                         new 
TestingSchedulerNGFactory(testingSchedulerNG)))
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             jobMaster.start();
 
             final JobMasterGateway jobMasterGateway =
@@ -1507,8 +1468,6 @@ public class JobMasterTest extends TestLogger {
             }
 
             assertThat(savepointFutureHighTimeout.isDone(), 
is(equalTo(false)));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1518,29 +1477,29 @@ public class JobMasterTest extends TestLogger {
 
         final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy();
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        final CompletableFuture<JobID> disconnectTaskExecutorFuture = new 
CompletableFuture<>();
-        final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
-        final TestingTaskExecutorGateway testingTaskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setFreeSlotFunction(
-                                (allocationID, throwable) -> {
-                                    freedSlotFuture.complete(allocationID);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                })
-                        .setDisconnectJobManagerConsumer(
-                                (jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
-                        .createTestingTaskExecutorGateway();
-        final LocalUnresolvedTaskManagerLocation taskManagerLocation =
-                new LocalUnresolvedTaskManagerLocation();
+            final CompletableFuture<JobID> disconnectTaskExecutorFuture = new 
CompletableFuture<>();
+            final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
+            final TestingTaskExecutorGateway testingTaskExecutorGateway =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setFreeSlotFunction(
+                                    (allocationID, throwable) -> {
+                                        freedSlotFuture.complete(allocationID);
+                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                    })
+                            .setDisconnectJobManagerConsumer(
+                                    (jobID, throwable) ->
+                                            
disconnectTaskExecutorFuture.complete(jobID))
+                            .createTestingTaskExecutorGateway();
+            final LocalUnresolvedTaskManagerLocation taskManagerLocation =
+                    new LocalUnresolvedTaskManagerLocation();
 
-        try {
             jobMaster.start();
 
             final JobMasterGateway jobMasterGateway =
@@ -1568,8 +1527,6 @@ public class JobMasterTest extends TestLogger {
             // longer slots from it
             assertThat(freedSlotFuture.get(), equalTo(allocationId));
             assertThat(disconnectTaskExecutorFuture.get(), 
equalTo(jobGraph.getJobID()));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1589,29 +1546,29 @@ public class JobMasterTest extends TestLogger {
                 new TestingJobMasterPartitionTracker();
         partitionTracker.setIsTrackingPartitionsForFunction(ignored -> 
isTrackingPartitions.get());
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withJobManagerSharedServices(jobManagerSharedServices)
                         .withHeartbeatServices(heartbeatServices)
                         .withPartitionTrackerFactory(ignored -> 
partitionTracker)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        final CompletableFuture<JobID> disconnectTaskExecutorFuture = new 
CompletableFuture<>();
-        final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
-        final TestingTaskExecutorGateway testingTaskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setFreeSlotFunction(
-                                (allocationID, throwable) -> {
-                                    freedSlotFuture.complete(allocationID);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                })
-                        .setDisconnectJobManagerConsumer(
-                                (jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
-                        .createTestingTaskExecutorGateway();
+            final CompletableFuture<JobID> disconnectTaskExecutorFuture = new 
CompletableFuture<>();
+            final CompletableFuture<AllocationID> freedSlotFuture = new 
CompletableFuture<>();
+            final TestingTaskExecutorGateway testingTaskExecutorGateway =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setFreeSlotFunction(
+                                    (allocationID, throwable) -> {
+                                        freedSlotFuture.complete(allocationID);
+                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                    })
+                            .setDisconnectJobManagerConsumer(
+                                    (jobID, throwable) ->
+                                            
disconnectTaskExecutorFuture.complete(jobID))
+                            .createTestingTaskExecutorGateway();
 
-        try {
             jobMaster.start();
 
             final JobMasterGateway jobMasterGateway =
@@ -1642,25 +1599,23 @@ public class JobMasterTest extends TestLogger {
             // complete
             jobMasterGateway.requestJobStatus(Time.seconds(5)).get();
             assertThat(disconnectTaskExecutorFuture.isDone(), is(false));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     /** Tests the updateGlobalAggregate functionality. */
     @Test
     public void testJobMasterAggregatesValuesCorrectly() throws Exception {
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
-        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+            jobMaster.start();
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
 
-        try {
             CompletableFuture<Object> updateAggregateFuture;
 
             AggregateFunction<Integer, Integer, Integer> aggregateFunction =
@@ -1694,9 +1649,6 @@ public class JobMasterTest extends TestLogger {
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg2", 23, 
serializedAggregateFunction);
             assertThat(updateAggregateFuture.get(), equalTo(33));
-
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1765,9 +1717,9 @@ public class JobMasterTest extends TestLogger {
      */
     @Test
     public void 
testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws 
Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
-        try {
             jobMaster.start();
 
             final CompletableFuture<RegistrationResponse> registrationResponse 
=
@@ -1780,21 +1732,19 @@ public class JobMasterTest extends TestLogger {
                             testingTimeout);
 
             assertThat(registrationResponse.get(), 
instanceOf(JMTMRegistrationRejection.class));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     @Test
     public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() 
throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
-        final TestingTaskExecutorGateway testingTaskExecutorGateway =
-                new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-        rpcService.registerGateway(
-                testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
+            final TestingTaskExecutorGateway testingTaskExecutorGateway =
+                    new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+            rpcService.registerGateway(
+                    testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
 
-        try {
             jobMaster.start();
 
             final TaskManagerRegistrationInformation 
taskManagerRegistrationInformation =
@@ -1816,34 +1766,33 @@ public class JobMasterTest extends TestLogger {
 
             assertThat(firstRegistrationResponse.get(), 
instanceOf(JMTMRegistrationSuccess.class));
             assertThat(secondRegistrationResponse.get(), 
instanceOf(JMTMRegistrationSuccess.class));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
     @Test
     public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() 
throws Exception {
-        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, 
rpcService).createJobMaster();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
-        final CompletableFuture<Void> firstTaskExecutorDisconnectedFuture =
-                new CompletableFuture<>();
-        final TestingTaskExecutorGateway firstTaskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setAddress("firstTaskExecutor")
-                        .setDisconnectJobManagerConsumer(
-                                (jobID, throwable) ->
-                                        
firstTaskExecutorDisconnectedFuture.complete(null))
-                        .createTestingTaskExecutorGateway();
-        final TestingTaskExecutorGateway secondTaskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder()
-                        .setAddress("secondTaskExecutor")
-                        .createTestingTaskExecutorGateway();
+            final CompletableFuture<Void> firstTaskExecutorDisconnectedFuture =
+                    new CompletableFuture<>();
+            final TestingTaskExecutorGateway firstTaskExecutorGateway =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setAddress("firstTaskExecutor")
+                            .setDisconnectJobManagerConsumer(
+                                    (jobID, throwable) ->
+                                            
firstTaskExecutorDisconnectedFuture.complete(null))
+                            .createTestingTaskExecutorGateway();
+            final TestingTaskExecutorGateway secondTaskExecutorGateway =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setAddress("secondTaskExecutor")
+                            .createTestingTaskExecutorGateway();
 
-        rpcService.registerGateway(firstTaskExecutorGateway.getAddress(), 
firstTaskExecutorGateway);
-        rpcService.registerGateway(
-                secondTaskExecutorGateway.getAddress(), 
secondTaskExecutorGateway);
+            rpcService.registerGateway(
+                    firstTaskExecutorGateway.getAddress(), 
firstTaskExecutorGateway);
+            rpcService.registerGateway(
+                    secondTaskExecutorGateway.getAddress(), 
secondTaskExecutorGateway);
 
-        try {
             jobMaster.start();
 
             final LocalUnresolvedTaskManagerLocation taskManagerLocation =
@@ -1873,8 +1822,6 @@ public class JobMasterTest extends TestLogger {
             assertThat(secondRegistrationResponse.get(), 
instanceOf(JMTMRegistrationSuccess.class));
             // the first TaskExecutor should be disconnected
             firstTaskExecutorDisconnectedFuture.get();
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1886,27 +1833,28 @@ public class JobMasterTest extends TestLogger {
                         .setCloseAsyncSupplier(() -> 
schedulerTerminationFuture)
                         .build();
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withSlotPoolServiceSchedulerFactory(
                                 DefaultSlotPoolServiceSchedulerFactory.create(
                                         
TestingSlotPoolServiceBuilder.newBuilder(),
                                         new 
TestingSchedulerNGFactory(testingSchedulerNG)))
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        jobMaster.start();
+            jobMaster.start();
 
-        final CompletableFuture<Void> jobMasterTerminationFuture = 
jobMaster.closeAsync();
+            final CompletableFuture<Void> jobMasterTerminationFuture = 
jobMaster.closeAsync();
 
-        try {
-            jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
-            fail("Expected TimeoutException because the JobMaster should not 
terminate.");
-        } catch (TimeoutException expected) {
-        }
+            try {
+                jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
+                fail("Expected TimeoutException because the JobMaster should 
not terminate.");
+            } catch (TimeoutException expected) {
+            }
 
-        schedulerTerminationFuture.complete(null);
+            schedulerTerminationFuture.complete(null);
 
-        jobMasterTerminationFuture.get();
+            jobMasterTerminationFuture.get();
+        }
     }
 
     @Test
@@ -1915,12 +1863,11 @@ public class JobMasterTest extends TestLogger {
         configuration.set(
                 RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, 
Duration.ofDays(1));
         final int numberSlots = 1;
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             jobMaster.start();
 
             final JobMasterGateway jobMasterGateway =
@@ -1961,8 +1908,6 @@ public class JobMasterTest extends TestLogger {
                                     .createTestingTaskExecutorGateway(),
                             new LocalUnresolvedTaskManagerLocation()),
                     hasSize(numberSlots));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 
@@ -1974,15 +1919,14 @@ public class JobMasterTest extends TestLogger {
         final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
                 new JobMasterBuilder.TestingOnCompletionActions();
 
-        final JobMaster jobMaster =
+        try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withResourceId(jmResourceId)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
                         .withOnCompletionActions(onCompletionActions)
-                        .createJobMaster();
+                        .createJobMaster()) {
 
-        try {
             jobMaster.start();
 
             final JobMasterGateway jobMasterGateway =
@@ -2031,8 +1975,6 @@ public class JobMasterTest extends TestLogger {
                             .getArchivedExecutionGraph();
 
             assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
-        } finally {
-            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
         }
     }
 

Reply via email to