This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc5d321d70fa1e0600ea56e5940b07877913ebf8
Author: Matthias Pohl <matth...@ververica.com>
AuthorDate: Thu Feb 3 17:44:40 2022 +0100

    [FLINK-25432][runtime] Integrates ResourceCleaner functionality into Dispatcher
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 176 +++++---------
 .../cleanup/DispatcherResourceCleanerFactory.java  | 104 ++++++++
 .../dispatcher/DispatcherFailoverITCase.java       |  79 +++---
 .../dispatcher/DispatcherResourceCleanupTest.java  |  31 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |   6 +-
 .../DispatcherResourceCleanerFactoryTest.java      | 269 +++++++++++++++++++++
 6 files changed, 480 insertions(+), 185 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 88f9237..f15f294 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -33,6 +33,9 @@ import 
org.apache.flink.runtime.client.DuplicateJobSubmissionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner;
+import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
 import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -149,6 +152,9 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final DispatcherCachedOperationsHandler 
dispatcherCachedOperationsHandler;
 
+    private final ResourceCleaner localResourceCleaner;
+    private final ResourceCleaner globalResourceCleaner;
+
     /** Enum to distinguish between initial job submission and re-submission 
for recovery. */
     protected enum ExecutionType {
         SUBMISSION,
@@ -210,6 +216,13 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                         dispatcherServices.getOperationCaches(),
                         this::triggerSavepointAndGetLocation,
                         this::stopWithSavepointAndGetLocation);
+
+        final ResourceCleanerFactory resourceCleanerFactory =
+                new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, 
dispatcherServices);
+        this.localResourceCleaner =
+                
resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
+        this.globalResourceCleaner =
+                
resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor());
     }
 
     // ------------------------------------------------------
@@ -427,30 +440,38 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph 
jobGraph) {
         log.info("Submitting job '{}' ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+        return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, 
this::persistAndRunJob)
+                .handle((ignored, throwable) -> 
handleTermination(jobGraph.getJobID(), throwable))
+                .thenCompose(Function.identity());
+    }
 
