This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22598624741dbc785ef2bd38d9aef65b2c395932
Author: Yi Zhang <[email protected]>
AuthorDate: Wed Mar 18 15:49:44 2026 +0800

    [FLINK-39309][runtime] Check terminated applications for duplication
---
 .../clusterframework/ApplicationStatus.java        |  25 ++
 .../flink/runtime/dispatcher/Dispatcher.java       | 275 +++++++++++++--------
 .../AbstractThreadsafeApplicationResultStore.java  |  10 +
 .../highavailability/ApplicationResultStore.java   |  17 ++
 .../EmbeddedApplicationResultStore.java            |  10 +
 .../FileSystemApplicationResultStore.java          |  17 ++
 .../clusterframework/ApplicationStatusTest.java    |  29 +++
 .../dispatcher/DispatcherApplicationTest.java      | 117 +++++++++
 .../ApplicationResultStoreContractTest.java        |  50 ++++
 ...leSystemApplicationResultStoreContractTest.java |  23 ++
 .../testutils/TestingApplicationResultStore.java   |  29 ++-
 11 files changed, 501 insertions(+), 101 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
index f5037471274..94707eb6c7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.api.common.ApplicationState;
 import org.apache.flink.api.common.JobStatus;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.BiMap;
@@ -89,4 +90,28 @@ public enum ApplicationStatus {
 
         return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(this);
     }
+
+    /**
+     * Derives the ApplicationStatus that corresponds to the given 
ApplicationState. If the
+     * ApplicationState is not a terminal state, this method returns {@link 
#UNKNOWN}.
+     *
+     * <p>Note: {@code ApplicationState} covers the entire lifecycle of an 
application, representing
+     * every stage from creation to termination. {@code ApplicationStatus}, on 
the other hand,
+     * describes the final state of the cluster when it shuts down.
+     *
+     * @param applicationState the application state
+     * @return the corresponding status
+     */
+    public static ApplicationStatus fromApplicationState(ApplicationState 
applicationState) {
+        switch (applicationState) {
+            case FAILED:
+                return FAILED;
+            case CANCELED:
+                return CANCELED;
+            case FINISHED:
+                return SUCCEEDED;
+            default:
+                return UNKNOWN;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d5394704e32..6d53fa0e221 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.WebOptions;
@@ -483,52 +484,14 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                         this.getRpcService().getScheduledExecutor(),
                         this::onFatalError);
 
+        startApplicationsCleanup();
+
         if (dispatcherBootstrap instanceof ApplicationBootstrap) {
             // Application Mode
-            checkState(suspendedApplications.isEmpty());
-            checkState(recoveredDirtyApplicationResults.size() <= 1);
-
-            AbstractApplication application =
-                    ((ApplicationBootstrap) 
dispatcherBootstrap).getApplication();
-            if (!recoveredDirtyApplicationResults.isEmpty()) {
-                // the application is already terminated
-                ApplicationResult applicationResult =
-                        recoveredDirtyApplicationResults.iterator().next();
-                checkState(
-                        application
-                                .getApplicationId()
-                                .equals(applicationResult.getApplicationId()));
-
-                startApplicationCleanup();
-            } else {
-                // defer starting recovered jobs, as they might be skipped 
based on user logic
-                internalSubmitApplication(application).get();
-            }
+            maybeSubmitApplicationInApplicationMode();
         } else {
             // Session Mode
-            startApplicationCleanup();
-
-            // start suspended applications
-            for (AbstractApplication suspendedApplication : 
suspendedApplications.values()) {
-                // defer starting recovered jobs, as they might be skipped 
based on user logic
-                internalSubmitApplication(suspendedApplication).get();
-            }
-
-            // start suspended jobs that do not belong to any application 
(previously submitted in a
-            // SingleJobApplication) by wrapping them into a 
SingleJobApplication
-            Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator =
-                    suspendedJobs.entrySet().iterator();
-            while (jobIterator.hasNext()) {
-                Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next();
-                ExecutionPlan recoveredJob = entry.getValue();
-                ApplicationID applicationId = 
recoveredJob.getApplicationId().orElse(null);
-
-                if (!suspendedApplications.containsKey(applicationId)) {
-                    runRecoveredJob(recoveredJob, true);
-                    jobIterator.remove();
-                    suspendedJobIdsByApplicationId.remove(applicationId);
-                }
-            }
+            recoverApplicationsAndJobsInSessionMode();
         }
 
         checkState(recoveredDirtyJobResultsByApplicationId.isEmpty());
@@ -673,49 +636,147 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         }
     }
 
