tillrohrmann commented on a change in pull request #15093:
URL: https://github.com/apache/flink/pull/15093#discussion_r588350992



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -392,46 +418,47 @@ public void testNonBlockingJobSubmission() throws 
Exception {
         assertEquals(jobID, 
multiDetails.getJobs().iterator().next().getJobId());
 
         // submission has succeeded, let the initialization finish.
-        blockingJobGraph.f1.unblock();
+        latch.trigger();
 
         // ensure job is running
         CommonTestUtils.waitUntilCondition(
-                () ->
-                        
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get()
-                                == JobStatus.RUNNING,
+                () -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get() 
== JobStatus.RUNNING,
                 Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
                 5L);
     }
 
     @Test(timeout = 5_000L)
     public void testInvalidCallDuringInitialization() throws Exception {
+        final OneShotLatch latch = new OneShotLatch();
         dispatcher =
                 createAndStartDispatcher(
                         heartbeatServices,
                         haServices,
-                        new ExpectedJobIdJobManagerRunnerFactory(
-                                TEST_JOB_ID, createdJobManagerRunnerLatch));
-        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+                        new BlockingJobManagerRunnerFactory(latch::await));
+
         DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-        Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = 
getBlockingJobGraphAndVertex();
-        JobID jid = blockingJobGraph.f0.getJobID();
+        final JobGraph emptyJobGraph =
+                
JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
 
-        dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+        dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get();
 
         assertThat(
-                dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(), 
is(JobStatus.INITIALIZING));
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
+                is(JobStatus.INITIALIZING));
 
         // this call is supposed to fail
         try {
-            dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint", 
false, TIMEOUT).get();
+            dispatcherGateway
+                    .triggerSavepoint(jobId, "file:///tmp/savepoint", false, 
TIMEOUT)
+                    .get();
             fail("Previous statement should have failed");
         } catch (ExecutionException t) {
             assertTrue(t.getCause() instanceof 
UnavailableDispatcherOperationException);
         }
 
         // submission has succeeded, let the initialization finish.
-        blockingJobGraph.f1.unblock();
+        latch.trigger();

Review comment:
       Yeah, `Tuple2` is actually quite ugly. We should avoid using it 
whenever possible.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to