Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6068#discussion_r190500268
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
    @@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
                assertThat(deleteAllFuture.isDone(), is(false));
        }
     
    +   /**
    +    * Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
    +    * job reached a terminal state.
    +    */
    +   @Test
    +   public void testRunningJobsRegistryCleanup() throws Exception {
    +           submitJob();
    +
    +           runningJobsRegistry.setJobRunning(jobId);
    +           assertThat(runningJobsRegistry.contains(jobId), is(true));
    +
    +           resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
    +
    +           // wait for the clearing
    +           runningJobsRegistry.getClearedFuture().get();
    +
    +           assertThat(runningJobsRegistry.contains(jobId), is(false));
    +   }
    +
    +   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
    +
    +           private final JobID jobId;
    +
    +           private final CompletableFuture<Void> clearedFuture = new 
CompletableFuture<>();
    +
    +           private JobSchedulingStatus jobSchedulingStatus = 
JobSchedulingStatus.PENDING;
    +
    +           private boolean containsJob = false;
    +
    +           private TestingRunningJobsRegistry(JobID jobId) {
    +                   this.jobId = jobId;
    +           }
    +
    +           public CompletableFuture<Void> getClearedFuture() {
    +                   return clearedFuture;
    +           }
    +
    +           @Override
    +           public void setJobRunning(JobID jobID) throws IOException {
    +                   checkJobId(jobID);
    +                   containsJob = true;
    +                   jobSchedulingStatus = JobSchedulingStatus.RUNNING;
    +           }
    +
    +           private void checkJobId(JobID jobID) {
    +                   Preconditions.checkArgument(jobId.equals(jobID));
    --- End diff --
    
    Not the best variable names here: `jobId` vs. `jobID`


---

Reply via email to