This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10bfd2971dd960ac799ba02156683fc7a740383d
Author: Till Rohrmann <[email protected]>
AuthorDate: Sat Nov 7 17:22:52 2020 +0100

    [FLINK-20033] Ensure that stopping a JobMaster will suspend the running job
    
    This commit adds a test which ensures that stopping a JobMaster will
    suspend the running job.
    
    This closes #13980.
---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 52 ++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index a7af955..9b35f18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1953,6 +1953,58 @@ public class JobMasterTest extends TestLogger {
                );
        }
 
+       /**
+        * Tests that closing the JobMaster after its task has been deployed only suspends the job, i.e. the job does not reach a globally terminal state. See FLINK-20033.
+        */
+       @Test
+       public void testJobSuspensionWhenJobMasterStops() throws Exception {
+               final JobMasterBuilder.TestingOnCompletionActions 
onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       JobGraphTestUtils.createSingleVertexJobGraph(),
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices,
+                       onCompletionActions);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId);
+
+               try {
+                       // block until the JobMaster has acknowledged its start
+                       startFuture.get();
+
+                       final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                               
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+                                       
tddFuture.complete(taskDeploymentDescriptor);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                               .createTestingTaskExecutorGateway();
+                       final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       registerSlotsAtJobMaster(1, jobMasterGateway, 
testingTaskExecutorGateway, taskManagerLocation);
+
+                       // block until the testing task executor gateway has received the task submission
+                       tddFuture.get();
+
+                       // close the JobMaster while the job is deployed; this must not terminate the job globally
+                       jobMaster.close();
+
+                       try {
+                               final ArchivedExecutionGraph 
archivedExecutionGraph = onCompletionActions
+                                       
.getJobReachedGloballyTerminalStateFuture()
+                                       .get(50, TimeUnit.MILLISECONDS);
+                               fail(String.format("Job must not reach the 
globally terminal state %s when stopping the JobMaster.", 
archivedExecutionGraph.getState()));
+                       } catch (TimeoutException expected) {
+                               // expected: the globally-terminal-state future did not complete within the timeout
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        private void runJobFailureWhenTaskExecutorTerminatesTest(
                        HeartbeatServices heartbeatServices,
                        BiConsumer<LocalTaskManagerLocation, JobMasterGateway> 
jobReachedRunningState,

Reply via email to