This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22598624741dbc785ef2bd38d9aef65b2c395932 Author: Yi Zhang <[email protected]> AuthorDate: Wed Mar 18 15:49:44 2026 +0800 [FLINK-39309][runtime] Check terminated applications for duplication --- .../clusterframework/ApplicationStatus.java | 25 ++ .../flink/runtime/dispatcher/Dispatcher.java | 275 +++++++++++++-------- .../AbstractThreadsafeApplicationResultStore.java | 10 + .../highavailability/ApplicationResultStore.java | 17 ++ .../EmbeddedApplicationResultStore.java | 10 + .../FileSystemApplicationResultStore.java | 17 ++ .../clusterframework/ApplicationStatusTest.java | 29 +++ .../dispatcher/DispatcherApplicationTest.java | 117 +++++++++ .../ApplicationResultStoreContractTest.java | 50 ++++ ...leSystemApplicationResultStoreContractTest.java | 23 ++ .../testutils/TestingApplicationResultStore.java | 29 ++- 11 files changed, 501 insertions(+), 101 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java index f5037471274..94707eb6c7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobStatus; import org.apache.flink.shaded.guava33.com.google.common.collect.BiMap; @@ -89,4 +90,28 @@ public enum ApplicationStatus { return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(this); } + + /** + * Derives the ApplicationStatus that corresponds to the given ApplicationState. If the + * ApplicationState is not a terminal state, this method returns {@link #UNKNOWN}. 
+ * + * <p>Note: {@code ApplicationState} covers the entire lifecycle of an application, representing + * various stages from creation to completion. {@code ApplicationStatus}, on the other hand, + * describes the final state of the cluster when it shuts down. + * + * @param applicationState the application state + * @return the corresponding status + */ + public static ApplicationStatus fromApplicationState(ApplicationState applicationState) { + switch (applicationState) { + case FAILED: + return FAILED; + case CANCELED: + return CANCELED; + case FINISHED: + return SUCCEEDED; + default: + return UNKNOWN; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index d5394704e32..6d53fa0e221 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.WebOptions; @@ -483,52 +484,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> this.getRpcService().getScheduledExecutor(), this::onFatalError); + startApplicationsCleanup(); + if (dispatcherBootstrap instanceof ApplicationBootstrap) { // Application Mode - checkState(suspendedApplications.isEmpty()); - checkState(recoveredDirtyApplicationResults.size() <= 1); - - AbstractApplication application = - ((ApplicationBootstrap) dispatcherBootstrap).getApplication(); - if (!recoveredDirtyApplicationResults.isEmpty()) { - // the
application is already terminated - ApplicationResult applicationResult = - recoveredDirtyApplicationResults.iterator().next(); - checkState( - application - .getApplicationId() - .equals(applicationResult.getApplicationId())); - - startApplicationCleanup(); - } else { - // defer starting recovered jobs, as they might be skipped based on user logic - internalSubmitApplication(application).get(); - } + maybeSubmitApplicationInApplicationMode(); } else { // Session Mode - startApplicationCleanup(); - - // start suspended applications - for (AbstractApplication suspendedApplication : suspendedApplications.values()) { - // defer starting recovered jobs, as they might be skipped based on user logic - internalSubmitApplication(suspendedApplication).get(); - } - - // start suspended jobs that do not belong to any application (previously submitted in a - // SingleJobApplication) by wrapping them into a SingleJobApplication - Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator = - suspendedJobs.entrySet().iterator(); - while (jobIterator.hasNext()) { - Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next(); - ExecutionPlan recoveredJob = entry.getValue(); - ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null); - - if (!suspendedApplications.containsKey(applicationId)) { - runRecoveredJob(recoveredJob, true); - jobIterator.remove(); - suspendedJobIdsByApplicationId.remove(applicationId); - } - } + recoverApplicationsAndJobsInSessionMode(); } checkState(recoveredDirtyJobResultsByApplicationId.isEmpty()); @@ -673,49 +636,147 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> } } - private void startApplicationCleanup() { + private void startApplicationsCleanup() { for (ApplicationResult applicationResult : recoveredDirtyApplicationResults) { - ApplicationID applicationId = applicationResult.getApplicationId(); - ApplicationState applicationState = applicationResult.getApplicationState(); - - Map<JobID, ExecutionGraphInfo> 
jobs = new HashMap<>(); - Collection<JobResult> dirtyJobResults = - recoveredDirtyJobResultsByApplicationId.remove(applicationId); - if (dirtyJobResults != null) { - for (JobResult jobResult : dirtyJobResults) { - JobID jobId = jobResult.getJobId(); - ExecutionGraphInfo executionGraphInfo = - new ExecutionGraphInfo( - ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobResult, -1)); - jobs.put(jobId, executionGraphInfo); - - runJobWithCleanupRunner(jobResult, false); - } + try { + startApplicationCleanup(applicationResult); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not start cleanup for application %s.", + applicationResult.getApplicationId()), + throwable)); } + } + } - long[] stateTimestamps = new long[ApplicationState.values().length]; - stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime(); - stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime(); + private void startApplicationCleanup(ApplicationResult applicationResult) { + ApplicationID applicationId = applicationResult.getApplicationId(); + ApplicationState applicationState = applicationResult.getApplicationState(); - ArchivedApplication sparseArchivedApplication = - new ArchivedApplication( - applicationId, - applicationResult.getApplicationName(), - applicationState, - stateTimestamps, - jobs, - Collections.emptyList()); + Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(); + Collection<JobResult> dirtyJobResults = + recoveredDirtyJobResultsByApplicationId.remove(applicationId); + if (dirtyJobResults != null) { + for (JobResult jobResult : dirtyJobResults) { + JobID jobId = jobResult.getJobId(); + ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( + jobResult, -1)); + jobs.put(jobId, executionGraphInfo); + + runJobWithCleanupRunner(jobResult, false); + } + } - 
writeToArchivedApplicationStore(sparseArchivedApplication); + long[] stateTimestamps = new long[ApplicationState.values().length]; + stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime(); + stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime(); + + ArchivedApplication sparseArchivedApplication = + new ArchivedApplication( + applicationId, + applicationResult.getApplicationName(), + applicationState, + stateTimestamps, + jobs, + Collections.emptyList()); + + writeToArchivedApplicationStore(sparseArchivedApplication); + + // the dirty result already exists + // create a completed future to make sure the jobs can be marked clean + applicationCreateDirtyResultFutures.put(applicationId, FutureUtils.completedVoidFuture()); + applicationTerminationFutures.put(applicationId, new CompletableFuture<>()); - // the dirty result already exists - // create a completed future to make sure the jobs can be marked clean - applicationCreateDirtyResultFutures.put( - applicationId, FutureUtils.completedVoidFuture()); - applicationTerminationFutures.put(applicationId, new CompletableFuture<>()); + removeApplication(applicationId, jobs.keySet()); + } - removeApplication(applicationId, jobs.keySet()); + private void recoverApplicationsAndJobsInSessionMode() { + for (AbstractApplication suspendedApplication : suspendedApplications.values()) { + // defer starting recovered jobs, as they might be skipped based on user logic + try { + internalSubmitApplication(suspendedApplication).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not start recovered application %s.", + suspendedApplication.getApplicationId()), + throwable)); + return; + } + } + + // start suspended jobs that do not belong to any application (previously submitted in a + // SingleJobApplication) by wrapping them into a SingleJobApplication + Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator = 
suspendedJobs.entrySet().iterator(); + while (jobIterator.hasNext()) { + Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next(); + ExecutionPlan recoveredJob = entry.getValue(); + ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null); + if (!suspendedApplications.containsKey(applicationId)) { + runRecoveredJob(recoveredJob, true); + jobIterator.remove(); + suspendedJobIdsByApplicationId.remove(applicationId); + } + } + } + + private void maybeSubmitApplicationInApplicationMode() { + checkState(suspendedApplications.isEmpty()); + checkState(recoveredDirtyApplicationResults.size() <= 1); + + AbstractApplication application = + ((ApplicationBootstrap) dispatcherBootstrap).getApplication(); + ApplicationID applicationId = application.getApplicationId(); + boolean shutDownOnApplicationFinish = + configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH); + if (!recoveredDirtyApplicationResults.isEmpty()) { + // the application is already terminated but needs to be cleaned up + ApplicationResult applicationResult = + recoveredDirtyApplicationResults.iterator().next(); + checkState(applicationId.equals(applicationResult.getApplicationId())); + + if (shutDownOnApplicationFinish) { + shutDownCluster( + ApplicationStatus.fromApplicationState( + applicationResult.getApplicationState())); + } + } else { + // check whether the application was submitted and is already cleaned up + ApplicationResult applicationResult = null; + try { + applicationResult = + applicationResultStore.getCleanApplicationResultAsync(applicationId).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not get clean application result for application %s.", + applicationId), + throwable)); + return; + } + + if (applicationResult == null) { + try { + internalSubmitApplication(application).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format("Could not start application 
%s.", applicationId), + throwable)); + } + } else { + if (shutDownOnApplicationFinish) { + shutDownCluster( + ApplicationStatus.fromApplicationState( + applicationResult.getApplicationState())); + } + } } } @@ -873,29 +934,45 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> final ApplicationID applicationId = application.getApplicationId(); log.info( "Received application submission '{}' ({}).", application.getName(), applicationId); + return applicationResultStore + .hasApplicationResultEntryAsync(applicationId) + .thenComposeAsync( + isTerminated -> { + if (isTerminated) { + log.warn( + "Ignoring application submission '{}' ({}) because the application already " + + "reached a terminal state.", + application.getName(), + applicationId); + return FutureUtils.completedExceptionally( + new DuplicateApplicationSubmissionException(applicationId)); + } else if (applications.containsKey(applicationId) + || archivedApplicationStore.get(applicationId).isPresent()) { + log.warn("Application with id {} already exists.", applicationId); + return FutureUtils.completedExceptionally( + new DuplicateApplicationSubmissionException(applicationId)); + } - if (applications.containsKey(applicationId)) { - log.warn("Application with id {} already exists.", applicationId); - throw new CompletionException( - new DuplicateApplicationSubmissionException(applicationId)); - } - - Optional<ApplicationStoreEntry> optionalApplicationStoreEntry = - application.getApplicationStoreEntry(); - if (optionalApplicationStoreEntry.isPresent()) { - try { - applicationWriter.putApplication(optionalApplicationStoreEntry.get()); - } catch (Exception e) { - String msg = - String.format( - "Could not persist application %s to the ApplicationStore.", - applicationId); - log.warn(msg); - throw new CompletionException(new RuntimeException(msg, e)); - } - } + Optional<ApplicationStoreEntry> optionalApplicationStoreEntry = + application.getApplicationStoreEntry(); + if 
(optionalApplicationStoreEntry.isPresent()) { + try { + applicationWriter.putApplication( + optionalApplicationStoreEntry.get()); + } catch (Exception e) { + String msg = + String.format( + "Could not persist application %s to the ApplicationStore.", + applicationId); + log.warn(msg, e); + return FutureUtils.completedExceptionally( + new RuntimeException(msg, e)); + } + } - return internalSubmitApplication(application); + return internalSubmitApplication(application); + }, + getMainThreadExecutor()); } /** This method must be called from the main thread. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java index 5fbdacff807..4f539dd7c16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java @@ -128,6 +128,16 @@ public abstract class AbstractThreadsafeApplicationResultStore implements Applic @GuardedBy("readWriteLock") protected abstract Set<ApplicationResult> getDirtyResultsInternal() throws IOException; + @Override + public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync( + ApplicationID applicationId) { + return withReadLockAsync(() -> getCleanApplicationResultInternal(applicationId)); + } + + @GuardedBy("readWriteLock") + protected abstract ApplicationResult getCleanApplicationResultInternal( + ApplicationID applicationId) throws IOException; + private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) { return FutureUtils.runAsync( () -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java index 5606e8e8169..8e48b84d0e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java @@ -109,4 +109,21 @@ public interface ApplicationResultStore { * @throws IOException if collecting the set of dirty results failed for IO reasons. */ Set<ApplicationResult> getDirtyResults() throws IOException; + + /** + * Asynchronously gets the persisted {@link ApplicationResult} instance that is marked as {@code + * clean} for the given {@code ApplicationID}. + * + * <p>This method is used to determine whether the application has already completed and been + * cleaned up, thereby avoiding duplicate execution. Note that it only works when {@link + * ApplicationResultStoreOptions#DELETE_ON_COMMIT} is set to {@code false}; otherwise clean + * entries are deleted upon commit and cannot be retrieved. + * + * @param applicationId ID of the application whose clean result we wish to get. + * @return a {@link CompletableFuture} that completes with the {@link ApplicationResult} if a + * clean entry exists for the given {@code applicationId}; otherwise a successfully + * completed future with {@code null}.
+ */ + CompletableFuture<ApplicationResult> getCleanApplicationResultAsync( + ApplicationID applicationId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java index af987d6dba3..3ac50ebb352 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java @@ -75,6 +75,16 @@ public class EmbeddedApplicationResultStore extends AbstractThreadsafeApplicatio .collect(Collectors.toSet()); } + @Override + protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) { + final ApplicationResultEntry entry = cleanResults.get(applicationId); + if (entry == null) { + return null; + } + + return entry.getApplicationResult(); + } + /** Clears all stored results. 
*/ public void clear() { dirtyResults.clear(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java index fd938fede9b..b4832110fad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java @@ -230,6 +230,23 @@ public class FileSystemApplicationResultStore extends AbstractThreadsafeApplicat return dirtyResults; } + @Override + protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) + throws IOException { + if (deleteOnCommit) { + return null; + } + + Path cleanPath = constructCleanPath(applicationId); + if (!fileSystem.exists(cleanPath)) { + return null; + } + + JsonApplicationResultEntry jre = + mapper.readValue(fileSystem.open(cleanPath), JsonApplicationResultEntry.class); + return jre.getApplicationResult(); + } + /** * Wrapper class around {@link ApplicationResultEntry} to allow for serialization of a schema * version, so that future schema changes can be handled in a backwards compatible manner. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java index 507c0a4fe26..7e4620b67a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobStatus; import org.junit.jupiter.api.Test; @@ -115,6 +116,34 @@ class ApplicationStatusTest { assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN); } + @Test + void testFailedApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FAILED)) + .isEqualTo(ApplicationStatus.FAILED); + } + + @Test + void testCancelledApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.CANCELED)) + .isEqualTo(ApplicationStatus.CANCELED); + } + + @Test + void testSucceededApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FINISHED)) + .isEqualTo(ApplicationStatus.SUCCEEDED); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "FAILING", "CANCELING"}) + public void testUnknownApplicationStatusFromNonTerminalApplicationState( + ApplicationState applicationState) { + assertThat(ApplicationStatus.fromApplicationState(applicationState)) + .isEqualTo(ApplicationStatus.UNKNOWN); + } + private static Iterable<Integer> exitCodes(Iterable<ApplicationStatus> statuses) { return StreamSupport.stream(statuses.spliterator(), false) .map(ApplicationStatus::processExitCode) diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java index 16c8d76d072..dfcd7065feb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.application.AbstractApplication; @@ -736,6 +737,122 @@ public class DispatcherApplicationTest { .hasCauseInstanceOf(DuplicateApplicationSubmissionException.class); } + @Test + public void testDuplicateSubmissionWithTerminatedButDirtyApplication() throws Exception { + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + + assertDuplicateApplicationSubmission(); + } + + @Test + public void testDuplicateSubmissionWithTerminatedAndCleanedApplication() throws Exception { + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get(); + + assertDuplicateApplicationSubmission(); + } + + private void assertDuplicateApplicationSubmission() throws Exception { + dispatcher = 
createTestingDispatcherBuilder().build(rpcService); + dispatcher.start(); + + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + + final AbstractApplication application = + TestingApplication.builder().setApplicationId(applicationId).build(); + + final CompletableFuture<Acknowledge> submitFuture = + dispatcherGateway.submitApplication(application, TIMEOUT); + + assertThatThrownBy(submitFuture::get) + .hasCauseInstanceOf(DuplicateApplicationSubmissionException.class); + } + + @Test + public void testApplicationBootstrapWithDirtyResultTriggersShutdown() throws Exception { + testApplicationBootstrapWithApplicationResult(false, true); + } + + @Test + public void testApplicationBootstrapWithDirtyResultDoesNotTriggerShutdownWhenDisabled() + throws Exception { + testApplicationBootstrapWithApplicationResult(false, false); + } + + @Test + public void testApplicationBootstrapWithCleanResultTriggersShutdown() throws Exception { + testApplicationBootstrapWithApplicationResult(true, true); + } + + @Test + public void testApplicationBootstrapWithCleanResultDoesNotTriggerShutdownWhenDisabled() + throws Exception { + testApplicationBootstrapWithApplicationResult(true, false); + } + + private void testApplicationBootstrapWithApplicationResult( + boolean isCleanResult, boolean triggerShutDown) throws Exception { + configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, triggerShutDown); + + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + if (isCleanResult) { + haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get(); + } + + final OneShotLatch bootstrapLatch = new OneShotLatch(); + final TestingDispatcher.Builder builder = + createTestingDispatcherBuilder() + .setDispatcherBootstrapFactory( + 
(ignoredDispatcherGateway, + ignoredScheduledExecutor, + ignoredFatalErrorHandler) -> + new ApplicationBootstrap( + TestingApplication.builder() + .setApplicationId(applicationId) + .setExecuteFunction( + ignoredExecuteParams -> { + bootstrapLatch.trigger(); + return CompletableFuture + .completedFuture( + Acknowledge + .get()); + }) + .build())); + if (!isCleanResult) { + builder.setRecoveredDirtyApplications(Collections.singleton(applicationResult)); + } + dispatcher = builder.build(rpcService); + dispatcher.start(); + + if (triggerShutDown) { + assertEquals( + ApplicationStatus.SUCCEEDED, + dispatcher.getShutDownFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)); + } else { + assertThatThrownBy( + () -> dispatcher.getShutDownFuture().get(100L, TimeUnit.MILLISECONDS)) + .isInstanceOf(TimeoutException.class); + } + + assertFalse(bootstrapLatch.isTriggered()); + } + @Test public void testThatDirtilyFinishedApplicationsNotRetriggered() { final AbstractApplication application = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java index 803f69c7920..51aa19be5c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java @@ -208,4 +208,54 @@ public interface ApplicationResultStoreContractTest { .singleElement() .isEqualTo(otherDirtyApplicationResultEntry.getApplicationId()); } + + @Test + default void testGetCleanApplicationResultWithCleanEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + 
.markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNotNull(); + assertThat(result.getApplicationId()) + .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + assertThat(result.getApplicationState()) + .isEqualTo( + DUMMY_APPLICATION_RESULT_ENTRY + .getApplicationResult() + .getApplicationState()); + assertThat(result.getApplicationName()) + .isEqualTo( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationResult().getApplicationName()); + } + + @Test + default void testGetCleanApplicationResultWithNoEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } + + @Test + default void testGetCleanApplicationResultWithDirtyEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java index fb4206a0892..08b5bf139ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java @@ -21,11 +21,14 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.core.fs.Path; import org.apache.flink.util.concurrent.Executors; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import static org.assertj.core.api.Assertions.assertThat; + /** * Tests for the {@link FileSystemApplicationResultStore} implementation of the {@link * ApplicationResultStore}'s contracts. @@ -40,4 +43,24 @@ public class FileSystemApplicationResultStoreContractTest return new FileSystemApplicationResultStore( path.getFileSystem(), path, false, Executors.directExecutor()); } + + @Test + void testGetCleanApplicationResultWhenDeleteOnCommitIsTrue() throws Exception { + Path path = new Path(temporaryFolder.toURI()); + FileSystemApplicationResultStore applicationResultStore = + new FileSystemApplicationResultStore( + path.getFileSystem(), path, true, Executors.directExecutor()); + + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java index cf6dae44f0f..b0bfa5a440b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java @@ -66,6 +66,8 @@ public class 
TestingApplicationResultStore implements ApplicationResultStore { hasCleanApplicationResultEntryFunction; private final SupplierWithException<Set<ApplicationResult>, ? extends IOException> getDirtyResultsSupplier; + private final Function<ApplicationID, CompletableFuture<ApplicationResult>> + getCleanApplicationResultFunction; private TestingApplicationResultStore( Function<ApplicationResultEntry, CompletableFuture<Void>> createDirtyResultConsumer, @@ -76,13 +78,16 @@ public class TestingApplicationResultStore implements ApplicationResultStore { Function<ApplicationID, CompletableFuture<Boolean>> hasCleanApplicationResultEntryFunction, SupplierWithException<Set<ApplicationResult>, ? extends IOException> - getDirtyResultsSupplier) { + getDirtyResultsSupplier, + Function<ApplicationID, CompletableFuture<ApplicationResult>> + getCleanApplicationResultFunction) { this.createDirtyResultConsumer = createDirtyResultConsumer; this.markResultAsCleanConsumer = markResultAsCleanConsumer; this.hasApplicationResultEntryFunction = hasApplicationResultEntryFunction; this.hasDirtyApplicationResultEntryFunction = hasDirtyApplicationResultEntryFunction; this.hasCleanApplicationResultEntryFunction = hasCleanApplicationResultEntryFunction; this.getDirtyResultsSupplier = getDirtyResultsSupplier; + this.getCleanApplicationResultFunction = getCleanApplicationResultFunction; } @Override @@ -118,6 +123,12 @@ public class TestingApplicationResultStore implements ApplicationResultStore { return getDirtyResultsSupplier.get(); } + @Override + public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync( + ApplicationID applicationId) { + return getCleanApplicationResultFunction.apply(applicationId); + } + public static TestingApplicationResultStore.Builder builder() { return new Builder(); } @@ -144,6 +155,12 @@ public class TestingApplicationResultStore implements ApplicationResultStore { private SupplierWithException<Set<ApplicationResult>, ? 
extends IOException> getDirtyResultsSupplier = Collections::emptySet; + private Function<ApplicationID, CompletableFuture<ApplicationResult>> + getCleanApplicationResultFunction = + applicationID -> + CompletableFuture.completedFuture( + createSuccessfulApplicationResult(applicationID)); + public Builder withCreateDirtyResultConsumer( Function<ApplicationResultEntry, CompletableFuture<Void>> createDirtyResultConsumer) { @@ -185,6 +202,13 @@ public class TestingApplicationResultStore implements ApplicationResultStore { return this; } + public Builder withGetCleanApplicationResultFunction( + Function<ApplicationID, CompletableFuture<ApplicationResult>> + getCleanApplicationResultFunction) { + this.getCleanApplicationResultFunction = getCleanApplicationResultFunction; + return this; + } + public TestingApplicationResultStore build() { return new TestingApplicationResultStore( createDirtyResultConsumer, @@ -192,7 +216,8 @@ public class TestingApplicationResultStore implements ApplicationResultStore { hasApplicationResultEntryFunction, hasDirtyApplicationResultEntryFunction, hasCleanApplicationResultEntryFunction, - getDirtyResultsSupplier); + getDirtyResultsSupplier, + getCleanApplicationResultFunction); } } }
