This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10bfd2971dd960ac799ba02156683fc7a740383d Author: Till Rohrmann <[email protected]> AuthorDate: Sat Nov 7 17:22:52 2020 +0100 [FLINK-20033] Ensure that stopping a JobMaster will suspend the running job This commit adds a test which ensures that stopping a JobMaster will suspend the running job. This closes #13980. --- .../flink/runtime/jobmaster/JobMasterTest.java | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index a7af955..9b35f18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1953,6 +1953,58 @@ public class JobMasterTest extends TestLogger { ); } + /** + * Tests that the job gets suspended when the JobMaster stops. See FLINK-20033. + * + * <p>The test starts a JobMaster for a single-vertex job graph, calls + * registerSlotsAtJobMaster(1, ...) with a testing TaskExecutor and waits until a task has been + * submitted to it. It then closes the JobMaster and checks that the {@code onCompletionActions} + * do not report a globally terminal job state within 50 ms. + * + * <p>NOTE(review): this only verifies the absence of a globally terminal state within a short + * window; it does not assert the suspended job status directly. + */ + @Test + public void testJobSuspensionWhenJobMasterStops() throws Exception { + final JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); + + final JobMaster jobMaster = createJobMaster( + configuration, + JobGraphTestUtils.createSingleVertexJobGraph(), + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatServices, + onCompletionActions); + + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId); + + try { + // wait for the start of the JobMaster + startFuture.get(); + + // completed with the TaskDeploymentDescriptor of the first task the JobMaster submits + // to the testing TaskExecutor below + final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + // NOTE(review): the lambda's jobMasterId parameter is unused and shadows the + // jobMasterId passed to jobMaster.start(...) above + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + tddFuture.complete(taskDeploymentDescriptor); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); +
final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + // presumably registers the testing TaskExecutor and offers a single slot so that the + // single vertex can be deployed (helper is defined elsewhere in this class) + registerSlotsAtJobMaster(1, jobMasterGateway, testingTaskExecutorGateway, taskManagerLocation); + + // wait for deployment, i.e. until a task has been submitted to the TaskExecutor; + // this blocks without an explicit timeout + tddFuture.get(); + + // trigger termination of JobMaster and also the job + jobMaster.close(); + + // stopping the JobMaster must not drive the job into a globally terminal state: + // a TimeoutException after 50 ms is the passing outcome, a completed future fails the test + try { + final ArchivedExecutionGraph archivedExecutionGraph = onCompletionActions + .getJobReachedGloballyTerminalStateFuture() + .get(50, TimeUnit.MILLISECONDS); + fail(String.format("Job must not reach the globally terminal state %s when stopping the JobMaster.", archivedExecutionGraph.getState())); + } catch (TimeoutException expected) { + // expected: no globally terminal state was reported within 50 ms + } + } finally { + // terminate the RPC endpoint even if an assertion above failed + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + private void runJobFailureWhenTaskExecutorTerminatesTest( HeartbeatServices heartbeatServices, BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