-        final CompletableFuture<Acknowledge> persistAndRunFuture =
-                waitForTerminatingJob(jobGraph.getJobID(), jobGraph, 
this::persistAndRunJob)
-                        .thenApply(ignored -> Acknowledge.get());
-
-        return persistAndRunFuture.handleAsync(
-                (acknowledge, throwable) -> {
-                    if (throwable != null) {
-                        cleanUpHighAvailabilityJobData(jobGraph.getJobID());
-                        
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
-                        final Throwable strippedThrowable =
-                                
ExceptionUtils.stripCompletionException(throwable);
-                        log.error(
-                                "Failed to submit job {}.", 
jobGraph.getJobID(), strippedThrowable);
-                        throw new CompletionException(
-                                new JobSubmissionException(
-                                        jobGraph.getJobID(),
-                                        "Failed to submit job.",
-                                        strippedThrowable));
-                    } else {
-                        return acknowledge;
-                    }
-                },
-                ioExecutor);
+    private CompletableFuture<Acknowledge> handleTermination(
+            JobID jobId, @Nullable Throwable terminationThrowable) {
+        if (terminationThrowable != null) {
+            return globalResourceCleaner
+                    .cleanupAsync(jobId)
+                    .handleAsync(
+                            (ignored, cleanupThrowable) -> {
+                                if (cleanupThrowable != null) {
+                                    log.warn(
+                                            "Cleanup didn't succeed after job 
submission failed for job {}.",
+                                            jobId,
+                                            cleanupThrowable);
+                                    
terminationThrowable.addSuppressed(cleanupThrowable);
+                                }
+                                
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(
+                                        terminationThrowable);
+                                final Throwable strippedThrowable =
+                                        
ExceptionUtils.stripCompletionException(
+                                                terminationThrowable);
+                                log.error("Failed to submit job {}.", jobId, 
strippedThrowable);
+                                throw new CompletionException(
+                                        new JobSubmissionException(
+                                                jobId, "Failed to submit 
job.", strippedThrowable));
+                            },
+                            getMainThreadExecutor());
+        }
+        return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
     private void persistAndRunJob(JobGraph jobGraph) throws Exception {
@@ -489,9 +510,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                 getMainThreadExecutor());
 
         final CompletableFuture<Void> jobTerminationFuture =
-                cleanupJobStateFuture
-                        .thenApply(cleanupJobState -> removeJob(jobId, 
cleanupJobState))
-                        .thenCompose(Function.identity());
+                cleanupJobStateFuture.thenCompose(
+                        cleanupJobState -> removeJob(jobId, cleanupJobState));
 
         FutureUtils.handleUncaughtException(
                 jobTerminationFuture,
@@ -511,14 +531,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     enum CleanupJobState {
-        LOCAL(false),
-        GLOBAL(true);
-
-        final boolean cleanupHAData;
-
-        CleanupJobState(boolean cleanupHAData) {
-            this.cleanupHAData = cleanupHAData;
-        }
+        LOCAL,
+        GLOBAL
     }
 
     private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable 
throwable) {
@@ -859,86 +873,15 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState 
cleanupJobState) {
-        final JobManagerRunner job = 
checkNotNull(jobManagerRunnerRegistry.unregister(jobId));
-        return CompletableFuture.supplyAsync(
-                        () -> cleanUpJobGraph(jobId, 
cleanupJobState.cleanupHAData), ioExecutor)
-                .thenCompose(
-                        jobGraphRemoved -> job.closeAsync().thenApply(ignored 
-> jobGraphRemoved))
-                .thenAcceptAsync(
-                        jobGraphRemoved -> {
-                            cleanUpRemainingJobData(jobId, jobGraphRemoved);
-                            if (jobGraphRemoved) {
-                                markJobAsClean(jobId);
-                            }
-                        },
-                        ioExecutor);
-    }
-
-    /**
-     * Clean up job graph from {@link 
org.apache.flink.runtime.jobmanager.JobGraphStore}.
-     *
-     * @param jobId Reference to the job that we want to clean.
-     * @param cleanupHA Flag signalling whether we should remove (we're done 
with the job) or just
-     *     release the job graph.
-     * @return True if we have removed the job graph. This means we can clean 
other HA-related
-     *     services as well.
-     */
-    private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
-        if (cleanupHA) {
-            try {
-                jobGraphWriter.globalCleanup(jobId);
-                return true;
-            } catch (Exception e) {
-                log.warn(
-                        "Could not properly remove job {} from submitted job 
graph store.",
-                        jobId,
-                        e);
-                return false;
-            }
-        }
-        try {
-            jobGraphWriter.localCleanup(jobId);
-        } catch (Exception e) {
-            log.warn("Could not properly release job {} from submitted job 
graph store.", jobId, e);
-        }
-        return false;
-    }
-
-    private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) 
{
-        try {
-            jobManagerMetricGroup.globalCleanup(jobId);
-        } catch (Exception e) {
-            log.warn(
-                    "Could not properly clean data for job {} stored in 
JobManager metric group",
-                    jobId,
-                    e);
-        }
-
-        if (jobGraphRemoved) {
-            try {
-                highAvailabilityServices.globalCleanup(jobId);
-            } catch (Exception e) {
-                log.warn(
-                        "Could not properly clean data for job {} stored by ha 
services", jobId, e);
-            }
-
-            try {
-                blobServer.globalCleanup(jobId);
-            } catch (Exception e) {
-                log.warn(
-                        "Could not properly global clean data for job {} 
stored in the BlobServer.",
-                        jobId,
-                        e);
-            }
-        } else {
-            try {
-                blobServer.localCleanup(jobId);
-            } catch (IOException e) {
-                log.warn(
-                        "Could not properly clean local data for job {} stored 
in the BlobServer.",
-                        jobId,
-                        e);
-            }
+        switch (cleanupJobState) {
+            case LOCAL:
+                return localResourceCleaner.cleanupAsync(jobId);
+            case GLOBAL:
+                return globalResourceCleaner
+                        .cleanupAsync(jobId)
+                        .thenRun(() -> markJobAsClean(jobId));
+            default:
+                throw new IllegalStateException("Invalid cleanup state: " + 
cleanupJobState);
         }
     }
 
@@ -952,11 +895,6 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         }
     }
 
-    private void cleanUpHighAvailabilityJobData(JobID jobId) {
-        final boolean jobGraphRemoved = cleanUpJobGraph(jobId, true);
-        cleanUpRemainingJobData(jobId, jobGraphRemoved);
-    }
-
     /** Terminate all currently running {@link JobManagerRunner}s. */
     private void terminateRunningJobs() {
         log.info("Stopping all currently running jobs of dispatcher {}.", 
getAddress());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
new file mode 100644
index 0000000..3faa358
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@code DispatcherResourceCleanerFactory} instantiates {@link 
ResourceCleaner} instances that
+ * clean cleanable resources from the {@link 
org.apache.flink.runtime.dispatcher.Dispatcher}.
+ *
+ * <p>We need to handle the {@link JobManagerRunnerRegistry} differently due 
to a dependency between
+ * closing the {@link org.apache.flink.runtime.jobmaster.JobManagerRunner} and 
the {@link
+ * HighAvailabilityServices}. This is fixed in {@code FLINK-24038} using a 
feature flag to
+ * enable/disable single leader election for all the {@code JobManager} 
components. We can remove
+ * the priority cleanup logic after removing the per-component leader election.
+ */
+public class DispatcherResourceCleanerFactory implements 
ResourceCleanerFactory {
+
+    private final Executor cleanupExecutor;
+    private final JobManagerRunnerRegistry jobManagerRunnerRegistry;
+    private final JobGraphWriter jobGraphWriter;
+    private final BlobServer blobServer;
+    private final HighAvailabilityServices highAvailabilityServices;
+    private final JobManagerMetricGroup jobManagerMetricGroup;
+
+    public DispatcherResourceCleanerFactory(
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            DispatcherServices dispatcherServices) {
+        this(
+                dispatcherServices.getIoExecutor(),
+                jobManagerRunnerRegistry,
+                dispatcherServices.getJobGraphWriter(),
+                dispatcherServices.getBlobServer(),
+                dispatcherServices.getHighAvailabilityServices(),
+                dispatcherServices.getJobManagerMetricGroup());
+    }
+
+    @VisibleForTesting
+    DispatcherResourceCleanerFactory(
+            Executor cleanupExecutor,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            JobGraphWriter jobGraphWriter,
+            BlobServer blobServer,
+            HighAvailabilityServices highAvailabilityServices,
+            JobManagerMetricGroup jobManagerMetricGroup) {
+        this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor);
+        this.jobManagerRunnerRegistry = 
Preconditions.checkNotNull(jobManagerRunnerRegistry);
+        this.jobGraphWriter = Preconditions.checkNotNull(jobGraphWriter);
+        this.blobServer = Preconditions.checkNotNull(blobServer);
+        this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
+        this.jobManagerMetricGroup = 
Preconditions.checkNotNull(jobManagerMetricGroup);
+    }
+
+    @Override
+    public ResourceCleaner createLocalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return DefaultResourceCleaner.forLocallyCleanableResources(
+                        mainThreadExecutor, cleanupExecutor)
+                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                .withRegularCleanup(jobGraphWriter)
+                .withRegularCleanup(blobServer)
+                .withRegularCleanup(jobManagerMetricGroup)
+                .build();
+    }
+
+    @Override
+    public ResourceCleaner createGlobalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+
+        return DefaultResourceCleaner.forGloballyCleanableResources(
+                        mainThreadExecutor, cleanupExecutor)
+                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                .withRegularCleanup(jobGraphWriter)
+                .withRegularCleanup(blobServer)
+                .withRegularCleanup(highAvailabilityServices)
+                .build();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
index 0cd5d34..fb9ac8a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -32,6 +30,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -41,6 +40,7 @@ import 
org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,15 +50,19 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 /** An integration test for various fail-over scenarios of the {@link 
Dispatcher} component. */
@@ -73,10 +77,11 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
                 new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
                         (maxCheckpoints, previous, sharedStateRegistryFactory, 
ioExecutor) -> {
                             if (previous != null) {
-                                // First job attempt failed before cleaning up 
the checkpoint
-                                // store.
-                                
assertFalse(previous.getShutdownStatus().isPresent());
-                                
assertFalse(previous.getAllCheckpoints().isEmpty());
+                                // First job cleanup still succeeded for the
+                                // CompletedCheckpointStore because the 
JobGraph cleanup happens
+                                // after the JobManagerRunner closing
+                                
assertTrue(previous.getShutdownStatus().isPresent());
+                                
assertTrue(previous.getAllCheckpoints().isEmpty());
                                 return new EmbeddedCompletedCheckpointStore(
                                         maxCheckpoints,
                                         previous.getAllCheckpoints(),
@@ -110,12 +115,19 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
         final JobID jobId = jobGraph.getJobID();
 
         // Construct job graph store.
-        final Error jobGraphRemovalError = new Error("Unable to remove job 
graph.");
+        final Error temporaryError = new Error("Unable to remove job graph.");
+        final AtomicReference<? extends Error> temporaryErrorRef =
+                new AtomicReference<>(temporaryError);
         final TestingJobGraphStore jobGraphStore =
                 TestingJobGraphStore.newBuilder()
                         .setGlobalCleanupFunction(
                                 (ignoredJobId, ignoredExecutor) -> {
-                                    throw jobGraphRemovalError;
+                                    final Error error = 
temporaryErrorRef.getAndSet(null);
+                                    if (error != null) {
+                                        throw error;
+                                    }
+
+                                    return FutureUtils.completedVoidFuture();
                                 })
                         .build();
         jobGraphStore.start(null);
@@ -133,8 +145,7 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
                         throwable -> {
                             final Optional<Error> maybeError =
                                     ExceptionUtils.findThrowable(throwable, 
Error.class);
-                            if (maybeError.isPresent()
-                                    && 
jobGraphRemovalError.equals(maybeError.get())) {
+                            if (maybeError.isPresent() && 
temporaryError.equals(maybeError.get())) {
                                 jobGraphRemovalErrorReceived.countDown();
                             } else {
                                 testingFatalErrorHandlerResource
@@ -170,28 +181,30 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
         // This will clear internal state of election service, so a new 
contender can register.
         leaderElectionService.stop();
 
+        assertThat(
+                "The JobGraph is still stored in the JobGraphStore.",
+                haServices.getJobGraphStore().getJobIds(),
+                CoreMatchers.is(Collections.singleton(jobId)));
+        assertThat(
+                "The JobResultStore has this job marked as dirty.",
+                haServices.getJobResultStore().getDirtyResults().stream()
+                        .map(JobResult::getJobId)
+                        .collect(Collectors.toSet()),
+                CoreMatchers.is(Collections.singleton(jobId)));
+
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
         toTerminate.add(secondDispatcher);
-        final DispatcherGateway secondDispatcherGateway =
-                secondDispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // new Dispatcher becomes new leader
         leaderElectionService.isLeader(UUID.randomUUID());
 
-        // Now make sure that restored job started from checkpoint.
-        final JobMasterGateway secondJobMasterGateway =
-                connectToLeadingJobMaster(leaderElectionService).get();
-        try (final JobMasterTester tester =
-                new JobMasterTester(rpcService, jobId, 
secondJobMasterGateway)) {
-            final CompletableFuture<List<TaskDeploymentDescriptor>> 
descriptorsFuture =
-                    tester.deployVertices(2);
-            awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
-            final Optional<JobManagerTaskRestore> maybeRestore =
-                    descriptorsFuture.get().stream()
-                            .map(TaskDeploymentDescriptor::getTaskRestore)
-                            .filter(Objects::nonNull)
-                            .findAny();
-            assertTrue("Job has recovered from checkpoint.", 
maybeRestore.isPresent());
-        }
+        assertThrows(
+                "No JobMaster will be instantiated because of the JobResult is 
already persisted in the JobResultStore",
+                TimeoutException.class,
+                () ->
+                        connectToLeadingJobMaster(leaderElectionService)
+                                .get(100, TimeUnit.MILLISECONDS));
     }
 
     private JobGraph createJobGraph() {
@@ -222,11 +235,11 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
             @Nullable FatalErrorHandler fatalErrorHandler) throws Exception {
         final List<JobGraph> jobGraphs = new ArrayList<>();
         for (JobID jobId : haServices.getJobGraphStore().getJobIds()) {
-            
jobGraphs.add(haServices.getJobGraphStore().recoverJobGraph(jobId));
+            // there shouldn't be an overlap between dirty JobResults and 
recovered JobGraphs
+            if (!haServices.getJobResultStore().hasJobResultEntry(jobId)) {
+                
jobGraphs.add(haServices.getJobGraphStore().recoverJobGraph(jobId));
+            }
         }
-        // we need to reinstantiate the JobResultStore here to simulate a 
not-persisting
-        // JobResultStore
-        haServices.setJobResultStore(new EmbeddedJobResultStore());
         final TestingDispatcher dispatcher =
                 new TestingDispatcherBuilder()
                         .setJobManagerRunnerFactory(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index dec9318..a4063b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.ExceptionUtils;
@@ -627,34 +626,6 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
     }
 
     @Test
-    public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws 
Exception {
-        jobGraphWriter =
-                TestingJobGraphStore.newBuilder()
-                        .setGlobalCleanupConsumer(
-                                ignored -> {
-                                    throw new Exception("Failed to Remove 
future");
-                                })
-                        .withAutomaticStart()
-                        .build();
-
-        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
-                startDispatcherAndSubmitJob();
-
-        ArchivedExecutionGraph executionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.CANCELED)
-                        .build();
-
-        final TestingJobManagerRunner testingJobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(new 
ExecutionGraphInfo(executionGraph));
-
-        assertLocalCleanupTriggered(jobId);
-        assertThat(deleteAllHABlobsFuture.isDone(), is(false));
-    }
-
-    @Test
     public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws 
Exception {
         final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
                 startDispatcherAndSubmitJob();
@@ -681,7 +652,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
 
     private void assertGlobalCleanupTriggered(JobID jobId)
             throws ExecutionException, InterruptedException, TimeoutException {
-        assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), 
equalTo(jobId));
+        assertThat(localCleanupFuture.isDone(), is(false));
         assertThat(globalCleanupFuture.get(100, TimeUnit.MILLISECONDS), 
equalTo(jobId));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 2a14207..6f4898a 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -93,7 +93,6 @@ import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.assertj.core.api.Assertions;
 import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -133,6 +132,7 @@ import static 
org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -791,7 +791,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
             // make sure we've cleaned up in correct order (including HA)
             assertThat(
                     new ArrayList<>(cleanUpEvents),
-                    equalTo(Arrays.asList(CLEANUP_JOB_GRAPH_REMOVE, 
CLEANUP_HA_SERVICES)));
+                    containsInAnyOrder(CLEANUP_JOB_GRAPH_REMOVE, 
CLEANUP_HA_SERVICES));
         }
 
         // don't fail this time
@@ -1184,7 +1184,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                         assertThat(
                                                 "All cleanup tasks should have been finished before marking the job as clean.",
                                                 cleanUpEvents,
-                                                IsIterableContainingInAnyOrder.containsInAnyOrder(
+                                                containsInAnyOrder(
                                                         CLEANUP_HA_SERVICES,
                                                         CLEANUP_JOB_GRAPH_REMOVE,
                                                         CLEANUP_JOB_MANAGER_RUNNER)))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
new file mode 100644
index 0000000..4aeec6a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
+import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerRegistry;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code DispatcherResourceCleanerFactoryTest} verifies that the resources 
are properly cleaned up
+ * for both, the {@link GloballyCleanableResource} and {@link 
LocallyCleanableResource} of the
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher}.
+ */
+public class DispatcherResourceCleanerFactoryTest {
+
+    private static final JobID JOB_ID = new JobID();
+
+    private CleanableBlobServer blobServer;
+
+    private CompletableFuture<JobID> 
jobManagerRunnerRegistryLocalCleanupFuture;
+    private CompletableFuture<Void> 
jobManagerRunnerRegistryLocalCleanupResultFuture;
+    private CompletableFuture<JobID> 
jobManagerRunnerRegistryGlobalCleanupFuture;
+    private CompletableFuture<Void> 
jobManagerRunnerRegistryGlobalCleanupResultFuture;
+
+    private CompletableFuture<JobID> jobGraphWriterLocalCleanupFuture;
+    private CompletableFuture<JobID> jobGraphWriterGlobalCleanupFuture;
+
+    private CompletableFuture<JobID> 
highAvailabilityServicesGlobalCleanupFuture;
+    private JobManagerMetricGroup jobManagerMetricGroup;
+
+    private DispatcherResourceCleanerFactory testInstance;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        blobServer = new CleanableBlobServer();
+
+        MetricRegistry metricRegistry = 
TestingMetricRegistry.builder().build();
+        jobManagerMetricGroup =
+                JobManagerMetricGroup.createJobManagerMetricGroup(
+                        metricRegistry, "ignored hostname");
+        jobManagerMetricGroup.addJob(JOB_ID, "ignored job name");
+
+        testInstance =
+                new DispatcherResourceCleanerFactory(
+                        Executors.directExecutor(),
+                        createJobManagerRunnerRegistry(),
+                        createJobGraphWriter(),
+                        blobServer,
+                        createHighAvailabilityServices(),
+                        jobManagerMetricGroup);
+    }
+
+    private JobManagerRunnerRegistry createJobManagerRunnerRegistry() {
+        jobManagerRunnerRegistryLocalCleanupFuture = new CompletableFuture<>();
+        jobManagerRunnerRegistryLocalCleanupResultFuture = new 
CompletableFuture<>();
+
+        jobManagerRunnerRegistryGlobalCleanupFuture = new 
CompletableFuture<>();
+        jobManagerRunnerRegistryGlobalCleanupResultFuture = new 
CompletableFuture<>();
+
+        return TestingJobManagerRunnerRegistry.builder()
+                .withLocalCleanupAsyncFunction(
+                        (jobId, executor) -> {
+                            
jobManagerRunnerRegistryLocalCleanupFuture.complete(jobId);
+                            return 
jobManagerRunnerRegistryLocalCleanupResultFuture;
+                        })
+                .withGlobalCleanupAsyncFunction(
+                        (jobId, executor) -> {
+                            
jobManagerRunnerRegistryGlobalCleanupFuture.complete(jobId);
+                            return 
jobManagerRunnerRegistryGlobalCleanupResultFuture;
+                        })
+                .build();
+    }
+
+    private JobGraphWriter createJobGraphWriter() throws Exception {
+        jobGraphWriterLocalCleanupFuture = new CompletableFuture<>();
+        jobGraphWriterGlobalCleanupFuture = new CompletableFuture<>();
+        final TestingJobGraphStore jobGraphStore =
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (jobId, executor) -> {
+                                    
jobGraphWriterGlobalCleanupFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .setLocalCleanupFunction(
+                                (jobId, ignoredExecutor) -> {
+                                    
jobGraphWriterLocalCleanupFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+        jobGraphStore.start(null);
+
+        return jobGraphStore;
+    }
+
+    private HighAvailabilityServices createHighAvailabilityServices() {
+        highAvailabilityServicesGlobalCleanupFuture = new 
CompletableFuture<>();
+        final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+        
haServices.setGlobalCleanupFuture(highAvailabilityServicesGlobalCleanupFuture);
+        return haServices;
+    }
+
+    @Test
+    public void testLocalResourceCleaning() {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance
+                        .createLocalResourceCleaner(
+                                
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .cleanupAsync(JOB_ID);
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+
+        assertThat(cleanupResultFuture).isNotCompleted();
+
+        jobManagerRunnerRegistryLocalCleanupResultFuture.complete(null);
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggered();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    @Test
+    public void testGlobalResourceCleaning()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance
+                        .createGlobalResourceCleaner(
+                                
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .cleanupAsync(JOB_ID);
+
+        assertLocalCleanupNotTriggered();
+        assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+
+        jobManagerRunnerRegistryGlobalCleanupResultFuture.complete(null);
+
+        assertGlobalCleanupTriggered();
+        assertLocalCleanupNotTriggered();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    private void assertLocalCleanupNotTriggered() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
+        assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        
assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
+    }
+
+    private void 
assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        
assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
+    }
+
+    private void assertGlobalCleanupNotTriggered() {
+        assertThat(jobGraphWriterGlobalCleanupFuture).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
+    }
+
+    private void 
assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistryGlobalCleanupFuture).isCompleted();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriterGlobalCleanupFuture).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
+    }
+
+    private void assertLocalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
+        assertThat(jobGraphWriterLocalCleanupFuture).isCompleted();
+        assertThat(blobServer.getLocalCleanupFuture()).isCompleted();
+        
assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(0);
+    }
+
+    private void assertGlobalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistryGlobalCleanupFuture).isCompleted();
+        assertThat(jobGraphWriterGlobalCleanupFuture).isCompleted();
+        assertThat(blobServer.getGlobalCleanupFuture()).isCompleted();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isCompleted();
+    }
+
+    private static class CleanableBlobServer extends BlobServer {
+
+        private final CompletableFuture<JobID> localCleanupFuture = new 
CompletableFuture<>();
+        private final CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        public CleanableBlobServer() throws IOException {
+            super(
+                    new Configuration(),
+                    new File("non-existent-file"),
+                    new TestingBlobStoreBuilder().createTestingBlobStore());
+        }
+
+        @Override
+        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
ignoredExecutor) {
+            localCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, 
Executor ignoredExecutor) {
+            globalCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        public CompletableFuture<JobID> getLocalCleanupFuture() {
+            return localCleanupFuture;
+        }
+
+        public CompletableFuture<JobID> getGlobalCleanupFuture() {
+            return globalCleanupFuture;
+        }
+    }
+}

Reply via email to