-    private void startApplicationCleanup() {
+    private void startApplicationsCleanup() {
         for (ApplicationResult applicationResult : 
recoveredDirtyApplicationResults) {
-            ApplicationID applicationId = applicationResult.getApplicationId();
-            ApplicationState applicationState = 
applicationResult.getApplicationState();
-
-            Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
-            Collection<JobResult> dirtyJobResults =
-                    
recoveredDirtyJobResultsByApplicationId.remove(applicationId);
-            if (dirtyJobResults != null) {
-                for (JobResult jobResult : dirtyJobResults) {
-                    JobID jobId = jobResult.getJobId();
-                    ExecutionGraphInfo executionGraphInfo =
-                            new ExecutionGraphInfo(
-                                    
ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                                            jobResult, -1));
-                    jobs.put(jobId, executionGraphInfo);
-
-                    runJobWithCleanupRunner(jobResult, false);
-                }
+            try {
+                startApplicationCleanup(applicationResult);
+            } catch (Throwable throwable) {
+                onFatalError(
+                        new DispatcherException(
+                                String.format(
+                                        "Could not start cleanup for 
application %s.",
+                                        applicationResult.getApplicationId()),
+                                throwable));
             }
+        }
+    }
 
-            long[] stateTimestamps = new 
long[ApplicationState.values().length];
-            stateTimestamps[ApplicationState.CREATED.ordinal()] = 
applicationResult.getStartTime();
-            stateTimestamps[applicationState.ordinal()] = 
applicationResult.getEndTime();
+    private void startApplicationCleanup(ApplicationResult applicationResult) {
+        ApplicationID applicationId = applicationResult.getApplicationId();
+        ApplicationState applicationState = 
applicationResult.getApplicationState();
 
-            ArchivedApplication sparseArchivedApplication =
-                    new ArchivedApplication(
-                            applicationId,
-                            applicationResult.getApplicationName(),
-                            applicationState,
-                            stateTimestamps,
-                            jobs,
-                            Collections.emptyList());
+        Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
+        Collection<JobResult> dirtyJobResults =
+                recoveredDirtyJobResultsByApplicationId.remove(applicationId);
+        if (dirtyJobResults != null) {
+            for (JobResult jobResult : dirtyJobResults) {
+                JobID jobId = jobResult.getJobId();
+                ExecutionGraphInfo executionGraphInfo =
+                        new ExecutionGraphInfo(
+                                
ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+                                        jobResult, -1));
+                jobs.put(jobId, executionGraphInfo);
+
+                runJobWithCleanupRunner(jobResult, false);
+            }
+        }
 
-            writeToArchivedApplicationStore(sparseArchivedApplication);
+        long[] stateTimestamps = new long[ApplicationState.values().length];
+        stateTimestamps[ApplicationState.CREATED.ordinal()] = 
applicationResult.getStartTime();
+        stateTimestamps[applicationState.ordinal()] = 
applicationResult.getEndTime();
+
+        ArchivedApplication sparseArchivedApplication =
+                new ArchivedApplication(
+                        applicationId,
+                        applicationResult.getApplicationName(),
+                        applicationState,
+                        stateTimestamps,
+                        jobs,
+                        Collections.emptyList());
+
+        writeToArchivedApplicationStore(sparseArchivedApplication);
+
+        // the dirty result already exists
+        // create a completed future to make sure the jobs can be marked clean
+        applicationCreateDirtyResultFutures.put(applicationId, 
FutureUtils.completedVoidFuture());
+        applicationTerminationFutures.put(applicationId, new 
CompletableFuture<>());
 
-            // the dirty result already exists
-            // create a completed future to make sure the jobs can be marked 
clean
-            applicationCreateDirtyResultFutures.put(
-                    applicationId, FutureUtils.completedVoidFuture());
-            applicationTerminationFutures.put(applicationId, new 
CompletableFuture<>());
+        removeApplication(applicationId, jobs.keySet());
+    }
 
