zentol commented on a change in pull request #13368: URL: https://github.com/apache/flink/pull/13368#discussion_r488592878
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ########## @@ -749,6 +750,26 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { } catch (TimeoutException expected) {} } + @Test + public void testInitializationTimestampForwardedToExecutionGraph() throws Exception { + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch)); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + // ensure job is running + CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING, Review comment: I'd move all arguments on a separate line. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ########## @@ -749,6 +750,26 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { } catch (TimeoutException expected) {} } + @Test + public void testInitializationTimestampForwardedToExecutionGraph() throws Exception { + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch)); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + // ensure job is running + CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING, + Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L); + + ArchivedExecutionGraph result = dispatcher.requestJob(jobGraph.getJobID(), TIMEOUT).get(); + + // ensure 
all statuses are set in the ExecutionGraph + assertThat(result.getStatusTimestamp(JobStatus.INITIALIZING), greaterThan(0L)); Review comment: maybe also check that initializing <= created, to prevent the order from being messed up? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ########## @@ -749,6 +750,26 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { } catch (TimeoutException expected) {} } + @Test + public void testInitializationTimestampForwardedToExecutionGraph() throws Exception { + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch)); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + // ensure job is running + CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING, + Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L); Review comment: ```suggestion Deadline.fromNow(Duration.ofSeconds(10)), 5L); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org