This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0c78550cb96 [FLINK-27119][tests] Use try-with-resources for JobMasters 0c78550cb96 is described below commit 0c78550cb9698d64680969987486bac4dfcd9001 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Wed Apr 6 17:09:26 2022 +0200 [FLINK-27119][tests] Use try-with-resources for JobMasters --- ...asterExecutionDeploymentReconciliationTest.java | 171 ++++++------ .../jobmaster/JobMasterQueryableStateTest.java | 122 ++++----- .../runtime/jobmaster/JobMasterSchedulerTest.java | 47 ++-- .../flink/runtime/jobmaster/JobMasterTest.java | 290 +++++++++------------ 4 files changed, 288 insertions(+), 342 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index 3abd26ec16d..6c867c37028 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -103,49 +103,52 @@ public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger { TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper(); final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); - JobMaster jobMaster = - createAndStartJobMaster(onCompletionActions, deploymentTrackerWrapper, jobGraph); - JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); - RPC_SERVICE_RESOURCE - .getTestingRpcService() - .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); - - final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = - new CompletableFuture<>(); - TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture); - LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = - new LocalUnresolvedTaskManagerLocation(); - - registerTaskExecutorAndOfferSlots( - jobMasterGateway, - jobGraph.getJobID(), - taskExecutorGateway, - localUnresolvedTaskManagerLocation); - - ExecutionAttemptID deployedExecution = - deploymentTrackerWrapper.getTaskDeploymentFuture().get(); - assertFalse(taskCancellationFuture.isDone()); - - ExecutionAttemptID unknownDeployment = new ExecutionAttemptID(); - // the deployment report is missing the just deployed task, but contains the ID of some - // other unknown deployment - // the job master should cancel the unknown deployment, and fail the job - jobMasterGateway.heartbeatFromTaskManager( - localUnresolvedTaskManagerLocation.getResourceID(), - new TaskExecutorToJobManagerHeartbeatPayload( - new AccumulatorReport(Collections.emptyList()), - new ExecutionDeploymentReport(Collections.singleton(unknownDeployment)))); - - assertThat(taskCancellationFuture.get(), is(unknownDeployment)); - assertThat(deploymentTrackerWrapper.getStopFuture().get(), is(deployedExecution)); - - assertThat( - onCompletionActions - .getJobReachedGloballyTerminalStateFuture() - .get() - .getArchivedExecutionGraph() - .getState(), - is(JobStatus.FAILED)); + try (JobMaster jobMaster = + createAndStartJobMaster(onCompletionActions, deploymentTrackerWrapper, jobGraph)) { + JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + RPC_SERVICE_RESOURCE + .getTestingRpcService() + .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = + new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = + createTaskExecutorGateway(taskCancellationFuture); + LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); + + registerTaskExecutorAndOfferSlots( + jobMasterGateway, + jobGraph.getJobID(), + taskExecutorGateway, + localUnresolvedTaskManagerLocation); + + ExecutionAttemptID deployedExecution = + deploymentTrackerWrapper.getTaskDeploymentFuture().get(); + assertFalse(taskCancellationFuture.isDone()); + + ExecutionAttemptID unknownDeployment = new ExecutionAttemptID(); + // the deployment report is missing the just deployed task, but contains the ID of some + // other unknown deployment + // the job master should cancel the unknown deployment, and fail the job + jobMasterGateway.heartbeatFromTaskManager( + localUnresolvedTaskManagerLocation.getResourceID(), + new TaskExecutorToJobManagerHeartbeatPayload( + new AccumulatorReport(Collections.emptyList()), + new ExecutionDeploymentReport( + Collections.singleton(unknownDeployment)))); + + assertThat(taskCancellationFuture.get(), is(unknownDeployment)); + assertThat(deploymentTrackerWrapper.getStopFuture().get(), is(deployedExecution)); + + assertThat( + onCompletionActions + .getJobReachedGloballyTerminalStateFuture() + .get() + .getArchivedExecutionGraph() + .getState(), + is(JobStatus.FAILED)); + } } /** @@ -157,46 +160,48 @@ public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger { TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper(); final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); - JobMaster jobMaster = createAndStartJobMaster(deploymentTrackerWrapper, jobGraph); - JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); - RPC_SERVICE_RESOURCE - .getTestingRpcService() - .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); - - final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture = - new CompletableFuture<>(); - final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = - new CompletableFuture<>(); - final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture = - new CompletableFuture<>(); - TaskExecutorGateway taskExecutorGateway = - createTaskExecutorGateway( - taskCancellationFuture, - taskSubmissionFuture, - taskSubmissionAcknowledgeFuture); - LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = - new LocalUnresolvedTaskManagerLocation(); - - registerTaskExecutorAndOfferSlots( - jobMasterGateway, - jobGraph.getJobID(), - taskExecutorGateway, - localUnresolvedTaskManagerLocation); - - ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get(); - - // the execution has not been acknowledged yet by the TaskExecutor, but we already allow the - // ID to be in the heartbeat payload - jobMasterGateway.heartbeatFromTaskManager( - localUnresolvedTaskManagerLocation.getResourceID(), - new TaskExecutorToJobManagerHeartbeatPayload( - new AccumulatorReport(Collections.emptyList()), - new ExecutionDeploymentReport(Collections.singleton(pendingExecutionId)))); - - taskSubmissionAcknowledgeFuture.complete(Acknowledge.get()); - - deploymentTrackerWrapper.getTaskDeploymentFuture().get(); - assertFalse(taskCancellationFuture.isDone()); + try (JobMaster jobMaster = createAndStartJobMaster(deploymentTrackerWrapper, jobGraph)) { + JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + RPC_SERVICE_RESOURCE + .getTestingRpcService() + .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture = + new CompletableFuture<>(); + final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = + new CompletableFuture<>(); + final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture = + new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = + createTaskExecutorGateway( + taskCancellationFuture, + taskSubmissionFuture, + taskSubmissionAcknowledgeFuture); + LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); + + registerTaskExecutorAndOfferSlots( + jobMasterGateway, + jobGraph.getJobID(), + taskExecutorGateway, + localUnresolvedTaskManagerLocation); + + ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get(); + + // the execution has not been acknowledged yet by the TaskExecutor, but we already allow + // the ID to be in the heartbeat payload + jobMasterGateway.heartbeatFromTaskManager( + localUnresolvedTaskManagerLocation.getResourceID(), + new TaskExecutorToJobManagerHeartbeatPayload( + new AccumulatorReport(Collections.emptyList()), + new ExecutionDeploymentReport( + Collections.singleton(pendingExecutionId)))); + + taskSubmissionAcknowledgeFuture.complete(Acknowledge.get()); + + deploymentTrackerWrapper.getTaskDeploymentFuture().get(); + assertFalse(taskCancellationFuture.isDone()); + } } private JobMaster createAndStartJobMaster( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java index 0d7420d7291..8d829c28186 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; -import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -55,6 +55,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; @@ -115,85 +116,80 @@ public class JobMasterQueryableStateTest extends TestLogger { @Test public void testRequestKvStateWithoutRegistration() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { - // lookup location - try { - jobMasterGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown").get(); - fail("Expected to fail with UnknownKvStateLocation"); - } catch (Exception e) { - assertTrue( - ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent()); - } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + assertThatThrownBy( + () -> + jobMasterGateway + .requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown") + .get()) + .satisfies(FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class)); } } @Test public void testRequestKvStateOfWrongJob() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { - // lookup location - try { - jobMasterGateway.requestKvStateLocation(new JobID(), "unknown").get(); - fail("Expected to fail with FlinkJobNotFoundException"); - } catch (Exception e) { - assertThat(e, containsCause(FlinkJobNotFoundException.class)); - } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + assertThatThrownBy( + () -> + jobMasterGateway + .requestKvStateLocation(new JobID(), "unknown") + .get()) + .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class)); } } @Test public void testRequestKvStateWithIrrelevantRegistration() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { // register an irrelevant KvState - try { - registerKvState(jobMasterGateway, new JobID(), new JobVertexID(), "any-name"); - fail("Expected to fail with FlinkJobNotFoundException."); - } catch (Exception e) { - assertThat(e, containsCause(FlinkJobNotFoundException.class)); - } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + assertThatThrownBy( + () -> + registerKvState( + jobMasterGateway, + new JobID(), + new JobVertexID(), + "any-name")) + .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class)); } } @Test public void testRegisterKvState() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); - - jobMaster.start(); + try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { final String registrationName = "register-me"; final KvStateID kvStateID = new KvStateID(); final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0); @@ -223,22 +219,21 @@ public class JobMasterQueryableStateTest extends TestLogger { assertEquals(kvStateID, location.getKvStateID(keyGroupRange.getStartKeyGroup())); assertEquals( address, location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup())); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @Test public void testUnregisterKvState() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { final String registrationName = "register-me"; final KvStateID kvStateID = new KvStateID(); final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0); @@ -271,22 +266,21 @@ public class JobMasterQueryableStateTest extends TestLogger { } catch (Exception e) { assertThat(e, containsCause(UnknownKvStateLocation.class)); } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @Test public void testDuplicatedKvStateRegistrationsFailTask() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); + registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID()); - try { // duplicate registration fails task final String registrationName = "duplicate-me"; @@ -309,8 +303,6 @@ public class JobMasterQueryableStateTest extends TestLogger { jobMasterGateway.requestJobStatus(testingTimeout).get(), either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING))); } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java index 9dc17dff3dd..07fc6ed2e3c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java @@ -65,29 +65,36 @@ public class JobMasterSchedulerTest extends TestLogger { final SchedulerNGFactory schedulerFactory = new FailingSchedulerFactory(); final JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); - final JobMaster jobMaster = - new JobMasterBuilder( - JobGraphTestUtils.emptyJobGraph(), - TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService()) - .withSlotPoolServiceSchedulerFactory( - DefaultSlotPoolServiceSchedulerFactory.create( - TestingSlotPoolServiceBuilder.newBuilder(), - schedulerFactory)) - .withOnCompletionActions(onCompletionActions) - .createJobMaster(); + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder().build(); + try { + final JobMaster jobMaster = + new JobMasterBuilder( + JobGraphTestUtils.emptyJobGraph(), + TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService()) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder(), + schedulerFactory)) + .withOnCompletionActions(onCompletionActions) + .withJobManagerSharedServices(jobManagerSharedServices) + .createJobMaster(); - jobMaster.start(); + jobMaster.start(); - assertThat( - onCompletionActions.getJobMasterFailedFuture().join(), - is(instanceOf(JobMasterException.class))); + assertThat( + onCompletionActions.getJobMasterFailedFuture().join(), + is(instanceOf(JobMasterException.class))); - // close the jobMaster to remove it from the testing rpc service so that it can shut down - // cleanly - try { - jobMaster.close(); - } catch (Exception expected) { - // expected + // close the jobMaster to remove it from the testing rpc service so that it can shut + // down cleanly + try { + jobMaster.close(); + } catch (Exception expected) { + // expected + } + } finally { + jobManagerSharedServices.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f61e6690f48..1ac904e49b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -265,17 +265,16 @@ public class JobMasterTest extends TestLogger { rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withResourceId(jmResourceId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(new HeartbeatServices(1L, 10000L)) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); @@ -294,8 +293,6 @@ public class JobMasterTest extends TestLogger { registrationResponse.get(); assertThat(heartbeatResourceIdFuture.join(), anyOf(nullValue(), equalTo(jmResourceId))); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -323,17 +320,16 @@ public class JobMasterTest extends TestLogger { rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withResourceId(jmResourceId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); @@ -356,8 +352,6 @@ public class JobMasterTest extends TestLogger { testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertThat(disconnectedJobManager, equalTo(jobGraph.getJobID())); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -416,22 +410,18 @@ public class JobMasterTest extends TestLogger { rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); - final JobManagerSharedServices jobManagerSharedServices = - new TestingJobManagerSharedServicesBuilder().build(); - final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withHeartbeatServices(new HeartbeatServices(5L, 1000L)) .withSlotPoolServiceSchedulerFactory( DefaultSlotPoolServiceSchedulerFactory.create( new TestingSlotPoolFactory(hasReceivedSlotOffers), new DefaultSchedulerFactory())) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); @@ -463,9 +453,6 @@ public class JobMasterTest extends TestLogger { // make sure that no assertion has been violated assertionFuture.get(); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); - jobManagerSharedServices.shutdown(); } } @@ -677,18 +664,17 @@ public class JobMasterTest extends TestLogger { rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) .withResourceId(jmResourceId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(fastHeartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { // define a leader and see that a registration happens rmLeaderRetrievalService.notifyListener( resourceManagerAddress, resourceManagerId.toUUID()); @@ -711,8 +697,6 @@ public class JobMasterTest extends TestLogger { // the JobMaster should try to reconnect to the RM registrationAttempts.await(); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -752,18 +736,17 @@ public class JobMasterTest extends TestLogger { rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) .withResourceId(jmResourceId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { // define a leader and see that a registration happens rmLeaderRetrievalService.notifyListener( resourceManagerAddress, resourceManagerId.toUUID()); @@ -783,8 +766,6 @@ public class JobMasterTest extends TestLogger { // the JobMaster should try to reconnect to the RM registrationAttempts.await(); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -811,12 +792,11 @@ public class JobMasterTest extends TestLogger { maxCheckpoints -> completedCheckpointStore); haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withHighAvailabilityServices(haServices) - .createJobMaster(); + .createJobMaster()) { - try { // we need to start and register the required slots to let the adaptive scheduler // restore from the savepoint jobMaster.start(); @@ -845,8 +825,6 @@ public class JobMasterTest extends TestLogger { assertThat(savepointCheckpoint, Matchers.notNullValue()); assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -887,9 +865,9 @@ public class JobMasterTest extends TestLogger { maxCheckpoints -> completedCheckpointStore); haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); - final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { - try { // starting the JobMaster should have read the savepoint final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); @@ -897,21 +875,18 @@ public class JobMasterTest extends TestLogger { assertThat(savepointCheckpoint, Matchers.notNullValue()); assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } /** Tests that we can close an unestablished ResourceManager connection. */ @Test public void testCloseUnestablishedResourceManagerConnection() throws Exception { - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) - .createJobMaster(); + .createJobMaster()) { - try { jobMaster.start(); final TestingResourceManagerGateway firstResourceManagerGateway = @@ -946,27 +921,25 @@ public class JobMasterTest extends TestLogger { // check that we start registering at the second RM secondJobManagerRegistration.await(); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } /** Tests that we continue reconnecting to the latest known RM after a disconnection message. */ @Test public void testReconnectionAfterDisconnect() throws Exception { - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - try { final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final BlockingQueue<JobMasterId> registrationsQueue = new ArrayBlockingQueue<>(1); @@ -993,23 +966,20 @@ public class JobMasterTest extends TestLogger { // wait for the second registration attempt after the disconnect call assertThat(registrationsQueue.take(), equalTo(jobMasterId)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } /** Tests that the a JM connects to the leading RM after regaining leadership. */ @Test public void testResourceManagerConnectionAfterStart() throws Exception { - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - try { final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); @@ -1028,8 +998,6 @@ public class JobMasterTest extends TestLogger { final JobMasterId firstRegistrationAttempt = registrationQueue.take(); assertThat(firstRegistrationAttempt, equalTo(jobMasterId)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1094,16 +1062,15 @@ public class JobMasterTest extends TestLogger { .setExecutionConfig(executionConfig) .build(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(inputSplitJobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); @@ -1154,8 +1121,6 @@ public class JobMasterTest extends TestLogger { expectedRemainingInputSplits .apply(inputSplitsPerTask) .toArray(EMPTY_TESTING_INPUT_SPLITS))); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1349,16 +1314,15 @@ public class JobMasterTest extends TestLogger { @Test public void testRequestPartitionState() throws Exception { final JobGraph producerConsumerJobGraph = producerConsumerJobGraph(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(producerConsumerJobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - try { final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>(); final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() @@ -1453,8 +1417,6 @@ public class JobMasterTest extends TestLogger { .isPresent(), is(true)); } - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1477,16 +1439,15 @@ public class JobMasterTest extends TestLogger { (ignoredA, ignoredB, formatType) -> new CompletableFuture<>()) .build(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withFatalErrorHandler(testingFatalErrorHandler) .withSlotPoolServiceSchedulerFactory( DefaultSlotPoolServiceSchedulerFactory.create( TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(testingSchedulerNG))) - .createJobMaster(); + .createJobMaster()) { - try { jobMaster.start(); final JobMasterGateway jobMasterGateway = @@ -1507,8 +1468,6 @@ public class JobMasterTest extends TestLogger { } assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false))); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1518,29 +1477,29 @@ public class JobMasterTest extends TestLogger { final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); - final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>(); - final TestingTaskExecutorGateway testingTaskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setFreeSlotFunction( - (allocationID, throwable) -> { - freedSlotFuture.complete(allocationID); - return CompletableFuture.completedFuture(Acknowledge.get()); - }) - .setDisconnectJobManagerConsumer( - (jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) - .createTestingTaskExecutorGateway(); - final LocalUnresolvedTaskManagerLocation taskManagerLocation = - new LocalUnresolvedTaskManagerLocation(); + final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); + final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction( + (allocationID, throwable) -> { + freedSlotFuture.complete(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setDisconnectJobManagerConsumer( + (jobID, throwable) -> + disconnectTaskExecutorFuture.complete(jobID)) + .createTestingTaskExecutorGateway(); + final LocalUnresolvedTaskManagerLocation taskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); - try { jobMaster.start(); final JobMasterGateway jobMasterGateway = @@ -1568,8 +1527,6 @@ public class JobMasterTest extends TestLogger { // longer slots from it assertThat(freedSlotFuture.get(), equalTo(allocationId)); assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID())); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1589,29 +1546,29 @@ public class JobMasterTest extends TestLogger { new TestingJobMasterPartitionTracker(); partitionTracker.setIsTrackingPartitionsForFunction(ignored -> isTrackingPartitions.get()); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withJobManagerSharedServices(jobManagerSharedServices) .withHeartbeatServices(heartbeatServices) .withPartitionTrackerFactory(ignored -> partitionTracker) - .createJobMaster(); + .createJobMaster()) { - final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); - final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>(); - final TestingTaskExecutorGateway testingTaskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setFreeSlotFunction( - (allocationID, throwable) -> { - freedSlotFuture.complete(allocationID); - return CompletableFuture.completedFuture(Acknowledge.get()); - }) - .setDisconnectJobManagerConsumer( - (jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) - .createTestingTaskExecutorGateway(); + final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); + final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction( + (allocationID, throwable) -> { + freedSlotFuture.complete(allocationID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setDisconnectJobManagerConsumer( + (jobID, throwable) -> + disconnectTaskExecutorFuture.complete(jobID)) + .createTestingTaskExecutorGateway(); - try { jobMaster.start(); final JobMasterGateway jobMasterGateway = @@ -1642,25 +1599,23 @@ public class JobMasterTest extends TestLogger { // complete jobMasterGateway.requestJobStatus(Time.seconds(5)).get(); assertThat(disconnectTaskExecutorFuture.isDone(), is(false)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } /** Tests the updateGlobalAggregate functionality. */ @Test public void testJobMasterAggregatesValuesCorrectly() throws Exception { - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); - final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + jobMaster.start(); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); - try { CompletableFuture<Object> updateAggregateFuture; AggregateFunction<Integer, Integer, Integer> aggregateFunction = @@ -1694,9 +1649,6 @@ public class JobMasterTest extends TestLogger { updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction); assertThat(updateAggregateFuture.get(), equalTo(33)); - - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1765,9 +1717,9 @@ public class JobMasterTest extends TestLogger { */ @Test public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { - try { jobMaster.start(); final CompletableFuture<RegistrationResponse> registrationResponse = @@ -1780,21 +1732,19 @@ public class JobMasterTest extends TestLogger { testingTimeout); assertThat(registrationResponse.get(), instanceOf(JMTMRegistrationRejection.class)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @Test public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { - final TestingTaskExecutorGateway testingTaskExecutorGateway = - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); - rpcService.registerGateway( - testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + rpcService.registerGateway( + testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); - try { jobMaster.start(); final TaskManagerRegistrationInformation taskManagerRegistrationInformation = @@ -1816,34 +1766,33 @@ public class JobMasterTest extends TestLogger { assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @Test public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception { - final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { - final CompletableFuture<Void> firstTaskExecutorDisconnectedFuture = - new CompletableFuture<>(); - final TestingTaskExecutorGateway firstTaskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setAddress("firstTaskExecutor") - .setDisconnectJobManagerConsumer( - (jobID, throwable) -> - firstTaskExecutorDisconnectedFuture.complete(null)) - .createTestingTaskExecutorGateway(); - final TestingTaskExecutorGateway secondTaskExecutorGateway = - new TestingTaskExecutorGatewayBuilder() - .setAddress("secondTaskExecutor") - .createTestingTaskExecutorGateway(); + final CompletableFuture<Void> firstTaskExecutorDisconnectedFuture = + new CompletableFuture<>(); + final TestingTaskExecutorGateway firstTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setAddress("firstTaskExecutor") + .setDisconnectJobManagerConsumer( + (jobID, throwable) -> + firstTaskExecutorDisconnectedFuture.complete(null)) + .createTestingTaskExecutorGateway(); + final TestingTaskExecutorGateway secondTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setAddress("secondTaskExecutor") + .createTestingTaskExecutorGateway(); - rpcService.registerGateway(firstTaskExecutorGateway.getAddress(), firstTaskExecutorGateway); - rpcService.registerGateway( - secondTaskExecutorGateway.getAddress(), secondTaskExecutorGateway); + rpcService.registerGateway( + firstTaskExecutorGateway.getAddress(), firstTaskExecutorGateway); + rpcService.registerGateway( + secondTaskExecutorGateway.getAddress(), secondTaskExecutorGateway); - try { jobMaster.start(); final LocalUnresolvedTaskManagerLocation taskManagerLocation = @@ -1873,8 +1822,6 @@ public class JobMasterTest extends TestLogger { assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); // the first TaskExecutor should be disconnected firstTaskExecutorDisconnectedFuture.get(); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1886,27 +1833,28 @@ public class JobMasterTest extends TestLogger { .setCloseAsyncSupplier(() -> schedulerTerminationFuture) .build(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withSlotPoolServiceSchedulerFactory( DefaultSlotPoolServiceSchedulerFactory.create( TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(testingSchedulerNG))) - .createJobMaster(); + .createJobMaster()) { - jobMaster.start(); + jobMaster.start(); - final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync(); + final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync(); - try { - jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS); - fail("Expected TimeoutException because the JobMaster should not terminate."); - } catch (TimeoutException expected) { - } + try { + jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS); + fail("Expected TimeoutException because the JobMaster should not terminate."); + } catch (TimeoutException expected) { + } - schedulerTerminationFuture.complete(null); + schedulerTerminationFuture.complete(null); - jobMasterTerminationFuture.get(); + jobMasterTerminationFuture.get(); + } } @Test @@ -1915,12 +1863,11 @@ public class JobMasterTest extends TestLogger { configuration.set( RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1)); final int numberSlots = 1; - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) - .createJobMaster(); + .createJobMaster()) { - try { jobMaster.start(); final JobMasterGateway jobMasterGateway = @@ -1961,8 +1908,6 @@ public class JobMasterTest extends TestLogger { .createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation()), hasSize(numberSlots)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -1974,15 +1919,14 @@ public class JobMasterTest extends TestLogger { final JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); - final JobMaster jobMaster = + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withResourceId(jmResourceId) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) .withOnCompletionActions(onCompletionActions) - .createJobMaster(); + .createJobMaster()) { - try { jobMaster.start(); final JobMasterGateway jobMasterGateway = @@ -2031,8 +1975,6 @@ public class JobMasterTest extends TestLogger { .getArchivedExecutionGraph(); assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - } finally { - RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } }