This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc5d321d70fa1e0600ea56e5940b07877913ebf8 Author: Matthias Pohl <matth...@ververica.com> AuthorDate: Thu Feb 3 17:44:40 2022 +0100 [FLINK-25432][runtime] Integrates ResourceCleaner functionality into Dispatcher --- .../flink/runtime/dispatcher/Dispatcher.java | 176 +++++--------- .../cleanup/DispatcherResourceCleanerFactory.java | 104 ++++++++ .../dispatcher/DispatcherFailoverITCase.java | 79 +++--- .../dispatcher/DispatcherResourceCleanupTest.java | 31 +-- .../flink/runtime/dispatcher/DispatcherTest.java | 6 +- .../DispatcherResourceCleanerFactoryTest.java | 269 +++++++++++++++++++++ 6 files changed, 480 insertions(+), 185 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 88f9237..f15f294 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -33,6 +33,9 @@ import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -149,6 +152,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final DispatcherCachedOperationsHandler dispatcherCachedOperationsHandler; + private final ResourceCleaner localResourceCleaner; + 
private final ResourceCleaner globalResourceCleaner; + /** Enum to distinguish between initial job submission and re-submission for recovery. */ protected enum ExecutionType { SUBMISSION, @@ -210,6 +216,13 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher dispatcherServices.getOperationCaches(), this::triggerSavepointAndGetLocation, this::stopWithSavepointAndGetLocation); + + final ResourceCleanerFactory resourceCleanerFactory = + new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices); + this.localResourceCleaner = + resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor()); + this.globalResourceCleaner = + resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor()); } // ------------------------------------------------------ @@ -427,30 +440,38 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID()); + return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) + .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable)) + .thenCompose(Function.identity()); + } - final CompletableFuture<Acknowledge> persistAndRunFuture = - waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) - .thenApply(ignored -> Acknowledge.get()); - - return persistAndRunFuture.handleAsync( - (acknowledge, throwable) -> { - if (throwable != null) { - cleanUpHighAvailabilityJobData(jobGraph.getJobID()); - ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable); - final Throwable strippedThrowable = - ExceptionUtils.stripCompletionException(throwable); - log.error( - "Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable); - throw new CompletionException( - new JobSubmissionException( - jobGraph.getJobID(), - 
"Failed to submit job.", - strippedThrowable)); - } else { - return acknowledge; - } - }, - ioExecutor); + private CompletableFuture<Acknowledge> handleTermination( + JobID jobId, @Nullable Throwable terminationThrowable) { + if (terminationThrowable != null) { + return globalResourceCleaner + .cleanupAsync(jobId) + .handleAsync( + (ignored, cleanupThrowable) -> { + if (cleanupThrowable != null) { + log.warn( + "Cleanup didn't succeed after job submission failed for job {}.", + jobId, + cleanupThrowable); + terminationThrowable.addSuppressed(cleanupThrowable); + } + ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError( + terminationThrowable); + final Throwable strippedThrowable = + ExceptionUtils.stripCompletionException( + terminationThrowable); + log.error("Failed to submit job {}.", jobId, strippedThrowable); + throw new CompletionException( + new JobSubmissionException( + jobId, "Failed to submit job.", strippedThrowable)); + }, + getMainThreadExecutor()); + } + return CompletableFuture.completedFuture(Acknowledge.get()); } private void persistAndRunJob(JobGraph jobGraph) throws Exception { @@ -489,9 +510,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher getMainThreadExecutor()); final CompletableFuture<Void> jobTerminationFuture = - cleanupJobStateFuture - .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState)) - .thenCompose(Function.identity()); + cleanupJobStateFuture.thenCompose( + cleanupJobState -> removeJob(jobId, cleanupJobState)); FutureUtils.handleUncaughtException( jobTerminationFuture, @@ -511,14 +531,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } enum CleanupJobState { - LOCAL(false), - GLOBAL(true); - - final boolean cleanupHAData; - - CleanupJobState(boolean cleanupHAData) { - this.cleanupHAData = cleanupHAData; - } + LOCAL, + GLOBAL } private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) { @@ -859,86 +873,15 @@ public 
abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) { - final JobManagerRunner job = checkNotNull(jobManagerRunnerRegistry.unregister(jobId)); - return CompletableFuture.supplyAsync( - () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor) - .thenCompose( - jobGraphRemoved -> job.closeAsync().thenApply(ignored -> jobGraphRemoved)) - .thenAcceptAsync( - jobGraphRemoved -> { - cleanUpRemainingJobData(jobId, jobGraphRemoved); - if (jobGraphRemoved) { - markJobAsClean(jobId); - } - }, - ioExecutor); - } - - /** - * Clean up job graph from {@link org.apache.flink.runtime.jobmanager.JobGraphStore}. - * - * @param jobId Reference to the job that we want to clean. - * @param cleanupHA Flag signalling whether we should remove (we're done with the job) or just - * release the job graph. - * @return True if we have removed the job graph. This means we can clean other HA-related - * services as well. 
- */ - private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { - if (cleanupHA) { - try { - jobGraphWriter.globalCleanup(jobId); - return true; - } catch (Exception e) { - log.warn( - "Could not properly remove job {} from submitted job graph store.", - jobId, - e); - return false; - } - } - try { - jobGraphWriter.localCleanup(jobId); - } catch (Exception e) { - log.warn("Could not properly release job {} from submitted job graph store.", jobId, e); - } - return false; - } - - private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { - try { - jobManagerMetricGroup.globalCleanup(jobId); - } catch (Exception e) { - log.warn( - "Could not properly clean data for job {} stored in JobManager metric group", - jobId, - e); - } - - if (jobGraphRemoved) { - try { - highAvailabilityServices.globalCleanup(jobId); - } catch (Exception e) { - log.warn( - "Could not properly clean data for job {} stored by ha services", jobId, e); - } - - try { - blobServer.globalCleanup(jobId); - } catch (Exception e) { - log.warn( - "Could not properly global clean data for job {} stored in the BlobServer.", - jobId, - e); - } - } else { - try { - blobServer.localCleanup(jobId); - } catch (IOException e) { - log.warn( - "Could not properly clean local data for job {} stored in the BlobServer.", - jobId, - e); - } + switch (cleanupJobState) { + case LOCAL: + return localResourceCleaner.cleanupAsync(jobId); + case GLOBAL: + return globalResourceCleaner + .cleanupAsync(jobId) + .thenRun(() -> markJobAsClean(jobId)); + default: + throw new IllegalStateException("Invalid cleanup state: " + cleanupJobState); } } @@ -952,11 +895,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } } - private void cleanUpHighAvailabilityJobData(JobID jobId) { - final boolean jobGraphRemoved = cleanUpJobGraph(jobId, true); - cleanUpRemainingJobData(jobId, jobGraphRemoved); - } - /** Terminate all currently running {@link JobManagerRunner}s. 
*/ private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java new file mode 100644 index 0000000..3faa358 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.Executor; + +/** + * {@code DispatcherResourceCleanerFactory} instantiates {@link ResourceCleaner} instances that + * clean cleanable resources from the {@link org.apache.flink.runtime.dispatcher.Dispatcher}. + * + * <p>We need to handle the {@link JobManagerRunnerRegistry} differently due to a dependency between + * closing the {@link org.apache.flink.runtime.jobmaster.JobManagerRunner} and the {@link + * HighAvailabilityServices}. This is fixed in {@code FLINK-24038} using a feature flag to + * enable/disable single leader election for all the {@code JobManager} components. We can remove + * the priority cleanup logic after removing the per-component leader election. 
+ */ +public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory { + + private final Executor cleanupExecutor; + private final JobManagerRunnerRegistry jobManagerRunnerRegistry; + private final JobGraphWriter jobGraphWriter; + private final BlobServer blobServer; + private final HighAvailabilityServices highAvailabilityServices; + private final JobManagerMetricGroup jobManagerMetricGroup; + + public DispatcherResourceCleanerFactory( + JobManagerRunnerRegistry jobManagerRunnerRegistry, + DispatcherServices dispatcherServices) { + this( + dispatcherServices.getIoExecutor(), + jobManagerRunnerRegistry, + dispatcherServices.getJobGraphWriter(), + dispatcherServices.getBlobServer(), + dispatcherServices.getHighAvailabilityServices(), + dispatcherServices.getJobManagerMetricGroup()); + } + + @VisibleForTesting + DispatcherResourceCleanerFactory( + Executor cleanupExecutor, + JobManagerRunnerRegistry jobManagerRunnerRegistry, + JobGraphWriter jobGraphWriter, + BlobServer blobServer, + HighAvailabilityServices highAvailabilityServices, + JobManagerMetricGroup jobManagerMetricGroup) { + this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor); + this.jobManagerRunnerRegistry = Preconditions.checkNotNull(jobManagerRunnerRegistry); + this.jobGraphWriter = Preconditions.checkNotNull(jobGraphWriter); + this.blobServer = Preconditions.checkNotNull(blobServer); + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup); + } + + @Override + public ResourceCleaner createLocalResourceCleaner( + ComponentMainThreadExecutor mainThreadExecutor) { + return DefaultResourceCleaner.forLocallyCleanableResources( + mainThreadExecutor, cleanupExecutor) + .withPrioritizedCleanup(jobManagerRunnerRegistry) + .withRegularCleanup(jobGraphWriter) + .withRegularCleanup(blobServer) + .withRegularCleanup(jobManagerMetricGroup) + .build(); + } + + @Override + 
public ResourceCleaner createGlobalResourceCleaner( + ComponentMainThreadExecutor mainThreadExecutor) { + + return DefaultResourceCleaner.forGloballyCleanableResources( + mainThreadExecutor, cleanupExecutor) + .withPrioritizedCleanup(jobManagerRunnerRegistry) + .withRegularCleanup(jobGraphWriter) + .withRegularCleanup(blobServer) + .withRegularCleanup(highAvailabilityServices) + .build(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java index 0cd5d34..fb9ac8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java @@ -20,11 +20,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -32,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; 
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -41,6 +40,7 @@ import org.apache.flink.runtime.testutils.TestingJobGraphStore; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.FutureUtils; +import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,15 +50,19 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import static org.junit.Assert.assertFalse; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */ @@ -73,10 +77,11 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>( (maxCheckpoints, previous, sharedStateRegistryFactory, ioExecutor) -> { if (previous != null) { - // First job attempt failed before cleaning up the checkpoint - // store. 
- assertFalse(previous.getShutdownStatus().isPresent()); - assertFalse(previous.getAllCheckpoints().isEmpty()); + // First job cleanup still succeeded for the + // CompletedCheckpointStore because the JobGraph cleanup happens + // after the JobManagerRunner closing + assertTrue(previous.getShutdownStatus().isPresent()); + assertTrue(previous.getAllCheckpoints().isEmpty()); return new EmbeddedCompletedCheckpointStore( maxCheckpoints, previous.getAllCheckpoints(), @@ -110,12 +115,19 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { final JobID jobId = jobGraph.getJobID(); // Construct job graph store. - final Error jobGraphRemovalError = new Error("Unable to remove job graph."); + final Error temporaryError = new Error("Unable to remove job graph."); + final AtomicReference<? extends Error> temporaryErrorRef = + new AtomicReference<>(temporaryError); final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() .setGlobalCleanupFunction( (ignoredJobId, ignoredExecutor) -> { - throw jobGraphRemovalError; + final Error error = temporaryErrorRef.getAndSet(null); + if (error != null) { + throw error; + } + + return FutureUtils.completedVoidFuture(); }) .build(); jobGraphStore.start(null); @@ -133,8 +145,7 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { throwable -> { final Optional<Error> maybeError = ExceptionUtils.findThrowable(throwable, Error.class); - if (maybeError.isPresent() - && jobGraphRemovalError.equals(maybeError.get())) { + if (maybeError.isPresent() && temporaryError.equals(maybeError.get())) { jobGraphRemovalErrorReceived.countDown(); } else { testingFatalErrorHandlerResource @@ -170,28 +181,30 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { // This will clear internal state of election service, so a new contender can register. 
leaderElectionService.stop(); + assertThat( + "The JobGraph is still stored in the JobGraphStore.", + haServices.getJobGraphStore().getJobIds(), + CoreMatchers.is(Collections.singleton(jobId))); + assertThat( + "The JobResultStore has this job marked as dirty.", + haServices.getJobResultStore().getDirtyResults().stream() + .map(JobResult::getJobId) + .collect(Collectors.toSet()), + CoreMatchers.is(Collections.singleton(jobId))); + // Run a second dispatcher, that restores our finished job. final Dispatcher secondDispatcher = createRecoveredDispatcher(null); toTerminate.add(secondDispatcher); - final DispatcherGateway secondDispatcherGateway = - secondDispatcher.getSelfGateway(DispatcherGateway.class); + + // new Dispatcher becomes new leader leaderElectionService.isLeader(UUID.randomUUID()); - // Now make sure that restored job started from checkpoint. - final JobMasterGateway secondJobMasterGateway = - connectToLeadingJobMaster(leaderElectionService).get(); - try (final JobMasterTester tester = - new JobMasterTester(rpcService, jobId, secondJobMasterGateway)) { - final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = - tester.deployVertices(2); - awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING); - final Optional<JobManagerTaskRestore> maybeRestore = - descriptorsFuture.get().stream() - .map(TaskDeploymentDescriptor::getTaskRestore) - .filter(Objects::nonNull) - .findAny(); - assertTrue("Job has recovered from checkpoint.", maybeRestore.isPresent()); - } + assertThrows( + "No JobMaster will be instantiated because of the JobResult is already persisted in the JobResultStore", + TimeoutException.class, + () -> + connectToLeadingJobMaster(leaderElectionService) + .get(100, TimeUnit.MILLISECONDS)); } private JobGraph createJobGraph() { @@ -222,11 +235,11 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { @Nullable FatalErrorHandler fatalErrorHandler) throws Exception { final List<JobGraph> jobGraphs = new 
ArrayList<>(); for (JobID jobId : haServices.getJobGraphStore().getJobIds()) { - jobGraphs.add(haServices.getJobGraphStore().recoverJobGraph(jobId)); + // there shouldn't be an overlap between dirty JobResults and recovered JobGraphs + if (!haServices.getJobResultStore().hasJobResultEntry(jobId)) { + jobGraphs.add(haServices.getJobGraphStore().recoverJobGraph(jobId)); + } } - // we need to reinstantiate the JobResultStore here to simulate a not-persisting - // JobResultStore - haServices.setJobResultStore(new EmbeddedJobResultStore()); final TestingDispatcher dispatcher = new TestingDispatcherBuilder() .setJobManagerRunnerFactory( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index dec9318..a4063b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -55,7 +55,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.runtime.testutils.TestingJobGraphStore; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.ExceptionUtils; @@ -627,34 +626,6 @@ public class DispatcherResourceCleanupTest extends TestLogger { } @Test - public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception { - jobGraphWriter = - TestingJobGraphStore.newBuilder() - .setGlobalCleanupConsumer( - ignored -> { - throw new Exception("Failed to Remove future"); - }) - .withAutomaticStart() - .build(); - - final TestingJobManagerRunnerFactory 
jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); - - ArchivedExecutionGraph executionGraph = - new ArchivedExecutionGraphBuilder() - .setJobID(jobId) - .setState(JobStatus.CANCELED) - .build(); - - final TestingJobManagerRunner testingJobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph)); - - assertLocalCleanupTriggered(jobId); - assertThat(deleteAllHABlobsFuture.isDone(), is(false)); - } - - @Test public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception { final TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob(); @@ -681,7 +652,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { private void assertGlobalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException, TimeoutException { - assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); + assertThat(localCleanupFuture.isDone(), is(false)); assertThat(globalCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 2a14207..6f4898a 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -93,7 +93,6 @@ import org.apache.flink.util.function.ThrowingRunnable; import org.assertj.core.api.Assertions; import org.hamcrest.Matchers; -import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -133,6 +132,7 @@ import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import 
static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -791,7 +791,7 @@ public class DispatcherTest extends AbstractDispatcherTest { // make sure we've cleaned up in correct order (including HA) assertThat( new ArrayList<>(cleanUpEvents), - equalTo(Arrays.asList(CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_HA_SERVICES))); + containsInAnyOrder(CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_HA_SERVICES)); } // don't fail this time @@ -1184,7 +1184,7 @@ public class DispatcherTest extends AbstractDispatcherTest { assertThat( "All cleanup tasks should have been finished before marking the job as clean.", cleanUpEvents, - IsIterableContainingInAnyOrder.containsInAnyOrder( + containsInAnyOrder( CLEANUP_HA_SERVICES, CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_JOB_MANAGER_RUNNER))) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java new file mode 100644 index 0000000..4aeec6a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.TestingBlobStoreBuilder; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry; +import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerRegistry; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code DispatcherResourceCleanerFactoryTest} verifies that the resources are properly cleaned up 
+ * for both the {@link GloballyCleanableResource} and {@link LocallyCleanableResource} of the
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher}.
+ */
+public class DispatcherResourceCleanerFactoryTest {
+
+    private static final JobID JOB_ID = new JobID();
+
+    private CleanableBlobServer blobServer;
+
+    // Each "...CleanupFuture" is completed with the JobID as soon as the corresponding cleanup
+    // callback is invoked. The "...ResultFuture"s are returned from the JobManagerRunnerRegistry
+    // callbacks, so the tests decide when the registry's cleanup finishes.
+    private CompletableFuture<JobID> jobManagerRunnerRegistryLocalCleanupFuture;
+    private CompletableFuture<Void> jobManagerRunnerRegistryLocalCleanupResultFuture;
+    private CompletableFuture<JobID> jobManagerRunnerRegistryGlobalCleanupFuture;
+    private CompletableFuture<Void> jobManagerRunnerRegistryGlobalCleanupResultFuture;
+
+    private CompletableFuture<JobID> jobGraphWriterLocalCleanupFuture;
+    private CompletableFuture<JobID> jobGraphWriterGlobalCleanupFuture;
+
+    private CompletableFuture<JobID> highAvailabilityServicesGlobalCleanupFuture;
+    private JobManagerMetricGroup jobManagerMetricGroup;
+
+    private DispatcherResourceCleanerFactory testInstance;
+
+    /** Wires a fresh {@link DispatcherResourceCleanerFactory} with recording test doubles. */
+    @BeforeEach
+    public void setup() throws Exception {
+        blobServer = new CleanableBlobServer();
+
+        MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();
+        jobManagerMetricGroup =
+                JobManagerMetricGroup.createJobManagerMetricGroup(
+                        metricRegistry, "ignored hostname");
+        // registers one job metric group; the local cleanup is expected to remove it again
+        jobManagerMetricGroup.addJob(JOB_ID, "ignored job name");
+
+        testInstance =
+                new DispatcherResourceCleanerFactory(
+                        Executors.directExecutor(),
+                        createJobManagerRunnerRegistry(),
+                        createJobGraphWriter(),
+                        blobServer,
+                        createHighAvailabilityServices(),
+                        jobManagerMetricGroup);
+    }
+
+    /**
+     * Closes the {@link BlobServer} instance created in {@link #setup()}, which would otherwise
+     * never be closed by this test.
+     */
+    @org.junit.jupiter.api.AfterEach
+    public void teardown() throws Exception {
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    /**
+     * Creates a registry whose cleanup callbacks record the invocation and return a result
+     * future that only the test itself completes.
+     */
+    private JobManagerRunnerRegistry createJobManagerRunnerRegistry() {
+        jobManagerRunnerRegistryLocalCleanupFuture = new CompletableFuture<>();
+        jobManagerRunnerRegistryLocalCleanupResultFuture = new CompletableFuture<>();
+
+        jobManagerRunnerRegistryGlobalCleanupFuture = new CompletableFuture<>();
+        jobManagerRunnerRegistryGlobalCleanupResultFuture = new CompletableFuture<>();
+
+        return TestingJobManagerRunnerRegistry.builder()
+                .withLocalCleanupAsyncFunction(
+                        (jobId, executor) -> {
+                            jobManagerRunnerRegistryLocalCleanupFuture.complete(jobId);
+                            return jobManagerRunnerRegistryLocalCleanupResultFuture;
+                        })
+                .withGlobalCleanupAsyncFunction(
+                        (jobId, executor) -> {
+                            jobManagerRunnerRegistryGlobalCleanupFuture.complete(jobId);
+                            return jobManagerRunnerRegistryGlobalCleanupResultFuture;
+                        })
+                .build();
+    }
+
+    /**
+     * Creates a started {@link TestingJobGraphStore} whose cleanup callbacks record the
+     * invocation and complete immediately.
+     */
+    private JobGraphWriter createJobGraphWriter() throws Exception {
+        jobGraphWriterLocalCleanupFuture = new CompletableFuture<>();
+        jobGraphWriterGlobalCleanupFuture = new CompletableFuture<>();
+        final TestingJobGraphStore jobGraphStore =
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (jobId, executor) -> {
+                                    jobGraphWriterGlobalCleanupFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .setLocalCleanupFunction(
+                                (jobId, ignoredExecutor) -> {
+                                    jobGraphWriterLocalCleanupFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+        jobGraphStore.start(null);
+
+        return jobGraphStore;
+    }
+
+    /**
+     * Creates HA services whose global-cleanup future is observed by the assertions below. The
+     * tests treat completion of that future as "HA global cleanup was triggered".
+     */
+    private HighAvailabilityServices createHighAvailabilityServices() {
+        highAvailabilityServicesGlobalCleanupFuture = new CompletableFuture<>();
+        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+        haServices.setGlobalCleanupFuture(highAvailabilityServicesGlobalCleanupFuture);
+        return haServices;
+    }
+
+    @Test
+    public void testLocalResourceCleaning() {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance
+                        .createLocalResourceCleaner(
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .cleanupAsync(JOB_ID);
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+
+        assertThat(cleanupResultFuture).isNotCompleted();
+
+        jobManagerRunnerRegistryLocalCleanupResultFuture.complete(null);
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggered();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    @Test
+    public void testGlobalResourceCleaning()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance
+                        .createGlobalResourceCleaner(
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .cleanupAsync(JOB_ID);
+
+        assertLocalCleanupNotTriggered();
+        assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+
+        // same as in the local case: the overall cleanup must not finish while the
+        // JobManagerRunnerRegistry cleanup is still pending
+        assertThat(cleanupResultFuture).isNotCompleted();
+
+        jobManagerRunnerRegistryGlobalCleanupResultFuture.complete(null);
+
+        assertGlobalCleanupTriggered();
+        assertLocalCleanupNotTriggered();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    /** Asserts that none of the local cleanup callbacks ran and the job metric group exists. */
+    private void assertLocalCleanupNotTriggered() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
+        assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
+    }
+
+    /** Asserts that only the registry's local cleanup ran; everything else is still pending. */
+    private void assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
+    }
+
+    /** Asserts that none of the global cleanup callbacks ran, including the registry's. */
+    private void assertGlobalCleanupNotTriggered() {
+        assertThat(jobManagerRunnerRegistryGlobalCleanupFuture).isNotDone();
+        assertThat(jobGraphWriterGlobalCleanupFuture).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
+    }
+
+    /** Asserts that only the registry's global cleanup ran; everything else is still pending. */
+    private void assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistryGlobalCleanupFuture).isCompleted();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriterGlobalCleanupFuture).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
+    }
+
+    /** Asserts that all local cleanup callbacks ran and the job metric group was removed. */
+    private void assertLocalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
+        assertThat(jobGraphWriterLocalCleanupFuture).isCompleted();
+        assertThat(blobServer.getLocalCleanupFuture()).isCompleted();
+        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(0);
+    }
+
+    /** Asserts that all global cleanup callbacks ran. */
+    private void assertGlobalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistryGlobalCleanupFuture).isCompleted();
+        assertThat(jobGraphWriterGlobalCleanupFuture).isCompleted();
+        assertThat(blobServer.getGlobalCleanupFuture()).isCompleted();
+        assertThat(highAvailabilityServicesGlobalCleanupFuture).isCompleted();
+    }
+
+    /**
+     * {@link BlobServer} whose cleanup methods only record the requested {@link JobID} (without
+     * delegating to the parent's cleanup) and return an already-completed future.
+     */
+    private static class CleanableBlobServer extends BlobServer {
+
+        private final CompletableFuture<JobID> localCleanupFuture = new CompletableFuture<>();
+        private final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>();
+
+        public CleanableBlobServer() throws IOException {
+            super(
+                    new Configuration(),
+                    // NOTE(review): relative path, so it resolves against the working directory
+                    new File("non-existent-file"),
+                    new TestingBlobStoreBuilder().createTestingBlobStore());
+        }
+
+        @Override
+        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) {
+            localCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor ignoredExecutor) {
+            globalCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        public CompletableFuture<JobID> getLocalCleanupFuture() {
+            return localCleanupFuture;
+        }
+
+        public CompletableFuture<JobID> getGlobalCleanupFuture() {
+            return globalCleanupFuture;
+        }
+    }
+}