Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5107#discussion_r155242826
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
    @@ -86,122 +125,143 @@ public static void teardown() {
                }
        }
     
    -   /**
    -    * Tests that we can submit a job to the Dispatcher which then spawns a
    -    * new JobManagerRunner.
    -    */
    -   @Test
    -   public void testJobSubmission() throws Exception {
    -           TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
    +   @Before
    +   public void setUp() throws Exception {
    +           MockitoAnnotations.initMocks(this);
     
    -           TestingLeaderElectionService dispatcherLeaderElectionService = 
new TestingLeaderElectionService();
    -           TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
    -           
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
    -           haServices.setSubmittedJobGraphStore(new 
StandaloneSubmittedJobGraphStore());
    +           final JobVertex testVertex = new JobVertex("testVertex");
    +           testVertex.setInvokableClass(NoOpInvokable.class);
    +           jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
    +           jobGraph.setAllowQueuedScheduling(true);
     
    -           HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 10000L);
    -           JobManagerRunner jobManagerRunner = 
mock(JobManagerRunner.class);
    +           fatalErrorHandler = new TestingFatalErrorHandler();
    +           final HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 10000L);
    +           submittedJobGraphStore = spy(new 
InMemorySubmittedJobGraphStore());
     
    -           final JobGraph jobGraph = mock(JobGraph.class);
    -           final JobID jobId = new JobID();
    -           when(jobGraph.getJobID()).thenReturn(jobId);
    +           dispatcherLeaderElectionService = new 
TestingLeaderElectionService();
    +           jobMasterLeaderElectionService = new 
TestingLeaderElectionService();
     
    -           final TestingDispatcher dispatcher = new TestingDispatcher(
    +           final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
    +           
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
    +           haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
    +           haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, 
jobMasterLeaderElectionService);
    +           haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
    +           haServices.setResourceManagerLeaderRetriever(new 
TestingLeaderRetrievalService());
    +           runningJobsRegistry = haServices.getRunningJobsRegistry();
    +
    +           final Configuration blobServerConfig = new Configuration();
    +           blobServerConfig.setString(
    +                   BlobServerOptions.STORAGE_DIRECTORY,
    +                   temporaryFolder.newFolder().getAbsolutePath());
    +
    +           dispatcher = new TestingDispatcher(
                        rpcService,
                        Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
                        new Configuration(),
                        haServices,
                        mock(ResourceManagerGateway.class),
    -                   mock(BlobServer.class),
    +                   new BlobServer(blobServerConfig, new VoidBlobStore()),
                        heartbeatServices,
    -                   mock(MetricRegistryImpl.class),
    +                   new NoOpMetricRegistry(),
                        fatalErrorHandler,
    -                   jobManagerRunner,
    -                   jobId);
    +                   TEST_JOB_ID);
     
    -           try {
    -                   dispatcher.start();
    +           dispatcher.start();
    +   }
     
    -                   CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +   @After
    +   public void tearDown() throws Exception {
    +           try {
    +                   fatalErrorHandler.rethrowError();
    +           } finally {
    +                   RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
    +           }
    +   }
     
    -                   // wait for the leader to be elected
    -                   leaderFuture.get();
    +   /**
    +    * Tests that we can submit a job to the Dispatcher which then spawns a
    +    * new JobManagerRunner.
    +    */
    +   @Test
    +   public void testJobSubmission() throws Exception {
    +           CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
     
    -                   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
    +           // wait for the leader to be elected
    +           leaderFuture.get();
     
    -                   CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
    +           DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
     
    -                   acknowledgeFuture.get();
    +           CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
     
    -                   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
    +           acknowledgeFuture.get();
     
    -                   // check that no error has occurred
    -                   fatalErrorHandler.rethrowError();
    -           } finally {
    -                   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -           }
    +           assertTrue(
    +                   "jobManagerRunner was not started",
    +                   dispatcherLeaderElectionService.isStarted());
        }
     
        /**
         * Tests that the dispatcher takes part in the leader election.
         */
        @Test
        public void testLeaderElection() throws Exception {
    -           TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
    -           TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
    -
                UUID expectedLeaderSessionId = UUID.randomUUID();
    -           CompletableFuture<UUID> leaderSessionIdFuture = new 
CompletableFuture<>();
    -           SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
    -           TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
    -                   @Override
    -                   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
    -                           super.confirmLeaderSessionID(leaderSessionId);
    -                           leaderSessionIdFuture.complete(leaderSessionId);
    -                   }
    -           };
     
    -           
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
    -           
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
    -           HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
    -           final JobID jobId = new JobID();
    +           
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
     
    -           final TestingDispatcher dispatcher = new TestingDispatcher(
    -                   rpcService,
    -                   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
    -                   new Configuration(),
    -                   haServices,
    -                   mock(ResourceManagerGateway.class),
    -                   mock(BlobServer.class),
    -                   heartbeatServices,
    -                   mock(MetricRegistryImpl.class),
    -                   fatalErrorHandler,
    -                   mock(JobManagerRunner.class),
    -                   jobId);
    +           
dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
     
    -           try {
    -                   dispatcher.start();
    +           UUID actualLeaderSessionId = 
dispatcherLeaderElectionService.getConfirmationFuture()
    +                   .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +           assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +
    +           verify(submittedJobGraphStore, 
Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds();
    +   }
    +
    +   /**
    +    * Test callbacks from
    +    * {@link 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
    +    */
    +   @Test
    +   public void testSubmittedJobGraphListener() throws Exception {
    +           dispatcher.recoverJobsEnabled.set(false);
    --- End diff --
    
    It is hard to predict when the main logic of `recoverJobs()` actually runs, 
because it is handed off via `getRpcService().execute(...)` instead of running in the main thread:
    ```
    
        /**
         * Recovers all jobs persisted via the submitted job graph store.
         */
        @VisibleForTesting
        void recoverJobs() {
                log.info("Recovering all persisted jobs.");
    
                getRpcService().execute(
                        () -> {
                                final Collection<JobID> jobIds;
    
        ...
    ```
    
    When the test adds a job graph to the submitted job graph store, that job 
could potentially be picked up and submitted by `recoverJobs()`, which is something I need to avoid; that is why the test sets `recoverJobsEnabled` to `false`.


---

Reply via email to