Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155242826 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -86,122 +125,143 @@ public static void teardown() { } } - /** - * Tests that we can submit a job to the Dispatcher which then spawns a - * new JobManagerRunner. - */ - @Test - public void testJobSubmission() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); - TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); - haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore()); + final JobVertex testVertex = new JobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + jobGraph.setAllowQueuedScheduling(true); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); - JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class); + fatalErrorHandler = new TestingFatalErrorHandler(); + final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); + submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore()); - final JobGraph jobGraph = mock(JobGraph.class); - final JobID jobId = new JobID(); - when(jobGraph.getJobID()).thenReturn(jobId); + dispatcherLeaderElectionService = new TestingLeaderElectionService(); + jobMasterLeaderElectionService = new TestingLeaderElectionService(); - final TestingDispatcher dispatcher = new TestingDispatcher( + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); + haServices.setSubmittedJobGraphStore(submittedJobGraphStore); + haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + runningJobsRegistry = haServices.getRunningJobsRegistry(); + + final Configuration blobServerConfig = new Configuration(); + blobServerConfig.setString( + BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), new Configuration(), haServices, mock(ResourceManagerGateway.class), - mock(BlobServer.class), + new BlobServer(blobServerConfig, new VoidBlobStore()), heartbeatServices, - mock(MetricRegistryImpl.class), + new NoOpMetricRegistry(), fatalErrorHandler, - jobManagerRunner, - jobId); + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** + * Tests that we can submit a job to the Dispatcher which then spawns a + * new JobManagerRunner. + */ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + assertTrue( + "jobManagerRunner was not started", + dispatcherLeaderElectionService.isStarted()); } /** * Tests that the dispatcher takes part in the leader election. */ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - try { - dispatcher.start(); + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + + verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds(); + } + + /** + * Test callbacks from + * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}. + */ + @Test + public void testSubmittedJobGraphListener() throws Exception { + dispatcher.recoverJobsEnabled.set(false); --- End diff -- It is not so predictable when the main logic of `recoverJobs()` is scheduled because it does not run in the main thread: ``` /** * Recovers all jobs persisted via the submitted job graph store. */ @VisibleForTesting void recoverJobs() { log.info("Recovering all persisted jobs."); getRpcService().execute( () -> { final Collection<JobID> jobIds; ... ``` When the test adds something to the job graph store, it could potentially be submitted by `recoverJobs`, which is something I need to avoid.
---