[ https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488810#comment-16488810 ]
ASF GitHub Bot commented on FLINK-9421: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6068#discussion_r190500268 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java --- @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { assertThat(deleteAllFuture.isDone(), is(false)); } + /** + * Tests that the {@link RunningJobsRegistry} entries are cleared after the + * job reached a terminal state. + */ + @Test + public void testRunningJobsRegistryCleanup() throws Exception { + submitJob(); + + runningJobsRegistry.setJobRunning(jobId); + assertThat(runningJobsRegistry.contains(jobId), is(true)); + + resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build()); + + // wait for the clearing + runningJobsRegistry.getClearedFuture().get(); + + assertThat(runningJobsRegistry.contains(jobId), is(false)); + } + + private static final class TestingRunningJobsRegistry implements RunningJobsRegistry { + + private final JobID jobId; + + private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>(); + + private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING; + + private boolean containsJob = false; + + private TestingRunningJobsRegistry(JobID jobId) { + this.jobId = jobId; + } + + public CompletableFuture<Void> getClearedFuture() { + return clearedFuture; + } + + @Override + public void setJobRunning(JobID jobID) throws IOException { + checkJobId(jobID); + containsJob = true; + jobSchedulingStatus = JobSchedulingStatus.RUNNING; + } + + private void checkJobId(JobID jobID) { + Preconditions.checkArgument(jobId.equals(jobID)); --- End diff -- Not the best variable names here: `jobId` vs. 
`jobID`

> RunningJobsRegistry entries are not cleaned up after job termination
> --------------------------------------------------------------------
>
>                 Key: FLINK-9421
>                 URL: https://issues.apache.org/jira/browse/FLINK-9421
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}}
> after the job has finished. The consequence is that a ZNode with the JobID
> and a state enum per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)