[ https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488810#comment-16488810 ]

ASF GitHub Bot commented on FLINK-9421:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190500268
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ---
    @@ -271,6 +279,80 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
                assertThat(deleteAllFuture.isDone(), is(false));
        }
     
    +   /**
    +    * Tests that the {@link RunningJobsRegistry} entries are cleared after the
    +    * job reached a terminal state.
    +    */
    +   @Test
    +   public void testRunningJobsRegistryCleanup() throws Exception {
    +           submitJob();
    +
    +           runningJobsRegistry.setJobRunning(jobId);
    +           assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +           resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +           // wait for the clearing
    +           runningJobsRegistry.getClearedFuture().get();
    +
    +           assertThat(runningJobsRegistry.contains(jobId), is(false));
    +   }
    +
    +   private static final class TestingRunningJobsRegistry implements RunningJobsRegistry {
    +
    +           private final JobID jobId;
    +
    +           private final CompletableFuture<Void> clearedFuture = new CompletableFuture<>();
    +
    +           private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
    +
    +           private boolean containsJob = false;
    +
    +           private TestingRunningJobsRegistry(JobID jobId) {
    +                   this.jobId = jobId;
    +           }
    +
    +           public CompletableFuture<Void> getClearedFuture() {
    +                   return clearedFuture;
    +           }
    +
    +           @Override
    +           public void setJobRunning(JobID jobID) throws IOException {
    +                   checkJobId(jobID);
    +                   containsJob = true;
    +                   jobSchedulingStatus = JobSchedulingStatus.RUNNING;
    +           }
    +
    +           private void checkJobId(JobID jobID) {
    +                   Preconditions.checkArgument(jobId.equals(jobID));
    --- End diff --
    
    These variable names are easy to confuse: the field `jobId` and the parameter `jobID` differ only in case.
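
One possible way to address the naming, sketched under assumptions rather than taken from the pull request: give the field and the checked argument names that differ by more than case. The class name `NamedRegistrySketch` and the names `expectedJobId` and `actualJobId` are hypothetical, and a plain `String` stands in for Flink's `JobID` so the example compiles on its own.

```java
import java.util.Objects;

/** Hypothetical stand-in for the test registry; String replaces Flink's JobID. */
final class NamedRegistrySketch {

    // Renamed field: it can no longer be mistaken for a method parameter.
    private final String expectedJobId;

    private boolean containsJob = false;

    NamedRegistrySketch(String expectedJobId) {
        this.expectedJobId = Objects.requireNonNull(expectedJobId);
    }

    void setJobRunning(String jobId) {
        checkJobId(jobId);
        containsJob = true;
    }

    boolean contains(String jobId) {
        checkJobId(jobId);
        return containsJob;
    }

    // Rejects any job other than the one this registry was created for.
    private void checkJobId(String actualJobId) {
        if (!expectedJobId.equals(actualJobId)) {
            throw new IllegalArgumentException(
                    "Expected job " + expectedJobId + " but got " + actualJobId);
        }
    }
}
```

With this naming, a mix-up between the two identifiers shows up at a glance in `checkJobId`, and the exception message states which job was expected.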


> RunningJobsRegistry entries are not cleaned up after job termination
> --------------------------------------------------------------------
>
>                 Key: FLINK-9421
>                 URL: https://issues.apache.org/jira/browse/FLINK-9421
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
> after the job has finished. The consequence is that a ZNode with the JobID 
> and a state num per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.
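
The cleanup described in the issue can be illustrated with a minimal sketch. It is not the actual fix: an in-memory set stands in for the ZooKeeper-backed registry, `String` stands in for `JobID`, and the class `RegistryCleanupSketch` and method `clearWhenTerminated` are hypothetical names. The sketch only covers normal completion of the result future.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry; each entry plays the role of one ZNode. */
final class RegistryCleanupSketch {

    private final Set<String> runningJobs = ConcurrentHashMap.newKeySet();

    void setJobRunning(String jobId) {
        runningJobs.add(jobId);
    }

    boolean contains(String jobId) {
        return runningJobs.contains(jobId);
    }

    void clearJob(String jobId) {
        runningJobs.remove(jobId);
    }

    /**
     * Removes the entry once the job's result future completes normally.
     * The returned future completes after the entry has been removed.
     */
    CompletableFuture<Void> clearWhenTerminated(String jobId, CompletableFuture<?> jobResult) {
        return jobResult.thenRun(() -> clearJob(jobId));
    }
}
```

A caller would register the job, pass the result future to `clearWhenTerminated`, and wait on the returned future before asserting that the entry is gone, which mirrors the structure of the test in the diff above.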



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