-            removeApplication(applicationId, jobs.keySet());
+    private void recoverApplicationsAndJobsInSessionMode() {
+        for (AbstractApplication suspendedApplication : 
suspendedApplications.values()) {
+            // defer starting recovered jobs, as they might be skipped based 
on user logic
+            try {
+                internalSubmitApplication(suspendedApplication).get();
+            } catch (Throwable throwable) {
+                onFatalError(
+                        new DispatcherException(
+                                String.format(
+                                        "Could not start recovered application 
%s.",
+                                        
suspendedApplication.getApplicationId()),
+                                throwable));
+                return;
+            }
+        }
+
+        // start suspended jobs that do not belong to any application 
(previously submitted in a
+        // SingleJobApplication) by wrapping them into a SingleJobApplication
+        Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator = 
suspendedJobs.entrySet().iterator();
+        while (jobIterator.hasNext()) {
+            Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next();
+            ExecutionPlan recoveredJob = entry.getValue();
+            ApplicationID applicationId = 
recoveredJob.getApplicationId().orElse(null);
+            if (!suspendedApplications.containsKey(applicationId)) {
+                runRecoveredJob(recoveredJob, true);
+                jobIterator.remove();
+                suspendedJobIdsByApplicationId.remove(applicationId);
+            }
+        }
+    }
+
+    private void maybeSubmitApplicationInApplicationMode() {
+        checkState(suspendedApplications.isEmpty());
+        checkState(recoveredDirtyApplicationResults.size() <= 1);
+
+        AbstractApplication application =
+                ((ApplicationBootstrap) dispatcherBootstrap).getApplication();
+        ApplicationID applicationId = application.getApplicationId();
+        boolean shutDownOnApplicationFinish =
+                
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
+        if (!recoveredDirtyApplicationResults.isEmpty()) {
+            // the application is already terminated but needs to be cleaned up
+            ApplicationResult applicationResult =
+                    recoveredDirtyApplicationResults.iterator().next();
+            
checkState(applicationId.equals(applicationResult.getApplicationId()));
+
+            if (shutDownOnApplicationFinish) {
+                shutDownCluster(
+                        ApplicationStatus.fromApplicationState(
+                                applicationResult.getApplicationState()));
+            }
+        } else {
+            // check whether the application was submitted and is already 
cleaned up
+            ApplicationResult applicationResult = null;
+            try {
+                applicationResult =
+                        
applicationResultStore.getCleanApplicationResultAsync(applicationId).get();
+            } catch (Throwable throwable) {
+                onFatalError(
+                        new DispatcherException(
+                                String.format(
+                                        "Could not get clean application 
result for application %s.",
+                                        applicationId),
+                                throwable));
+                return;
+            }
+
+            if (applicationResult == null) {
+                try {
+                    internalSubmitApplication(application).get();
+                } catch (Throwable throwable) {
+                    onFatalError(
+                            new DispatcherException(
+                                    String.format("Could not start application 
%s.", applicationId),
+                                    throwable));
+                }
+            } else {
+                if (shutDownOnApplicationFinish) {
+                    shutDownCluster(
+                            ApplicationStatus.fromApplicationState(
+                                    applicationResult.getApplicationState()));
+                }
+            }
         }
     }
 
@@ -873,29 +934,45 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         final ApplicationID applicationId = application.getApplicationId();
         log.info(
                 "Received application submission '{}' ({}).", 
application.getName(), applicationId);
+        return applicationResultStore
+                .hasApplicationResultEntryAsync(applicationId)
+                .thenComposeAsync(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring application submission '{}' 
({}) because the application already "
+                                                + "reached a terminal state.",
+                                        application.getName(),
+                                        applicationId);
+                                return FutureUtils.completedExceptionally(
+                                        new 
DuplicateApplicationSubmissionException(applicationId));
+                            } else if (applications.containsKey(applicationId)
+                                    || 
archivedApplicationStore.get(applicationId).isPresent()) {
+                                log.warn("Application with id {} already 
exists.", applicationId);
+                                return FutureUtils.completedExceptionally(
+                                        new 
DuplicateApplicationSubmissionException(applicationId));
+                            }
 
-        if (applications.containsKey(applicationId)) {
-            log.warn("Application with id {} already exists.", applicationId);
-            throw new CompletionException(
-                    new 
DuplicateApplicationSubmissionException(applicationId));
-        }
-
-        Optional<ApplicationStoreEntry> optionalApplicationStoreEntry =
-                application.getApplicationStoreEntry();
-        if (optionalApplicationStoreEntry.isPresent()) {
-            try {
-                
applicationWriter.putApplication(optionalApplicationStoreEntry.get());
-            } catch (Exception e) {
-                String msg =
-                        String.format(
-                                "Could not persist application %s to the 
ApplicationStore.",
-                                applicationId);
-                log.warn(msg);
-                throw new CompletionException(new RuntimeException(msg, e));
-            }
-        }
+                            Optional<ApplicationStoreEntry> 
optionalApplicationStoreEntry =
+                                    application.getApplicationStoreEntry();
+                            if (optionalApplicationStoreEntry.isPresent()) {
+                                try {
+                                    applicationWriter.putApplication(
+                                            
optionalApplicationStoreEntry.get());
+                                } catch (Exception e) {
+                                    String msg =
+                                            String.format(
+                                                    "Could not persist 
application %s to the ApplicationStore.",
+                                                    applicationId);
+                                    log.warn(msg, e);
+                                    return FutureUtils.completedExceptionally(
+                                            new RuntimeException(msg, e));
+                                }
+                            }
 
-        return internalSubmitApplication(application);
+                            return internalSubmitApplication(application);
+                        },
+                        getMainThreadExecutor());
     }
 
     /** This method must be called from the main thread. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
index 5fbdacff807..4f539dd7c16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
@@ -128,6 +128,16 @@ public abstract class 
AbstractThreadsafeApplicationResultStore implements Applic
     @GuardedBy("readWriteLock")
     protected abstract Set<ApplicationResult> getDirtyResultsInternal() throws 
IOException;
 
+    @Override
+    public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+            ApplicationID applicationId) {
+        return withReadLockAsync(() -> 
getCleanApplicationResultInternal(applicationId));
+    }
+
+    @GuardedBy("readWriteLock")
+    protected abstract ApplicationResult getCleanApplicationResultInternal(
+            ApplicationID applicationId) throws IOException;
+
     private CompletableFuture<Void> 
withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
         return FutureUtils.runAsync(
                 () -> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
index 5606e8e8169..8e48b84d0e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
@@ -109,4 +109,21 @@ public interface ApplicationResultStore {
      * @throws IOException if collecting the set of dirty results failed for 
IO reasons.
      */
     Set<ApplicationResult> getDirtyResults() throws IOException;
