GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190571191

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
@@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
 		assertThat(deleteAllFuture.isDone(), is(false));
 	}
+	/**
+	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
+	 * job reached a terminal state.
+	 */
+	@Test
+	public void testRunningJobsRegistryCleanup() throws Exception {
+		submitJob();
+
+		runningJobsRegistry.setJobRunning(jobId);
+		assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+		// wait for the clearing
+		runningJobsRegistry.getClearedFuture().get();
+
+		assertThat(runningJobsRegistry.contains(jobId), is(false));
+	}
+
+	private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
+
+		private final JobID jobId;
+
+		private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
+
+		private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
+
+		private boolean containsJob = false;
+
+		private TestingRunningJobsRegistry(JobID jobId) {
+			this.jobId = jobId;
+		}
+
+		public CompletableFuture<Void> getClearedFuture() {
+			return clearedFuture;
+		}
+
+		@Override
+		public void setJobRunning(JobID jobID) throws IOException {
+			checkJobId(jobID);
+			containsJob = true;
+			jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+		}
+
+		private void checkJobId(JobID jobID) {
+			Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --

True, will change it.
---