tillrohrmann commented on a change in pull request #15093: URL: https://github.com/apache/flink/pull/15093#discussion_r588350992
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ########## @@ -392,46 +418,47 @@ public void testNonBlockingJobSubmission() throws Exception { assertEquals(jobID, multiDetails.getJobs().iterator().next().getJobId()); // submission has succeeded, let the initialization finish. - blockingJobGraph.f1.unblock(); + latch.trigger(); // ensure job is running CommonTestUtils.waitUntilCondition( - () -> - dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() - == JobStatus.RUNNING, + () -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get() == JobStatus.RUNNING, Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L); } @Test(timeout = 5_000L) public void testInvalidCallDuringInitialization() throws Exception { + final OneShotLatch latch = new OneShotLatch(); dispatcher = createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - TEST_JOB_ID, createdJobManagerRunnerLatch)); - jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + new BlockingJobManagerRunnerFactory(latch::await)); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex(); - JobID jid = blockingJobGraph.f0.getJobID(); + final JobGraph emptyJobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build(); - dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get(); + dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get(); assertThat( - dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(), is(JobStatus.INITIALIZING)); + dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), + is(JobStatus.INITIALIZING)); // this call is supposed to fail try { - dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint", false, TIMEOUT).get(); + dispatcherGateway + .triggerSavepoint(jobId, "file:///tmp/savepoint", false, TIMEOUT) + .get(); fail("Previous statement should have failed"); } catch (ExecutionException t) { assertTrue(t.getCause() instanceof UnavailableDispatcherOperationException); } // submission has succeeded, let the initialization finish. - blockingJobGraph.f1.unblock(); + latch.trigger(); Review comment: Yeah, `Tuple2` is actually quite ugly. We should try to never use it if possible. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org