+
+    /**
+     * Asynchronously gets the persisted {@link ApplicationResult} instance 
that is marked as {@code
+     * clean} for the given {@code ApplicationID}.
+     *
+     * <p>This method is used to determine whether the application has already 
completed and been
+     * cleaned up, thereby avoiding duplicate execution. Note that it only 
works when {@link
+     * ApplicationResultStoreOptions#DELETE_ON_COMMIT} is set to {@code 
false}; otherwise clean
+     * entries are deleted upon commit and cannot be retrieved.
+     *
+     * @param applicationId ID of the application whose clean result to get.
+     * @return a {@link CompletableFuture} that completes with the {@link 
ApplicationResult} if a
+     *     clean entry exists for the given {@code applicationId}; otherwise a 
successfully
+     *     completed future with {@code null}.
+     */
+    CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+            ApplicationID applicationId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
index af987d6dba3..3ac50ebb352 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
@@ -75,6 +75,16 @@ public class EmbeddedApplicationResultStore extends 
AbstractThreadsafeApplicatio
                 .collect(Collectors.toSet());
     }
 
+    @Override
+    protected ApplicationResult 
getCleanApplicationResultInternal(ApplicationID applicationId) {
+        final ApplicationResultEntry entry = cleanResults.get(applicationId);
+        if (entry == null) {
+            return null;
+        }
+
+        return entry.getApplicationResult();
+    }
+
     /** Clears all stored results. */
     public void clear() {
         dirtyResults.clear();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
index fd938fede9b..b4832110fad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
@@ -230,6 +230,23 @@ public class FileSystemApplicationResultStore extends 
AbstractThreadsafeApplicat
         return dirtyResults;
     }
 
+    @Override
+    protected ApplicationResult 
getCleanApplicationResultInternal(ApplicationID applicationId)
+            throws IOException {
+        if (deleteOnCommit) {
+            return null;
+        }
+
+        Path cleanPath = constructCleanPath(applicationId);
+        if (!fileSystem.exists(cleanPath)) {
+            return null;
+        }
+
+        JsonApplicationResultEntry jre =
+                mapper.readValue(fileSystem.open(cleanPath), 
JsonApplicationResultEntry.class);
+        return jre.getApplicationResult();
+    }
+
     /**
      * Wrapper class around {@link ApplicationResultEntry} to allow for 
serialization of a schema
      * version, so that future schema changes can be handled in a backwards 
compatible manner.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
index 507c0a4fe26..7e4620b67a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.api.common.ApplicationState;
 import org.apache.flink.api.common.JobStatus;
 
 import org.junit.jupiter.api.Test;
@@ -115,6 +116,34 @@ class ApplicationStatusTest {
         
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
     }
 
+    @Test
+    void testFailedApplicationStatusFromApplicationState() {
+        
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FAILED))
+                .isEqualTo(ApplicationStatus.FAILED);
+    }
+
+    @Test
+    void testCancelledApplicationStatusFromApplicationState() {
+        
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.CANCELED))
+                .isEqualTo(ApplicationStatus.CANCELED);
+    }
+
+    @Test
+    void testSucceededApplicationStatusFromApplicationState() {
+        
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FINISHED))
+                .isEqualTo(ApplicationStatus.SUCCEEDED);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ApplicationState.class,
+            names = {"CREATED", "RUNNING", "FAILING", "CANCELING"})
+    public void testUnknownApplicationStatusFromNonTerminalApplicationState(
+            ApplicationState applicationState) {
+        assertThat(ApplicationStatus.fromApplicationState(applicationState))
+                .isEqualTo(ApplicationStatus.UNKNOWN);
+    }
+
     private static Iterable<Integer> exitCodes(Iterable<ApplicationStatus> 
statuses) {
         return StreamSupport.stream(statuses.spliterator(), false)
                 .map(ApplicationStatus::processExitCode)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
index 16c8d76d072..dfcd7065feb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ApplicationState;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.application.AbstractApplication;
@@ -736,6 +737,122 @@ public class DispatcherApplicationTest {
                 
.hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
     }
 
+    @Test
+    public void testDuplicateSubmissionWithTerminatedButDirtyApplication() 
throws Exception {
+        final ApplicationResult applicationResult =
+                
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+        haServices
+                .getApplicationResultStore()
+                .createDirtyResultAsync(new 
ApplicationResultEntry(applicationResult))
+                .get();
+
+        assertDuplicateApplicationSubmission();
+    }
+
+    @Test
+    public void testDuplicateSubmissionWithTerminatedAndCleanedApplication() 
throws Exception {
+        final ApplicationResult applicationResult =
+                
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+        haServices
+                .getApplicationResultStore()
+                .createDirtyResultAsync(new 
ApplicationResultEntry(applicationResult))
+                .get();
+        
haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();
+
+        assertDuplicateApplicationSubmission();
+    }
+
+    private void assertDuplicateApplicationSubmission() throws Exception {
+        dispatcher = createTestingDispatcherBuilder().build(rpcService);
+        dispatcher.start();
+
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        final AbstractApplication application =
+                
TestingApplication.builder().setApplicationId(applicationId).build();
+
+        final CompletableFuture<Acknowledge> submitFuture =
+                dispatcherGateway.submitApplication(application, TIMEOUT);
+
+        assertThatThrownBy(submitFuture::get)
+                
.hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
+    }
+
+    @Test
+    public void testApplicationBootstrapWithDirtyResultTriggersShutdown() 
throws Exception {
+        testApplicationBootstrapWithApplicationResult(false, true);
+    }
+
+    @Test
+    public void 
testApplicationBootstrapWithDirtyResultDoesNotTriggerShutdownWhenDisabled()
+            throws Exception {
+        testApplicationBootstrapWithApplicationResult(false, false);
+    }
+
+    @Test
+    public void testApplicationBootstrapWithCleanResultTriggersShutdown() 
throws Exception {
+        testApplicationBootstrapWithApplicationResult(true, true);
+    }
+
+    @Test
+    public void 
testApplicationBootstrapWithCleanResultDoesNotTriggerShutdownWhenDisabled()
+            throws Exception {
+        testApplicationBootstrapWithApplicationResult(true, false);
+    }
+
+    private void testApplicationBootstrapWithApplicationResult(
+            boolean isCleanResult, boolean triggerShutDown) throws Exception {
+        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, 
triggerShutDown);
+
+        final ApplicationResult applicationResult =
+                
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+        haServices
+                .getApplicationResultStore()
+                .createDirtyResultAsync(new 
ApplicationResultEntry(applicationResult))
+                .get();
+        if (isCleanResult) {
+            
haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();
+        }
+
+        final OneShotLatch bootstrapLatch = new OneShotLatch();
+        final TestingDispatcher.Builder builder =
+                createTestingDispatcherBuilder()
+                        .setDispatcherBootstrapFactory(
+                                (ignoredDispatcherGateway,
+                                        ignoredScheduledExecutor,
+                                        ignoredFatalErrorHandler) ->
+                                        new ApplicationBootstrap(
+                                                TestingApplication.builder()
+                                                        
.setApplicationId(applicationId)
+                                                        .setExecuteFunction(
+                                                                
ignoredExecuteParams -> {
+                                                                    
bootstrapLatch.trigger();
+                                                                    return 
CompletableFuture
+                                                                            
.completedFuture(
+                                                                               
     Acknowledge
+                                                                               
             .get());
+                                                                })
+                                                        .build()));
+        if (!isCleanResult) {
+            
builder.setRecoveredDirtyApplications(Collections.singleton(applicationResult));
+        }
+        dispatcher = builder.build(rpcService);
+        dispatcher.start();
+
+        if (triggerShutDown) {
+            assertEquals(
+                    ApplicationStatus.SUCCEEDED,
+                    dispatcher.getShutDownFuture().get(TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS));
+        } else {
+            assertThatThrownBy(
+                            () -> dispatcher.getShutDownFuture().get(100L, 
TimeUnit.MILLISECONDS))
+                    .isInstanceOf(TimeoutException.class);
+        }
+
+        assertFalse(bootstrapLatch.isTriggered());
+    }
+
     @Test
     public void testThatDirtilyFinishedApplicationsNotRetriggered() {
         final AbstractApplication application =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
index 803f69c7920..51aa19be5c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
@@ -208,4 +208,54 @@ public interface ApplicationResultStoreContractTest {
                 .singleElement()
                 
.isEqualTo(otherDirtyApplicationResultEntry.getApplicationId());
     }
+
+    @Test // A result created dirty and then marked clean must be returned by getCleanApplicationResultAsync.
+    default void testGetCleanApplicationResultWithCleanEntry() throws 
IOException {
+        ApplicationResultStore applicationResultStore = 
createApplicationResultStore();
+        
applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); // store the entry as dirty first
+        applicationResultStore
+                
.markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                .join(); // then mark that same entry clean
+
+        final ApplicationResult result =
+                applicationResultStore
+                        .getCleanApplicationResultAsync(
+                                
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                        .join();
+        assertThat(result).isNotNull(); // a clean entry yields a non-null result
+        assertThat(result.getApplicationId())
+                .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); // id matches the stored entry
+        assertThat(result.getApplicationState())
+                .isEqualTo(
+                        DUMMY_APPLICATION_RESULT_ENTRY
+                                .getApplicationResult()
+                                .getApplicationState()); // state matches the stored result
+        assertThat(result.getApplicationName())
+                .isEqualTo(
+                        
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationResult().getApplicationName()); // name matches the stored result
+    }
+
+    @Test // Asking a freshly created store for a clean result must yield null.
+    default void testGetCleanApplicationResultWithNoEntry() throws IOException 
{
+        ApplicationResultStore applicationResultStore = 
createApplicationResultStore();
+        final ApplicationResult result =
+                applicationResultStore
+                        .getCleanApplicationResultAsync(
+                                
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                        .join();
+        assertThat(result).isNull(); // this test never wrote an entry for the id
+    }
+
+    @Test // An entry that is only dirty must not be returned as a clean result.
+    default void testGetCleanApplicationResultWithDirtyEntry() throws 
IOException {
+        ApplicationResultStore applicationResultStore = 
createApplicationResultStore();
+        
applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); // dirty only; markResultAsCleanAsync is never called in this test
+
+        final ApplicationResult result =
+                applicationResultStore
+                        .getCleanApplicationResultAsync(
+                                
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                        .join();
+        assertThat(result).isNull(); // the dirty entry is not reported as clean
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
index fb4206a0892..08b5bf139ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Tests for the {@link FileSystemApplicationResultStore} implementation of 
the {@link
  * ApplicationResultStore}'s contracts.
@@ -40,4 +43,24 @@ public class FileSystemApplicationResultStoreContractTest
         return new FileSystemApplicationResultStore(
                 path.getFileSystem(), path, false, Executors.directExecutor());
     }
+
+    @Test // With the store's boolean flag set to true, a result marked clean must no longer be retrievable.
+    void testGetCleanApplicationResultWhenDeleteOnCommitIsTrue() throws 
Exception {
+        Path path = new Path(temporaryFolder.toURI());
+        FileSystemApplicationResultStore applicationResultStore =
+                new FileSystemApplicationResultStore(
+                        path.getFileSystem(), path, true, 
Executors.directExecutor()); // third argument is true here; the factory method above passes false. NOTE(review): presumably the delete-on-commit flag, per the test name - confirm against the constructor
+
+        
applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); // store the entry as dirty first
+        applicationResultStore
+                
.markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                .join(); // then mark that same entry clean
+
+        final ApplicationResult result =
+                applicationResultStore
+                        .getCleanApplicationResultAsync(
+                                
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+                        .join();
+        assertThat(result).isNull(); // unlike the flag=false contract test, no clean result is expected here
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
index cf6dae44f0f..b0bfa5a440b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
@@ -66,6 +66,8 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
             hasCleanApplicationResultEntryFunction;
     private final SupplierWithException<Set<ApplicationResult>, ? extends 
IOException>
             getDirtyResultsSupplier;
+    private final Function<ApplicationID, CompletableFuture<ApplicationResult>>
+            getCleanApplicationResultFunction;
 
     private TestingApplicationResultStore(
             Function<ApplicationResultEntry, CompletableFuture<Void>> 
createDirtyResultConsumer,
@@ -76,13 +78,16 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
             Function<ApplicationID, CompletableFuture<Boolean>>
                     hasCleanApplicationResultEntryFunction,
             SupplierWithException<Set<ApplicationResult>, ? extends 
IOException>
-                    getDirtyResultsSupplier) {
+                    getDirtyResultsSupplier,
+            Function<ApplicationID, CompletableFuture<ApplicationResult>>
+                    getCleanApplicationResultFunction) {
         this.createDirtyResultConsumer = createDirtyResultConsumer;
         this.markResultAsCleanConsumer = markResultAsCleanConsumer;
         this.hasApplicationResultEntryFunction = 
hasApplicationResultEntryFunction;
         this.hasDirtyApplicationResultEntryFunction = 
hasDirtyApplicationResultEntryFunction;
         this.hasCleanApplicationResultEntryFunction = 
hasCleanApplicationResultEntryFunction;
         this.getDirtyResultsSupplier = getDirtyResultsSupplier;
+        this.getCleanApplicationResultFunction = 
getCleanApplicationResultFunction;
     }
 
     @Override
@@ -118,6 +123,12 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
         return getDirtyResultsSupplier.get();
     }
 
+    @Override // Test double: forwards the lookup to the function supplied through the constructor.
+    public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+            ApplicationID applicationId) {
+        return getCleanApplicationResultFunction.apply(applicationId); // returns whatever future the configured function produces
+    }
+
     public static TestingApplicationResultStore.Builder builder() {
         return new Builder();
     }
@@ -144,6 +155,12 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
         private SupplierWithException<Set<ApplicationResult>, ? extends 
IOException>
                 getDirtyResultsSupplier = Collections::emptySet;
 
+        private Function<ApplicationID, CompletableFuture<ApplicationResult>>
+                getCleanApplicationResultFunction = // default, used unless withGetCleanApplicationResultFunction replaces it
+                        applicationID ->
+                                CompletableFuture.completedFuture(
+                                        
createSuccessfulApplicationResult(applicationID)); // already-completed future wrapping createSuccessfulApplicationResult(applicationID)
+
         public Builder withCreateDirtyResultConsumer(
                 Function<ApplicationResultEntry, CompletableFuture<Void>>
                         createDirtyResultConsumer) {
@@ -185,6 +202,13 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
             return this;
         }
 
+        public Builder withGetCleanApplicationResultFunction( // Replaces the function that the built store uses for getCleanApplicationResultAsync.
+                Function<ApplicationID, CompletableFuture<ApplicationResult>>
+                        getCleanApplicationResultFunction) {
+            this.getCleanApplicationResultFunction = 
getCleanApplicationResultFunction;
+            return this; // returns this builder so calls can be chained
+        }
+
         public TestingApplicationResultStore build() {
             return new TestingApplicationResultStore(
                     createDirtyResultConsumer,
@@ -192,7 +216,8 @@ public class TestingApplicationResultStore implements 
ApplicationResultStore {
                     hasApplicationResultEntryFunction,
                     hasDirtyApplicationResultEntryFunction,
                     hasCleanApplicationResultEntryFunction,
-                    getDirtyResultsSupplier);
+                    getDirtyResultsSupplier,
+                    getCleanApplicationResultFunction);
         }
     }
 }


Reply via email to