This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4cb44c0269918e5929fe448bfc8435765810de45
Author: Yi Zhang <[email protected]>
AuthorDate: Wed Sep 24 14:54:46 2025 +0800

    [FLINK-38758][runtime] Introduce PackagedProgramApplication
---
 .../generated/deployment_configuration.html        |    6 +
 .../application/PackagedProgramApplication.java    |  638 ++++++++++
 .../flink/client/program/PackagedProgram.java      |   16 +
 .../client/program/PackagedProgramDescriptor.java  |  101 ++
 .../PackagedProgramApplicationTest.java            | 1344 ++++++++++++++++++++
 .../program/PackagedProgramDescriptorTest.java     |   71 ++
 .../apache/flink/api/common/ApplicationState.java  |   27 +
 .../flink/configuration/DeploymentOptions.java     |    9 +
 8 files changed, 2212 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html 
b/docs/layouts/shortcodes/generated/deployment_configuration.html
index bfae1eb1866..e6790e81506 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -62,5 +62,11 @@
             <td>String</td>
             <td>The deployment target for the execution. This can take one of 
the following values when calling <code class="highlighter-rouge">bin/flink 
run</code>:<ul><li>remote</li><li>local</li><li>yarn-application</li><li>yarn-session</li><li>kubernetes-application</li><li>kubernetes-session</li></ul></td>
         </tr>
+        <tr>
+            
<td><h5>execution.terminate-application-on-any-job-terminated-exceptionally</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When it is set to true, the application will complete 
exceptionally if any job fails or is canceled. When it is set to false, the 
application will finish after all jobs reach terminal states.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
new file mode 100644
index 00000000000..5cc2f2cc047
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
+import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link AbstractApplication} designed for executing the 
user's {@code
+ * main()}.
+ */
+public class PackagedProgramApplication extends AbstractApplication {
+
+    @VisibleForTesting static final String FAILED_JOB_NAME = "(application 
driver)";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PackagedProgramApplication.class);
+
+    private final PackagedProgramDescriptor programDescriptor;
+
+    private final Collection<JobID> recoveredJobIds;
+
+    private final Configuration configuration;
+
+    private final boolean handleFatalError;
+
+    private final boolean enforceSingleJobExecution;
+
+    private final boolean submitFailedJobOnApplicationError;
+
+    private final boolean shutDownOnFinish;
+
+    private transient PackagedProgram program;
+
+    private transient CompletableFuture<Void> applicationCompletionFuture;
+
+    private transient ScheduledFuture<?> applicationExecutionTask;
+
+    private transient CompletableFuture<Acknowledge> finishApplicationFuture;
+
+    private transient boolean isDisposing = false;
+
+    public PackagedProgramApplication(
+            final ApplicationID applicationId,
+            final PackagedProgram program,
+            final Collection<JobID> recoveredJobIds,
+            final Configuration configuration,
+            final boolean handleFatalError,
+            final boolean enforceSingleJobExecution,
+            final boolean submitFailedJobOnApplicationError,
+            final boolean shutDownOnFinish) {
+        super(applicationId);
+        this.program = checkNotNull(program);
+        this.recoveredJobIds = checkNotNull(recoveredJobIds);
+        this.configuration = checkNotNull(configuration);
+        this.handleFatalError = handleFatalError;
+        this.enforceSingleJobExecution = enforceSingleJobExecution;
+        this.submitFailedJobOnApplicationError = 
submitFailedJobOnApplicationError;
+        this.shutDownOnFinish = shutDownOnFinish;
+        this.programDescriptor = program.getDescriptor();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> execute(
+            DispatcherGateway dispatcherGateway,
+            ScheduledExecutor scheduledExecutor,
+            Executor mainThreadExecutor,
+            FatalErrorHandler errorHandler) {
+        transitionToRunning();
+
+        final CompletableFuture<List<JobID>> applicationExecutionFuture = new 
CompletableFuture<>();
+        final Set<JobID> tolerateMissingResult = 
Collections.synchronizedSet(new HashSet<>());
+
+        // we need to hand in a future as return value because we need to get 
those JobIDs out
+        // from the scheduled task that executes the user program
+        applicationExecutionTask =
+                scheduledExecutor.schedule(
+                        () ->
+                                runApplicationEntryPoint(
+                                        applicationExecutionFuture,
+                                        tolerateMissingResult,
+                                        dispatcherGateway,
+                                        scheduledExecutor),
+                        0L,
+                        TimeUnit.MILLISECONDS);
+
+        boolean decoupleApplicationStatusFromJobStatus =
+                
!configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION);
+
+        if (decoupleApplicationStatusFromJobStatus) {
+            // when the application status is decoupled from the job status, 
we don't need to wait
+            // for the job results
+            applicationCompletionFuture = 
applicationExecutionFuture.thenApply(ignored -> null);
+            finishApplicationFuture =
+                    applicationCompletionFuture
+                            .handleAsync(
+                                    (ignored, t) -> {
+                                        if (t == null) {
+                                            LOG.info(
+                                                    "Application completed 
SUCCESSFULLY (decoupled from job results)");
+                                            return finishAsSucceeded(
+                                                    dispatcherGateway,
+                                                    scheduledExecutor,
+                                                    mainThreadExecutor,
+                                                    errorHandler);
+                                        }
+
+                                        return onApplicationCanceledOrFailed(
+                                                dispatcherGateway,
+                                                scheduledExecutor,
+                                                mainThreadExecutor,
+                                                errorHandler,
+                                                t);
+                                    },
+                                    mainThreadExecutor)
+                            .thenCompose(Function.identity());
+        } else {
+            applicationCompletionFuture =
+                    applicationExecutionFuture.thenCompose(
+                            jobIds ->
+                                    waitForJobResults(
+                                            dispatcherGateway,
+                                            jobIds,
+                                            tolerateMissingResult,
+                                            scheduledExecutor));
+
+            finishApplicationFuture =
+                    applicationCompletionFuture
+                            .handleAsync(
+                                    (ignored, t) -> {
+                                        if (t == null) {
+                                            LOG.info("Application completed 
SUCCESSFULLY");
+                                            transitionToFinished();
+                                            return maybeShutdownCluster(
+                                                    dispatcherGateway, 
ApplicationStatus.SUCCEEDED);
+                                        }
+
+                                        final Optional<JobStatus> 
maybeJobStatus =
+                                                extractJobStatus(t);
+                                        if (maybeJobStatus.isPresent()) {
+                                            // the exception is caused by job 
execution results
+                                            ApplicationState applicationState =
+                                                    
ApplicationState.fromJobStatus(
+                                                            
maybeJobStatus.get());
+                                            LOG.info("Application {}: ", 
applicationState, t);
+                                            if (applicationState == 
ApplicationState.CANCELED) {
+                                                transitionToCanceling();
+                                                return finishAsCanceled(
+                                                        dispatcherGateway,
+                                                        scheduledExecutor,
+                                                        mainThreadExecutor,
+                                                        errorHandler);
+
+                                            } else {
+                                                transitionToFailing();
+                                                return finishAsFailed(
+                                                        dispatcherGateway,
+                                                        scheduledExecutor,
+                                                        mainThreadExecutor,
+                                                        errorHandler);
+                                            }
+                                        }
+
+                                        return onApplicationCanceledOrFailed(
+                                                dispatcherGateway,
+                                                scheduledExecutor,
+                                                mainThreadExecutor,
+                                                errorHandler,
+                                                t);
+                                    },
+                                    mainThreadExecutor)
+                            .thenCompose(Function.identity());
+        }
+
+        // In Application Mode, the handleFatalError flag is set to true, and 
uncaught exceptions
+        // are handled by the errorHandler to trigger failover.
+        // In Session Mode, the handleFatalError flag may be set to false, 
leaving exceptions
+        // unhandled. This behavior may change in the future.
+        FutureUtils.handleUncaughtException(
+                finishApplicationFuture,
+                (t, e) -> {
+                    if (handleFatalError) {
+                        errorHandler.onFatalError(e);
+                    }
+                });
+
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
+    @Override
+    public void cancel() {
+        ApplicationState currentState = getApplicationStatus();
+        if (currentState == ApplicationState.CREATED) {
+            // nothing to cancel
+            transitionToCanceling();
+            transitionToCanceled();
+        } else if (currentState == ApplicationState.RUNNING) {
+            transitionToCanceling();
+            cancelFutures();
+        }
+    }
+
+    @Override
+    public void dispose() {
+        isDisposing = true;
+        cancelFutures();
+    }
+
+    private void cancelFutures() {
+        if (applicationExecutionTask != null) {
+            applicationExecutionTask.cancel(true);
+        }
+
+        if (applicationCompletionFuture != null) {
+            // applicationCompletionFuture.handleAsync will not block here
+            applicationCompletionFuture.cancel(true);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return programDescriptor.getMainClassName();
+    }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getApplicationExecutionFuture() {
+        return applicationExecutionTask;
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Void> getApplicationCompletionFuture() {
+        return applicationCompletionFuture;
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Acknowledge> getFinishApplicationFuture() {
+        return finishApplicationFuture;
+    }
+
+    private CompletableFuture<Acknowledge> onApplicationCanceledOrFailed(
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final Executor mainThreadExecutor,
+            final FatalErrorHandler errorHandler,
+            final Throwable t) {
+        if (t instanceof CancellationException) {
+            // the applicationCompletionFuture is canceled by cancel() or 
dispose()
+            if (isDisposing) {
+                LOG.warn("Application execution is canceled because the 
cluster is shutting down.");
+                // we don't need to do anything here during cleanup
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            }
+            LOG.info("Application execution is canceled manually.");
+            return finishAsCanceled(
+                    dispatcherGateway, scheduledExecutor, mainThreadExecutor, 
errorHandler);
+        }
+
+        LOG.warn("Application failed unexpectedly: ", t);
+        transitionToFailing();
+        return finishAsFailed(
+                dispatcherGateway, scheduledExecutor, mainThreadExecutor, 
errorHandler);
+    }
+
+    private CompletableFuture<Acknowledge> finishAsCanceled(
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final Executor mainThreadExecutor,
+            final FatalErrorHandler errorHandler) {
+        return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, 
scheduledExecutor)
+                .handleAsync(
+                        (v, t) -> {
+                            if (t != null) {
+                                return 
onCancelAllJobsAndWaitForTerminalStatesException(
+                                        errorHandler, t);
+                            }
+                            transitionToCanceled();
+                            return maybeShutdownCluster(
+                                    dispatcherGateway, 
ApplicationStatus.CANCELED);
+                        },
+                        mainThreadExecutor)
+                .thenCompose(Function.identity());
+    }
+
+    private CompletableFuture<Acknowledge> finishAsFailed(
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final Executor mainThreadExecutor,
+            final FatalErrorHandler errorHandler) {
+        return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, 
scheduledExecutor)
+                .handleAsync(
+                        (v, t) -> {
+                            if (t != null) {
+                                return 
onCancelAllJobsAndWaitForTerminalStatesException(
+                                        errorHandler, t);
+                            }
+                            transitionToFailed();
+                            return maybeShutdownCluster(
+                                    dispatcherGateway, 
ApplicationStatus.FAILED);
+                        },
+                        mainThreadExecutor)
+                .thenCompose(Function.identity());
+    }
+
+    private CompletableFuture<Acknowledge> finishAsSucceeded(
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final Executor mainThreadExecutor,
+            final FatalErrorHandler errorHandler) {
+        return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, 
scheduledExecutor)
+                .handleAsync(
+                        (v, t) -> {
+                            if (t != null) {
+                                return 
onCancelAllJobsAndWaitForTerminalStatesException(
+                                        errorHandler, t);
+                            }
+                            transitionToFinished();
+                            return maybeShutdownCluster(
+                                    dispatcherGateway, 
ApplicationStatus.SUCCEEDED);
+                        },
+                        mainThreadExecutor)
+                .thenCompose(Function.identity());
+    }
+
+    private CompletableFuture<Acknowledge> 
onCancelAllJobsAndWaitForTerminalStatesException(
+            final FatalErrorHandler errorHandler, final Throwable t) {
+        LOG.warn("Failed to cancel and wait for all jobs.", t);
+        errorHandler.onFatalError(t);
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
+    /**
+     * This method cancels the jobs to clean up related resources when the 
application ends. If any
+     * job cancellation fails, the returned future will complete exceptionally.
+     */
+    private CompletableFuture<Void> cancelAllJobsAndWaitForTerminalStates(
+            final DispatcherGateway dispatcherGateway, final ScheduledExecutor 
scheduledExecutor) {
+        final Duration timeout = 
configuration.get(ClientOptions.CLIENT_TIMEOUT);
+
+        LOG.info(
+                "Start to cancel remaining jobs for application {} ({}).",
+                getName(),
+                getApplicationId());
+        // get all jobs that need to be canceled
+        CompletableFuture<Set<JobID>> jobsToCancelFuture =
+                FutureUtils.combineAll(
+                                getJobs().stream()
+                                        .map(
+                                                jobId ->
+                                                        
getNonGlobalTerminalJob(
+                                                                
dispatcherGateway, jobId, timeout))
+                                        .collect(Collectors.toList()))
+                        .thenApply(
+                                jobIds ->
+                                        jobIds.stream()
+                                                .filter(Objects::nonNull)
+                                                .collect(Collectors.toSet()));
+
+        // get all jobs that are being canceled
+        CompletableFuture<Set<JobID>> jobsCancelingFuture =
+                jobsToCancelFuture.thenCompose(
+                        jobsToCancel -> {
+                            if (!jobsToCancel.isEmpty()) {
+                                LOG.info(
+                                        "Canceling jobs for application '{}' 
({}): {}",
+                                        getName(),
+                                        getApplicationId(),
+                                        jobsToCancel);
+                            }
+
+                            List<CompletableFuture<JobID>> cancelFutures =
+                                    jobsToCancel.stream()
+                                            .map(
+                                                    jobId ->
+                                                            cancelJob(
+                                                                    
dispatcherGateway,
+                                                                    jobId,
+                                                                    timeout))
+                                            .collect(Collectors.toList());
+
+                            return FutureUtils.combineAll(cancelFutures)
+                                    .thenApply(
+                                            jobIds ->
+                                                    jobIds.stream()
+                                                            
.filter(Objects::nonNull)
+                                                            
.collect(Collectors.toSet()));
+                        });
+
+        // wait for all jobs to be canceled in the scheduledExecutor
+        return jobsCancelingFuture.thenComposeAsync(
+                jobsCanceling -> {
+                    List<CompletableFuture<?>> jobResultFutures =
+                            jobsCanceling.stream()
+                                    .map(
+                                            jobId ->
+                                                    getJobResult(
+                                                                    
dispatcherGateway,
+                                                                    jobId,
+                                                                    
scheduledExecutor,
+                                                                    false)
+                                                            .thenApply(ignored 
-> null))
+                                    .collect(Collectors.toList());
+                    return FutureUtils.waitForAll(jobResultFutures);
+                },
+                scheduledExecutor);
+    }
+
+    private CompletableFuture<JobID> getNonGlobalTerminalJob(
+            DispatcherGateway dispatcherGateway, JobID jobId, Duration 
timeout) {
+        return dispatcherGateway
+                .requestJobStatus(jobId, timeout)
+                .thenApply(
+                        jobStatus -> {
+                            if (!jobStatus.isGloballyTerminalState()) {
+                                return jobId;
+                            } else {
+                                return null;
+                            }
+                        });
+    }
+
+    private CompletableFuture<JobID> cancelJob(
+            DispatcherGateway dispatcherGateway, JobID jobId, Duration 
timeout) {
+        return dispatcherGateway.cancelJob(jobId, timeout).thenApply(ignored 
-> jobId);
+    }
+
+    private CompletableFuture<Acknowledge> maybeShutdownCluster(
+            DispatcherGateway dispatcherGateway, ApplicationStatus 
applicationStatus) {
+        return shutDownOnFinish
+                ? dispatcherGateway.shutDownCluster(applicationStatus)
+                : CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
+    private Optional<JobStatus> extractJobStatus(Throwable t) {
+        final Optional<UnsuccessfulExecutionException> maybeException =
+                ExceptionUtils.findThrowable(t, 
UnsuccessfulExecutionException.class);
+        return 
maybeException.flatMap(UnsuccessfulExecutionException::getStatus);
+    }
+
+    /**
+     * Runs the user program entrypoint and completes the given {@code 
jobIdsFuture} with the {@link
+     * JobID JobIDs} of the submitted jobs.
+     *
+     * <p>This should be executed in a separate thread (or task).
+     */
+    private void runApplicationEntryPoint(
+            final CompletableFuture<List<JobID>> jobIdsFuture,
+            final Set<JobID> tolerateMissingResult,
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor) {
+        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
+            jobIdsFuture.completeExceptionally(
+                    new ApplicationExecutionException(
+                            String.format(
+                                    "Submission of failed job in case of an 
application error ('%s') is not supported in non-HA setups.",
+                                    
DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
+                                            .key())));
+            return;
+        }
+        final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
+        try {
+            if (program == null) {
+                LOG.info("Reconstructing program from descriptor {}", 
programDescriptor);
+                program = programDescriptor.toPackageProgram();
+            }
+
+            final PipelineExecutorServiceLoader executorServiceLoader =
+                    new EmbeddedExecutorServiceLoader(
+                            applicationJobIds, dispatcherGateway, 
scheduledExecutor);
+
+            ClientUtils.executeProgram(
+                    executorServiceLoader,
+                    configuration,
+                    program,
+                    enforceSingleJobExecution,
+                    true /* suppress sysout */);
+
+            if (applicationJobIds.isEmpty()) {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException(
+                                "The application contains no execute() 
calls."));
+            } else {
+                jobIdsFuture.complete(applicationJobIds);
+            }
+        } catch (Throwable t) {
+            // If we're running in a single job execution mode, it's safe to 
consider re-submission
+            // of an already finished job a success.
+            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+                    ExceptionUtils.findThrowable(t, 
DuplicateJobSubmissionException.class);
+            if (enforceSingleJobExecution
+                    && maybeDuplicate.isPresent()
+                    && maybeDuplicate.get().isGloballyTerminated()) {
+                final JobID jobId = maybeDuplicate.get().getJobID();
+                tolerateMissingResult.add(jobId);
+                jobIdsFuture.complete(Collections.singletonList(jobId));
+            } else if (submitFailedJobOnApplicationError && 
applicationJobIds.isEmpty()) {
+                final JobID failedJobId =
+                        JobID.fromHexString(
+                                
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+                dispatcherGateway
+                        .submitFailedJob(failedJobId, FAILED_JOB_NAME, t)
+                        .thenAccept(
+                                ignored ->
+                                        jobIdsFuture.complete(
+                                                
Collections.singletonList(failedJobId)));
+            } else {
+                jobIdsFuture.completeExceptionally(
+                        new ApplicationExecutionException("Could not execute 
application.", t));
+            }
+        }
+    }
+
+    private CompletableFuture<Void> waitForJobResults(
+            final DispatcherGateway dispatcherGateway,
+            final Collection<JobID> applicationJobIds,
+            final Set<JobID> tolerateMissingResult,
+            final ScheduledExecutor executor) {
+        final List<CompletableFuture<?>> jobResultFutures =
+                applicationJobIds.stream()
+                        .map(
+                                jobId ->
+                                        unwrapJobResultException(
+                                                getJobResult(
+                                                        dispatcherGateway,
+                                                        jobId,
+                                                        executor,
+                                                        
tolerateMissingResult.contains(jobId))))
+                        .collect(Collectors.toList());
+
+        return 
configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION)
+                ? FutureUtils.waitForAll(jobResultFutures)
+                : FutureUtils.completeAll(jobResultFutures);
+    }
+
+    private CompletableFuture<JobResult> getJobResult(
+            final DispatcherGateway dispatcherGateway,
+            final JobID jobId,
+            final ScheduledExecutor scheduledExecutor,
+            final boolean tolerateMissingResult) {
+        final Duration timeout = 
configuration.get(ClientOptions.CLIENT_TIMEOUT);
+        final Duration retryPeriod = 
configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
+        final CompletableFuture<JobResult> jobResultFuture =
+                JobStatusPollingUtils.getJobResult(
+                        dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
+        if (tolerateMissingResult) {
+            // Return "unknown" job result if dispatcher no longer knows the 
actual result.
+            return FutureUtils.handleException(
+                    jobResultFuture,
+                    FlinkJobNotFoundException.class,
+                    exception ->
+                            new JobResult.Builder()
+                                    .jobId(jobId)
+                                    .jobStatus(null)
+                                    .netRuntime(Long.MAX_VALUE)
+                                    .build());
+        }
+        return jobResultFuture;
+    }
+
+    /**
+     * If the given {@link JobResult} indicates success, this passes through 
the {@link JobResult}.
+     * Otherwise, this returns a future that is finished exceptionally 
(potentially with an
+     * exception from the {@link JobResult}).
+     */
+    private CompletableFuture<JobResult> unwrapJobResultException(
+            final CompletableFuture<JobResult> jobResult) {
+        return jobResult.thenApply(
+                result -> {
+                    if (result.isSuccess()) {
+                        return result;
+                    }
+
+                    throw new CompletionException(
+                            UnsuccessfulExecutionException.fromJobResult(
+                                    result, program.getUserCodeClassLoader()));
+                });
+    }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index af1be294d07..0479cb7a544 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -98,6 +98,9 @@ public class PackagedProgram implements AutoCloseable {
     /** Flag indicating whether the job is a Python job. */
     private final boolean isPython;
 
+    /** Serializable descriptor to reconstruct the PackagedProgram. */
+    private final PackagedProgramDescriptor descriptor;
+
     /**
      * Creates an instance that wraps the plan defined in the jar file using 
the given arguments.
      * For generating the plan the class defined in the className parameter is 
used.
@@ -163,6 +166,19 @@ public class PackagedProgram implements AutoCloseable {
             throw new ProgramInvocationException(
                     "The given program class does not have a main(String[]) 
method.");
         }
+
+        this.descriptor =
+                new PackagedProgramDescriptor(
+                        jarFile,
+                        classpaths,
+                        configuration,
+                        savepointRestoreSettings,
+                        args,
+                        getMainClassName());
+    }
+
+    /**
+     * Returns the serializable descriptor that was created for this program in the constructor.
+     *
+     * @return the {@link PackagedProgramDescriptor} of this program
+     */
+    public PackagedProgramDescriptor getDescriptor() {
+        return descriptor;
     }
 
     public SavepointRestoreSettings getSavepointSettings() {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java
new file mode 100644
index 00000000000..d36a16a8c69
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import 
org.apache.flink.client.deployment.application.PackagedProgramApplication;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Descriptor for {@link PackagedProgram}.
+ *
+ * <p>This class provides a serializable representation of {@link PackagedProgram} that can be used
+ * to reconstruct the {@link PackagedProgram} after serialization and deserialization. It is mainly
+ * used by {@link PackagedProgramApplication}.
+ */
+public class PackagedProgramDescriptor implements Serializable {
+
+    /**
+     * Explicit serial version UID. Without it the UID is derived from the class structure, so a
+     * structural change or a different compiler could break deserialization of descriptors.
+     */
+    private static final long serialVersionUID = 1L;
+
+    /** Jar file of the program; may be {@code null}. */
+    @Nullable private final File jarFile;
+
+    /** Class path entries handed to the program builder. */
+    private final List<URL> userClassPaths;
+
+    /** Configuration handed to the program builder. */
+    private final Configuration configuration;
+
+    /** Savepoint restore settings handed to the program builder. */
+    private final SavepointRestoreSettings savepointRestoreSettings;
+
+    /** Arguments handed to the program builder. */
+    private final String[] programArgs;
+
+    /** Name of the entry point class of the program. */
+    private final String mainClassName;
+
+    /**
+     * Creates a descriptor from the values needed to rebuild a {@link PackagedProgram}.
+     *
+     * <p>The given references are stored as they are; no copies are made.
+     *
+     * @param jarFile jar file of the program, may be {@code null}
+     * @param userClassPaths class path entries passed on via {@code setUserClassPaths}
+     * @param configuration configuration passed on via {@code setConfiguration}
+     * @param savepointRestoreSettings settings passed on via {@code setSavepointRestoreSettings}
+     * @param programArgs arguments passed on via {@code setArguments}
+     * @param mainClassName entry point class name passed on via {@code setEntryPointClassName}
+     */
+    public PackagedProgramDescriptor(
+            @Nullable File jarFile,
+            List<URL> userClassPaths,
+            Configuration configuration,
+            SavepointRestoreSettings savepointRestoreSettings,
+            String[] programArgs,
+            String mainClassName) {
+        this.jarFile = jarFile;
+        this.userClassPaths = userClassPaths;
+        this.configuration = configuration;
+        this.savepointRestoreSettings = savepointRestoreSettings;
+        this.programArgs = programArgs;
+        this.mainClassName = mainClassName;
+    }
+
+    /**
+     * Returns the name of the entry point class.
+     *
+     * @return the main class name this descriptor was created with
+     */
+    public String getMainClassName() {
+        return mainClassName;
+    }
+
+    /**
+     * Builds a new {@link PackagedProgram} from the values stored in this descriptor.
+     *
+     * @return the newly built program
+     * @throws ProgramInvocationException if building the program fails
+     */
+    public PackagedProgram toPackageProgram() throws ProgramInvocationException {
+        return PackagedProgram.newBuilder()
+                .setJarFile(jarFile)
+                .setEntryPointClassName(mainClassName)
+                .setConfiguration(configuration)
+                .setUserClassPaths(userClassPaths)
+                .setArguments(programArgs)
+                .setSavepointRestoreSettings(savepointRestoreSettings)
+                .build();
+    }
+
+    @Override
+    public String toString() {
+        return "PackagedProgramDescriptor{"
+                + "jarFile="
+                + jarFile
+                + ", userClassPaths="
+                + userClassPaths
+                + ", configuration="
+                + configuration
+                + ", savepointRestoreSettings="
+                + savepointRestoreSettings
+                + ", programArgs="
+                + Arrays.toString(programArgs)
+                + ", mainClassName="
+                + mainClassName
+                + '}';
+    }
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
new file mode 100644
index 00000000000..12affab4747
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
@@ -0,0 +1,1344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.testjar.FailingJob;
+import org.apache.flink.client.testjar.MultiExecuteJob;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for the {@link PackagedProgramApplication}. */
+public class PackagedProgramApplicationTest {
+
+    private static final int TIMEOUT_SECONDS = 10;
+
+    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(4);
+    private final ScheduledExecutor scheduledExecutor =
+            new ScheduledExecutorServiceAdapter(executor);
+    private Executor mainThreadExecutor = 
Executors.newSingleThreadScheduledExecutor();
+
+    /**
+     * Shuts down the executors owned by the test instance.
+     *
+     * <p>Besides the shared scheduled pool, the main thread executor is shut down as well when it
+     * is a {@link ScheduledExecutorService}, which is what the field is initialized with.
+     * Otherwise the non-daemon worker thread it starts would outlive the test.
+     */
+    @AfterEach
+    void cleanup() {
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
+        // Tests may replace the field, so only shut it down if it actually is an executor service.
+        if (mainThreadExecutor instanceof ScheduledExecutorService) {
+            ExecutorUtils.gracefulShutdown(
+                    5, TimeUnit.SECONDS, (ScheduledExecutorService) mainThreadExecutor);
+        }
+    }
+
+    @Test
+    void testOnlyOneJobIsAllowedWhenEnforceSingleJobExecution() throws Throwable {
+        // The program submits two jobs; the trailing 'true' is presumably the flag that enforces
+        // single job execution (see the helper's signature).
+        final Configuration configuration = getConfiguration();
+        final TestingDispatcherGateway dispatcherGateway = finishedJobGatewayBuilder().build();
+        final PackagedProgramApplication app =
+                createAndExecuteApplication(2, configuration, dispatcherGateway, true);
+
+        // the application itself must complete exceptionally ...
+        assertException(app.getApplicationCompletionFuture(), FlinkRuntimeException.class);
+
+        // ... while the finish process still completes and ends in the failed state
+        final CompletableFuture<Acknowledge> finishFuture = app.getFinishApplicationFuture();
+        finishFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(app);
+    }
+
+    @Test
+    void testStaticJobId() throws Throwable {
+        final JobID fixedJobId = new JobID(0, 2);
+
+        final Configuration config = getConfiguration();
+        config.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());
+
+        // records the id of the job graph that reaches the dispatcher
+        final CompletableFuture<JobID> observedJobId = new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                finishedJobGatewayBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    observedJobId.complete(jobGraph.getJobID());
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication app =
+                createAndExecuteApplication(1, config, dispatcherGateway);
+
+        app.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFinished(app);
+
+        // the submitted job must carry exactly the configured id
+        final JobID actualJobId = observedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(actualJobId).isEqualTo(new JobID(0L, 2L));
+    }
+
+    @Test
+    void testApplicationCleanupWhenOneJobFails() throws Throwable {
+        final ConcurrentLinkedDeque<JobID> submitted = new ConcurrentLinkedDeque<>();
+        final ConcurrentLinkedDeque<JobID> canceled = new ConcurrentLinkedDeque<>();
+
+        final TestingDispatcherGateway.Builder gatewayBuilder =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submitted.add(jobGraph.getJobID());
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId -> {
+                                    final JobStatus status;
+                                    if (jobId.equals(submitted.peek())) {
+                                        // only the first submitted job fails
+                                        status = JobStatus.FAILED;
+                                    } else if (canceled.contains(jobId)) {
+                                        status = JobStatus.CANCELED;
+                                    } else {
+                                        status = JobStatus.RUNNING;
+                                    }
+                                    return CompletableFuture.completedFuture(status);
+                                })
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        // the first job fails, the other jobs should be canceled
+                                        CompletableFuture.completedFuture(
+                                                jobId.equals(submitted.peek())
+                                                        ? createFailedJobResult(jobId)
+                                                        : createCanceledJobResult(jobId)))
+                        .setCancelJobFunction(
+                                jobId -> {
+                                    canceled.add(jobId);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                });
+
+        final PackagedProgramApplication app =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        getProgram(2),
+                        Collections.emptyList(),
+                        getConfiguration(),
+                        true,
+                        false,
+                        false,
+                        true);
+
+        // Use a manually triggered main thread executor so that the application finish process
+        // isn't completed before the jobs have been added.
+        final ManuallyTriggeredScheduledExecutor manualMainThread =
+                new ManuallyTriggeredScheduledExecutor();
+        mainThreadExecutor = manualMainThread;
+
+        // wait for the application execution to be accepted
+        app.execute(
+                        gatewayBuilder.build(),
+                        scheduledExecutor,
+                        mainThreadExecutor,
+                        exception -> {})
+                .join();
+
+        // Wait for the task that is used to finish the application.
+        while (manualMainThread.numQueuedRunnables() < 1) {
+            Thread.sleep(100);
+        }
+        submitted.forEach(app::addJob);
+
+        // Trigger the finish process only after the jobs were added, so that the application
+        // knows that these jobs need to be canceled.
+        manualMainThread.trigger();
+        final UnsuccessfulExecutionException failure =
+                assertException(
+                        app.getApplicationCompletionFuture(),
+                        UnsuccessfulExecutionException.class);
+        assertThat(failure.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
+
+        // Wait for the task that is used to transition to the final state.
+        while (manualMainThread.numQueuedRunnables() < 1) {
+            Thread.sleep(100);
+        }
+        manualMainThread.trigger();
+
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(canceled).containsOnly(submitted.peekLast());
+        assertApplicationFailed(app);
+    }
+
+    @Test
+    void testErrorHandlerIsCalledWhenApplicationCleanupThrowsAnException() throws Throwable {
+        final ConcurrentLinkedDeque<JobID> submitted = new ConcurrentLinkedDeque<>();
+
+        final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
+        final TestingDispatcherGateway.Builder gatewayBuilder =
+                TestingDispatcherGateway.newBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    clusterShutDown.set(true);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submitted.add(jobGraph.getJobID());
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId ->
+                                        // only the first job fails, the others keep running
+                                        CompletableFuture.completedFuture(
+                                                jobId.equals(submitted.peek())
+                                                        ? JobStatus.FAILED
+                                                        : JobStatus.RUNNING))
+                        .setRequestJobResultFunction(
+                                jobId -> {
+                                    if (!jobId.equals(submitted.peek())) {
+                                        // never finish the other jobs
+                                        return new CompletableFuture<>();
+                                    }
+                                    // we only fail one of the jobs, the first one
+                                    return CompletableFuture.completedFuture(
+                                            createFailedJobResult(jobId));
+                                })
+                        .setCancelJobFunction(
+                                jobId ->
+                                        // cancellation during cleanup always fails
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkRuntimeException("Test exception.")));
+
+        final PackagedProgramApplication app =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        getProgram(2),
+                        Collections.emptyList(),
+                        getConfiguration(),
+                        true,
+                        false,
+                        false,
+                        true);
+
+        // Use a manually triggered main thread executor so that the application finish process
+        // isn't completed before the jobs have been added.
+        final ManuallyTriggeredScheduledExecutor manualMainThread =
+                new ManuallyTriggeredScheduledExecutor();
+        mainThreadExecutor = manualMainThread;
+
+        // completed exceptionally by the error handler passed to execute()
+        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+
+        // wait for the application execution to be accepted
+        app.execute(
+                        gatewayBuilder.build(),
+                        scheduledExecutor,
+                        mainThreadExecutor,
+                        errorHandlerFuture::completeExceptionally)
+                .join();
+
+        // Wait for the task that is used to finish the application.
+        while (manualMainThread.numQueuedRunnables() < 1) {
+            Thread.sleep(100);
+        }
+        submitted.forEach(app::addJob);
+
+        // Trigger the finish process only after the jobs were added, so that the application
+        // knows that these jobs need to be canceled.
+        manualMainThread.trigger();
+        final UnsuccessfulExecutionException failure =
+                assertException(
+                        app.getApplicationCompletionFuture(),
+                        UnsuccessfulExecutionException.class);
+        assertThat(failure.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
+
+        // Wait for the task that is used to transition to the final state.
+        while (manualMainThread.numQueuedRunnables() < 1) {
+            Thread.sleep(100);
+        }
+        manualMainThread.trigger();
+
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailing(app);
+
+        // the error handler was invoked with the cancellation failure
+        assertException(errorHandlerFuture, FlinkRuntimeException.class);
+
+        // the cluster was not shut down
+        assertThat(clusterShutDown.get()).isFalse();
+    }
+
+    @Test
+    void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
+        final ConcurrentLinkedDeque<JobID> submitted = new ConcurrentLinkedDeque<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submitted.add(jobGraph.getJobID());
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId ->
+                                        // only the first job fails, the others keep running
+                                        CompletableFuture.completedFuture(
+                                                jobId.equals(submitted.peek())
+                                                        ? JobStatus.FAILED
+                                                        : JobStatus.RUNNING))
+                        .setRequestJobResultFunction(
+                                jobId -> {
+                                    if (!jobId.equals(submitted.peek())) {
+                                        // never finish the other jobs
+                                        return new CompletableFuture<>();
+                                    }
+                                    // we only fail one of the jobs, the first one
+                                    return CompletableFuture.completedFuture(
+                                            createFailedJobResult(jobId));
+                                })
+                        .build();
+
+        final PackagedProgramApplication app = createAndExecuteApplication(2, dispatcherGateway);
+
+        // the application fails although the second job never produces a result
+        final UnsuccessfulExecutionException failure =
+                assertException(
+                        app.getApplicationCompletionFuture(),
+                        UnsuccessfulExecutionException.class);
+        assertThat(failure.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
+
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(app);
+    }
+
+    @Test
+    void testApplicationFinishesAfterAllJobsTerminate() throws Throwable {
+        // disable the option under test so that a failed job does not terminate the application
+        final Configuration config = getConfiguration();
+        config.set(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION, false);
+
+        final ConcurrentLinkedDeque<JobID> submitted = new ConcurrentLinkedDeque<>();
+
+        // receives the status the cluster shutdown is requested with
+        final CompletableFuture<ApplicationStatus> shutdownStatus = new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submitted.add(jobGraph.getJobID());
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId ->
+                                        // only the first job fails, the others finish
+                                        CompletableFuture.completedFuture(
+                                                jobId.equals(submitted.peek())
+                                                        ? JobStatus.FAILED
+                                                        : JobStatus.FINISHED))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        // only the first job fails, the others finish
+                                        CompletableFuture.completedFuture(
+                                                jobId.equals(submitted.peek())
+                                                        ? createFailedJobResult(jobId)
+                                                        : createJobResult(
+                                                                jobId, JobStatus.FINISHED)))
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    shutdownStatus.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication app =
+                createAndExecuteApplication(2, config, dispatcherGateway);
+
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFinished(app);
+
+        final ApplicationStatus reportedStatus =
+                shutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(reportedStatus).isEqualTo(ApplicationStatus.SUCCEEDED);
+    }
+
+    @Test
+    void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
+        final TestingDispatcherGateway dispatcherGateway = finishedJobGatewayBuilder().build();
+        final PackagedProgramApplication app = createAndExecuteApplication(3, dispatcherGateway);
+
+        // both futures have to complete within the timeout, otherwise the test fails
+        app.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFinished(app);
+    }
+
+    @Test
+    void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
+        // receives the status the cluster shutdown is requested with
+        final CompletableFuture<ApplicationStatus> shutdownStatus = new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                canceledJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    shutdownStatus.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication app = createAndExecuteApplication(3, dispatcherGateway);
+
+        // Wait until the application "thinks" it's done. This also makes sure that the finish
+        // future is not failed exceptionally because of the job cancellation.
+        app.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationCanceled(app);
+
+        final ApplicationStatus reportedStatus =
+                shutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(reportedStatus).isEqualTo(ApplicationStatus.CANCELED);
+    }
+
+    @Test
+    void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception {
+        final TestingDispatcherGateway dispatcherGateway = finishedJobGatewayBuilder().build();
+        final PackagedProgramApplication app = createAndExecuteApplication(3, dispatcherGateway);
+
+        // grab both handles before waiting on either of them
+        final CompletableFuture<Acknowledge> finishFuture = app.getFinishApplicationFuture();
+        final ScheduledFuture<?> executionTask = app.getApplicationExecutionFuture();
+
+        // wait until the application "thinks" it's done
+        finishFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFinished(app);
+
+        // the task executing the application has to finish as well
+        executionTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testApplicationCancellationWhenNotRunning() throws Exception {
+        final ApplicationID applicationId = new ApplicationID();
+
+        // the application is only constructed here, execute() is never called on it
+        final PackagedProgramApplication app =
+                new PackagedProgramApplication(
+                        applicationId,
+                        getProgram(1),
+                        Collections.emptyList(),
+                        getConfiguration(),
+                        true,
+                        false,
+                        false,
+                        true);
+
+        app.cancel();
+
+        assertApplicationCanceled(app);
+    }
+
+    @Test
+    void testApplicationIsCanceledWhenCancellingApplication() throws Exception 
{
+        final ConcurrentLinkedDeque<JobID> submittedJobIds = new 
ConcurrentLinkedDeque<>();
+        final ConcurrentLinkedDeque<JobID> canceledJobIds = new 
ConcurrentLinkedDeque<>();
+
+        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                runningJobGatewayBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submittedJobIds.add(jobGraph.getJobID());
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId -> {
+                                    if (canceledJobIds.contains(jobId)) {
+                                        return 
CompletableFuture.completedFuture(
+                                                JobStatus.CANCELED);
+                                    }
+                                    return 
CompletableFuture.completedFuture(JobStatus.RUNNING);
+                                })
+                        .setRequestJobResultFunction(
+                                jobId -> {
+                                    // all jobs should be canceled
+                                    return CompletableFuture.completedFuture(
+                                            createCanceledJobResult(jobId));
+                                })
+                        .setCancelJobFunction(
+                                jobId -> {
+                                    canceledJobIds.add(jobId);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    shutdownCalled.set(true);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                });
+
+        final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        // we're "listening" on this to be completed to verify that the error 
handler is called.
+        // In production, this will shut down the cluster with an exception.
+        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        3,
+                        dispatcherBuilder.build(),
+                        manuallyTriggeredExecutor,
+                        errorHandlerFuture::completeExceptionally);
+
+        final CompletableFuture<Acknowledge> completionFuture =
+                application.getFinishApplicationFuture();
+
+        ScheduledFuture<?> applicationExecutionFuture = 
application.getApplicationExecutionFuture();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                application.cancel();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        },
+                        mainThreadExecutor)
+                .join();
+
+        // Triggers the scheduled process after calling cancel. This
+        // ensures that the application task isn't completed before the cancel 
method is called
+        // which would prevent the cancel call from cancelling the task's 
future.
+        manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask();
+
+        // we didn't call the error handler
+        assertThat(errorHandlerFuture.isDone()).isFalse();
+
+        // Wait to trigger the task that is used to finish the application. .
+        while (manuallyTriggeredExecutor.numQueuedRunnables() < 1) {
+            Thread.sleep(100);
+        }
+        manuallyTriggeredExecutor.trigger();
+
+        // completion future gets completed normally
+        completionFuture.get();
+
+        // verify that we shut down the cluster
+        assertThat(shutdownCalled.get()).isTrue();
+
+        // verify that the application task is being cancelled
+        assertThat(applicationExecutionFuture.isCancelled()).isTrue();
+        assertThat(applicationExecutionFuture.isDone()).isTrue();
+        
assertThat(canceledJobIds).containsExactlyInAnyOrderElementsOf(submittedJobIds);
+        assertApplicationCanceled(application);
+    }
+
+    @Test
+    void testApplicationDispose() throws Exception {
+        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                runningJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    shutdownCalled.set(true);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                });
+
+        final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        // we're "listening" on this to be completed to verify that the error 
handler is called.
+        // In production, this will shut down the cluster with an exception.
+        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        3,
+                        dispatcherBuilder.build(),
+                        manuallyTriggeredExecutor,
+                        errorHandlerFuture::completeExceptionally);
+
+        final CompletableFuture<Acknowledge> completionFuture =
+                application.getFinishApplicationFuture();
+
+        ScheduledFuture<?> applicationExecutionFuture = 
application.getApplicationExecutionFuture();
+
+        application.dispose();
+
+        // Triggers the scheduled process after calling cancel. This
+        // ensures that the application task isn't completed before the cancel 
method is called
+        // which would prevent the cancel call from cancelling the task's 
future.
+        manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask();
+
+        // we didn't call the error handler
+        assertThat(errorHandlerFuture.isDone()).isFalse();
+
+        // completion future gets completed normally
+        completionFuture.get();
+
+        // verify that we didn't shut down the cluster
+        assertThat(shutdownCalled.get()).isFalse();
+
+        // verify that the application task is being cancelled
+        assertThat(applicationExecutionFuture.isCancelled()).isTrue();
+        assertThat(applicationExecutionFuture.isDone()).isTrue();
+    }
+
+    /**
+     * A job submission that throws must fail the application and still shut down the cluster,
+     * but must not be reported to the error handler.
+     */
+    @Test
+    void testErrorHandlerIsNotCalledWhenSubmissionThrowsAnException() throws Exception {
+        final AtomicBoolean clusterShutdownRequested = new AtomicBoolean(false);
+        final TestingDispatcherGateway failingSubmissionGateway =
+                runningJobGatewayBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    throw new FlinkRuntimeException("Nope!");
+                                })
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    clusterShutdownRequested.set(true);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        // stays incomplete unless the error handler is invoked
+        final CompletableFuture<Void> errorHandlerInvocation = new CompletableFuture<>();
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        2,
+                        failingSubmissionGateway,
+                        scheduledExecutor,
+                        errorHandlerInvocation::completeExceptionally);
+
+        // the finish future itself completes normally even though the application failed
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+
+        // the error handler was not involved
+        assertThat(errorHandlerInvocation.isDone()).isFalse();
+
+        // the cluster shutdown was requested
+        assertThat(clusterShutdownRequested.get()).isTrue();
+    }
+
+    @Test
+    void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws 
Exception {
+        testErrorHandlerIsCalled(
+                () ->
+                        FutureUtils.completedExceptionally(
+                                new FlinkRuntimeException("Test exception.")));
+    }
+
+    @Test
+    void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws 
Exception {
+        testErrorHandlerIsCalled(
+                () -> {
+                    throw new FlinkRuntimeException("Test exception.");
+                });
+    }
+
+    /**
+     * Shared scenario: all jobs finish, but the cluster shutdown obtained from {@code
+     * shutdownFunction} fails. Expects the error handler and the finish future to both observe a
+     * {@link FlinkRuntimeException} while the application itself is still reported as finished.
+     *
+     * @param shutdownFunction supplies the result of every cluster shutdown request
+     */
+    private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> shutdownFunction)
+            throws Exception {
+        // same submit/status/result behavior as the hand-written FINISHED gateway
+        final TestingDispatcherGateway finishedJobsGateway =
+                finishedJobGatewayBuilder()
+                        .setClusterShutdownFunction(status -> shutdownFunction.get())
+                        .build();
+
+        // completed exceptionally as soon as the error handler is invoked
+        final CompletableFuture<Void> errorHandlerInvocation = new CompletableFuture<>();
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        3,
+                        finishedJobsGateway,
+                        scheduledExecutor,
+                        errorHandlerInvocation::completeExceptionally);
+
+        final CompletableFuture<Acknowledge> finishFuture =
+                application.getFinishApplicationFuture();
+
+        // the error handler is called
+        assertException(errorHandlerInvocation, FlinkRuntimeException.class);
+
+        // the finish future is completed exceptionally
+        assertException(finishFuture, FlinkRuntimeException.class);
+
+        // the shutdown failure does not change the application result
+        assertApplicationFinished(application);
+    }
+
+    /**
+     * With {@link DeploymentOptions#ATTACHED} enabled and canceled jobs, the application must
+     * complete exceptionally, end up canceled, and shut the cluster down with {@link
+     * ApplicationStatus#CANCELED}.
+     */
+    @Test
+    void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws Exception {
+        final CompletableFuture<ApplicationStatus> clusterShutdown = new CompletableFuture<>();
+
+        final TestingDispatcherGateway.Builder dispatcherGatewayBuilder =
+                canceledJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    clusterShutdown.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                });
+
+        final Configuration configuration = getConfiguration();
+        configuration.set(DeploymentOptions.ATTACHED, true);
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(2, configuration, dispatcherGatewayBuilder.build());
+        assertException(
+                application.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class);
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationCanceled(application);
+
+        // bounded wait: a missing shutdown call must fail the test instead of blocking forever
+        assertThat(clusterShutdown.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .isEqualTo(ApplicationStatus.CANCELED);
+    }
+
+    /**
+     * Checks that an application whose jobs all report FINISHED ends up finished and shuts the
+     * cluster down with {@link ApplicationStatus#SUCCEEDED}.
+     */
+    @Test
+    void testClusterShutdownWhenApplicationSucceeds() throws Exception {
+        // completed by the gateway with the status passed to the cluster shutdown call
+        final CompletableFuture<ApplicationStatus> reportedShutdownStatus =
+                new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                finishedJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    reportedShutdownStatus.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(3, dispatcherGateway);
+
+        // block until the application has gone through its finishing step
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFinished(application);
+
+        // the dispatcher must actually have been asked to shut down
+        final ApplicationStatus shutdownStatus =
+                reportedShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(shutdownStatus).isEqualTo(ApplicationStatus.SUCCEEDED);
+    }
+
+    /**
+     * Checks that an application whose jobs all report FAILED ends up failed and shuts the
+     * cluster down with {@link ApplicationStatus#FAILED}.
+     */
+    @Test
+    void testClusterShutdownWhenApplicationFails() throws Exception {
+        // completed by the gateway with the status passed to the cluster shutdown call
+        final CompletableFuture<ApplicationStatus> reportedShutdownStatus =
+                new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                failedJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    reportedShutdownStatus.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(3, dispatcherGateway);
+
+        // block until the application has gone through its finishing step
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+
+        // the dispatcher must actually have been asked to shut down
+        final ApplicationStatus shutdownStatus =
+                reportedShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(shutdownStatus).isEqualTo(ApplicationStatus.FAILED);
+    }
+
+    /**
+     * Checks that an application whose jobs all report CANCELED ends up canceled and shuts the
+     * cluster down with {@link ApplicationStatus#CANCELED}.
+     */
+    @Test
+    void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
+        // completed by the gateway with the status passed to the cluster shutdown call
+        final CompletableFuture<ApplicationStatus> reportedShutdownStatus =
+                new CompletableFuture<>();
+
+        final TestingDispatcherGateway dispatcherGateway =
+                canceledJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    reportedShutdownStatus.complete(status);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(3, dispatcherGateway);
+
+        // block until the application has gone through its finishing step
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationCanceled(application);
+
+        // the dispatcher must actually have been asked to shut down
+        final ApplicationStatus shutdownStatus =
+                reportedShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(shutdownStatus).isEqualTo(ApplicationStatus.CANCELED);
+    }
+
+    /**
+     * Job results produced by {@code createUnknownJobResult} must make the application complete
+     * exceptionally and fail, with the cluster shut down and the error handler left untouched.
+     */
+    @Test
+    void testErrorHandlerIsNotCalledWhenApplicationStatusIsUnknown() throws Exception {
+        final AtomicBoolean clusterShutdownRequested = new AtomicBoolean(false);
+        final TestingDispatcherGateway unknownResultGateway =
+                canceledJobGatewayBuilder()
+                        .setRequestJobResultFunction(
+                                jobID ->
+                                        CompletableFuture.completedFuture(
+                                                createUnknownJobResult(jobID)))
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    clusterShutdownRequested.set(true);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        // stays incomplete unless the error handler is invoked
+        final CompletableFuture<Void> errorHandlerInvocation = new CompletableFuture<>();
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        3,
+                        unknownResultGateway,
+                        scheduledExecutor,
+                        errorHandlerInvocation::completeExceptionally);
+
+        // the application itself completes exceptionally
+        assertException(
+                application.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class);
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+
+        // the error handler was not involved
+        assertThat(errorHandlerInvocation.isDone()).isFalse();
+
+        // the cluster shutdown was requested
+        assertThat(clusterShutdownRequested.get()).isTrue();
+    }
+
+    /**
+     * Verifies that a cluster shutdown call that throws fails the finish future but neither
+     * changes the (finished) application result nor invokes the error handler.
+     */
+    @Test
+    void testErrorHandlerIsNotCalled() throws Exception {
+        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+        // Cluster shutdown error: the shutdown function records the call and then throws
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                finishedJobGatewayBuilder()
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    shutdownCalled.set(true);
+                                    throw new FlinkRuntimeException("Nope!");
+                                });
+
+        final AtomicBoolean errorHandlerCalled = new AtomicBoolean(false);
+        // NOTE(review): the three trailing flags are positional; the first `false` presumably
+        // disables fatal-error handling, which is what this test relies on. Confirm against
+        // the eight-argument createAndExecuteApplication overload.
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        2,
+                        getConfiguration(),
+                        dispatcherBuilder.build(),
+                        scheduledExecutor,
+                        exception -> errorHandlerCalled.set(true),
+                        false,
+                        false,
+                        true);
+
+        final CompletableFuture<Acknowledge> completionFuture =
+                application.getFinishApplicationFuture();
+
+        // we return a future that is completed exceptionally
+        assertException(completionFuture, FlinkRuntimeException.class);
+        // shut down exception should not affect application result
+        assertApplicationFinished(application);
+
+        // we do not call the error handler
+        assertThat(errorHandlerCalled.get()).isFalse();
+
+        // verify that we shut down the cluster
+        assertThat(shutdownCalled.get()).isTrue();
+    }
+
+    /**
+     * A submission rejected as a duplicate of a globally terminated job must still let the
+     * application complete normally and end up finished.
+     */
+    @Test
+    void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
+        final JobID fixedJobId = new JobID(0, 2);
+        final Configuration fixedJobIdConfiguration = getConfiguration();
+        fixedJobIdConfiguration.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());
+
+        // every submission is answered with a "globally terminated" duplicate exception
+        final TestingDispatcherGateway duplicateRejectingGateway =
+                finishedJobGatewayBuilder()
+                        .setSubmitFunction(
+                                jobGraph ->
+                                        FutureUtils.completedExceptionally(
+                                                DuplicateJobSubmissionException
+                                                        .ofGloballyTerminated(fixedJobId)))
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        1, fixedJobIdConfiguration, duplicateRejectingGateway, true);
+
+        application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        assertApplicationFinished(application);
+    }
+
+    /**
+     * Covers the case where the {@link org.apache.flink.runtime.dispatcher.Dispatcher
+     * dispatcher} no longer has the job result (the job terminated and the job manager failed
+     * over), while the {@link org.apache.flink.runtime.highavailability.JobResultStore} still
+     * records the job as terminated. Status and result lookups therefore fail with {@link
+     * FlinkJobNotFoundException}, and the application is still expected to finish.
+     */
+    @Test
+    void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable {
+        final JobID fixedJobId = new JobID(0, 2);
+        final Configuration fixedJobIdConfiguration = getConfiguration();
+        fixedJobIdConfiguration.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());
+
+        final TestingDispatcherGateway jobUnknownGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph ->
+                                        FutureUtils.completedExceptionally(
+                                                DuplicateJobSubmissionException
+                                                        .ofGloballyTerminated(fixedJobId)))
+                        .setRequestJobStatusFunction(
+                                jobId ->
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkJobNotFoundException(jobId)))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkJobNotFoundException(jobId)))
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(1, fixedJobIdConfiguration, jobUnknownGateway, true);
+
+        application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        assertApplicationFinished(application);
+    }
+
+    /**
+     * A submission rejected as a duplicate of a job that is not globally terminated must fail
+     * the application and surface the {@link DuplicateJobSubmissionException} to the caller.
+     */
+    @Test
+    void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
+        final JobID fixedJobId = new JobID(0, 2);
+        final Configuration fixedJobIdConfiguration = getConfiguration();
+        fixedJobIdConfiguration.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());
+
+        final TestingDispatcherGateway duplicateRejectingGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph ->
+                                        FutureUtils.completedExceptionally(
+                                                DuplicateJobSubmissionException.of(fixedJobId)))
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        1, fixedJobIdConfiguration, duplicateRejectingGateway);
+        final CompletableFuture<Void> completionFuture =
+                application.getApplicationCompletionFuture();
+
+        final ExecutionException completionFailure =
+                assertThrows(
+                        ExecutionException.class,
+                        () -> completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        // the duplicate exception must be in the cause chain and not be "globally terminated"
+        final Optional<DuplicateJobSubmissionException> duplicateInChain =
+                ExceptionUtils.findThrowable(
+                        completionFailure, DuplicateJobSubmissionException.class);
+        assertThat(duplicateInChain)
+                .hasValueSatisfying(
+                        duplicate -> assertThat(duplicate.isGloballyTerminated()).isFalse());
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+    }
+
+    /**
+     * For each terminal job status, runs an application created with the last flag set to {@code
+     * false} and checks that the cluster shutdown function is never invoked while the
+     * application reaches the state derived from that job status.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = JobStatus.class,
+            names = {"FINISHED", "CANCELED", "FAILED"})
+    void testShutdownDisabled(JobStatus jobStatus) throws Exception {
+        final TestingDispatcherGateway noShutdownGateway =
+                dispatcherGatewayBuilder(jobStatus)
+                        .setClusterShutdownFunction(
+                                status -> {
+                                    fail("Cluster shutdown should not be called");
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        1,
+                        getConfiguration(),
+                        noShutdownGateway,
+                        scheduledExecutor,
+                        exception -> {},
+                        true,
+                        false,
+                        false);
+
+        // only once the application has finished can we be sure shutdown was not requested
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        final ApplicationState expectedState = ApplicationState.fromJobStatus(jobStatus);
+        assertApplicationStatus(application, expectedState);
+    }
+
+    @Test
+    void testSubmitFailedJobOnApplicationErrorWhenEnforceSingleJobExecution() 
throws Exception {
+        final Configuration configuration = getConfiguration();
+        final JobID jobId = new JobID();
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
jobId.toHexString());
+        testSubmitFailedJobOnApplicationError(
+                configuration,
+                (id, t) -> {
+                    assertThat(id).isEqualTo(jobId);
+                    assertThat(t)
+                            .isInstanceOf(ProgramInvocationException.class)
+                            .hasRootCauseInstanceOf(RuntimeException.class)
+                            .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE);
+                });
+    }
+
+    /**
+     * Executes the failing program with {@code enforceSingleJobExecution} and {@code
+     * submitFailedJobOnApplicationError} enabled and checks the failed job handed to the
+     * dispatcher.
+     *
+     * @param configuration configuration the application is created with
+     * @param failedJobAssertion assertion run inside the gateway on the job id and failure passed
+     *     to the submit-failed-job call
+     */
+    private void testSubmitFailedJobOnApplicationError(
+            Configuration configuration, BiConsumer<JobID, Throwable> failedJobAssertion)
+            throws Exception {
+        // completes normally once the assertion passed, exceptionally if it failed
+        final CompletableFuture<Void> submitted = new CompletableFuture<>();
+        final TestingDispatcherGateway dispatcherGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFailedFunction(
+                                (jobId, jobName, t) -> {
+                                    try {
+                                        failedJobAssertion.accept(jobId, t);
+                                        submitted.complete(null);
+                                        return CompletableFuture.completedFuture(Acknowledge.get());
+                                    } catch (Throwable assertion) {
+                                        submitted.completeExceptionally(assertion);
+                                        return FutureUtils.completedExceptionally(assertion);
+                                    }
+                                })
+                        // status and result only become available after the failed job was submitted
+                        .setRequestJobStatusFunction(
+                                jobId -> submitted.thenApply(ignored -> JobStatus.FAILED))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        submitted.thenApply(
+                                                ignored ->
+                                                        createJobResult(jobId, JobStatus.FAILED)))
+                        .build();
+
+        final PackagedProgramApplication application =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        FailingJob.getProgram(),
+                        Collections.emptyList(),
+                        configuration,
+                        true,
+                        true /* enforceSingleJobExecution */,
+                        true /* submitFailedJobOnApplicationError */,
+                        true);
+
+        executeApplication(application, dispatcherGateway, scheduledExecutor, exception -> {});
+
+        // bounded wait so that a regression fails the test instead of hanging the build
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        // Surface a failed (or never executed) failedJobAssertion: it runs inside the gateway
+        // callback, so its outcome is only visible through this future.
+        submitted.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        assertApplicationFailed(application);
+    }
+
+    /**
+     * Enabling {@code submitFailedJobOnApplicationError} without {@code
+     * enforceSingleJobExecution} must fail the application with an error whose message mentions
+     * the {@link DeploymentOptions#SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR} option key.
+     */
+    @Test
+    void testSubmitFailedJobOnApplicationErrorWhenNotEnforceSingleJobExecution() throws Exception {
+        final PackagedProgramApplication application =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        FailingJob.getProgram(),
+                        Collections.emptyList(),
+                        getConfiguration(),
+                        true,
+                        false /* enforceSingleJobExecution */,
+                        true /* submitFailedJobOnApplicationError */,
+                        true);
+
+        final TestingDispatcherGateway defaultGateway =
+                TestingDispatcherGateway.newBuilder().build();
+        executeApplication(application, defaultGateway, scheduledExecutor, exception -> {});
+
+        final String offendingOptionKey =
+                DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR.key();
+        assertThatFuture(application.getApplicationCompletionFuture())
+                .eventuallyFailsWith(ExecutionException.class)
+                .extracting(Throwable::getCause)
+                .satisfies(cause -> assertThat(cause).hasMessageContaining(offendingOptionKey));
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+    }
+
+    /**
+     * Returns the builder produced by {@link #dispatcherGatewayBuilder(JobStatus)} for {@link
+     * JobStatus#FINISHED}.
+     */
+    private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() {
+        return dispatcherGatewayBuilder(JobStatus.FINISHED);
+    }
+
+    /**
+     * Returns the builder produced by {@link #dispatcherGatewayBuilder(JobStatus)} for {@link
+     * JobStatus#FAILED}.
+     */
+    private TestingDispatcherGateway.Builder failedJobGatewayBuilder() {
+        return dispatcherGatewayBuilder(JobStatus.FAILED);
+    }
+
+    /**
+     * Returns the builder produced by {@link #dispatcherGatewayBuilder(JobStatus)} for {@link
+     * JobStatus#CANCELED}.
+     */
+    private TestingDispatcherGateway.Builder canceledJobGatewayBuilder() {
+        return dispatcherGatewayBuilder(JobStatus.CANCELED);
+    }
+
+    /**
+     * Returns the builder produced by {@link #dispatcherGatewayBuilder(JobStatus)} for {@link
+     * JobStatus#RUNNING}; that builder has no job-result function configured by the helper.
+     */
+    private TestingDispatcherGateway.Builder runningJobGatewayBuilder() {
+        return dispatcherGatewayBuilder(JobStatus.RUNNING);
+    }
+
+    private TestingDispatcherGateway.Builder 
dispatcherGatewayBuilder(JobStatus jobStatus) {
+        TestingDispatcherGateway.Builder builder =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                        .setRequestJobStatusFunction(
+                                jobId -> 
CompletableFuture.completedFuture(jobStatus));
+        if (jobStatus != JobStatus.RUNNING) {
+            builder.setRequestJobResultFunction(
+                    jobID -> 
CompletableFuture.completedFuture(createJobResult(jobID, jobStatus)));
+        }
+        return builder;
+    }
+
+    /**
+     * Convenience overload: delegates to the five-argument overload with the configuration from
+     * {@code getConfiguration()}, the test's {@code scheduledExecutor} and a no-op error handler.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs, final DispatcherGateway dispatcherGateway) 
throws FlinkException {
+        return createAndExecuteApplication(
+                numJobs, getConfiguration(), dispatcherGateway, 
scheduledExecutor, exception -> {});
+    }
+
+    /**
+     * Convenience overload: delegates to the four-argument overload with {@code
+     * enforceSingleJobExecution} set to {@code false}.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs,
+            final Configuration configuration,
+            final DispatcherGateway dispatcherGateway)
+            throws FlinkException {
+        return createAndExecuteApplication(numJobs, configuration, 
dispatcherGateway, false);
+    }
+
+    /**
+     * Creates and executes an application with the given configuration and single-job
+     * enforcement flag, using the shared {@code scheduledExecutor}, a fatal error handler that
+     * ignores every error, and {@code handleFatalError} and {@code shutDownOnFinish} both set to
+     * {@code true}.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs,
+            final Configuration configuration,
+            final DispatcherGateway dispatcherGateway,
+            final boolean enforceSingleJobExecution)
+            throws FlinkException {
+        return createAndExecuteApplication(
+                numJobs,
+                configuration,
+                dispatcherGateway,
+                scheduledExecutor,
+                exception -> {},
+                true,
+                enforceSingleJobExecution,
+                true);
+    }
+
+    /**
+     * Creates and executes an application with the default test configuration and a
+     * caller-supplied scheduled executor and fatal error handler.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs,
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final FatalErrorHandler errorHandler)
+            throws FlinkException {
+        return createAndExecuteApplication(
+                numJobs, getConfiguration(), dispatcherGateway, 
scheduledExecutor, errorHandler);
+    }
+
+    /**
+     * Creates and executes an application with a caller-supplied configuration, scheduled
+     * executor and fatal error handler. {@code handleFatalError} and {@code shutDownOnFinish} are
+     * {@code true}; {@code enforceSingleJobExecution} is {@code false}.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs,
+            final Configuration configuration,
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final FatalErrorHandler errorHandler)
+            throws FlinkException {
+        return createAndExecuteApplication(
+                numJobs,
+                configuration,
+                dispatcherGateway,
+                scheduledExecutor,
+                errorHandler,
+                true,
+                false,
+                true);
+    }
+
+    /**
+     * Builds a {@link PackagedProgramApplication} for a program with {@code numJobs} jobs, checks
+     * that it starts in the CREATED state, executes it and returns it.
+     *
+     * <p>All other overloads of this helper end up here.
+     */
+    private PackagedProgramApplication createAndExecuteApplication(
+            final int numJobs,
+            final Configuration configuration,
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final FatalErrorHandler errorHandler,
+            boolean handleFatalError,
+            boolean enforceSingleJobExecution,
+            boolean shutDownOnFinish)
+            throws FlinkException {
+
+        final PackagedProgram program = getProgram(numJobs);
+
+        // NOTE(review): the literal `false` below is a constructor flag whose meaning is not
+        // visible from this helper - confirm against the PackagedProgramApplication constructor.
+        final PackagedProgramApplication application =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        program,
+                        Collections.emptyList(),
+                        configuration,
+                        handleFatalError,
+                        enforceSingleJobExecution,
+                        false,
+                        shutDownOnFinish);
+        assertApplicationCreated(application);
+
+        executeApplication(application, dispatcherGateway, scheduledExecutor, 
errorHandler);
+        return application;
+    }
+
+    /**
+     * Invokes {@code application.execute(...)} on {@code mainThreadExecutor} and blocks until that
+     * invocation has returned.
+     */
+    private void executeApplication(
+            final PackagedProgramApplication application,
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final FatalErrorHandler errorHandler) {
+
+        CompletableFuture.runAsync(
+                        () ->
+                                application.execute(
+                                        dispatcherGateway,
+                                        scheduledExecutor,
+                                        mainThreadExecutor,
+                                        errorHandler),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    /**
+     * Returns the {@code MultiExecuteJob} test program for {@code numJobs} jobs.
+     *
+     * <p>NOTE(review): the meaning of the second argument ({@code true}) is not visible here -
+     * confirm against {@code MultiExecuteJob.getProgram}.
+     */
+    private PackagedProgram getProgram(int numJobs) throws FlinkException {
+        return MultiExecuteJob.getProgram(numJobs, true);
+    }
+
+    /** Creates a job result with status {@link JobStatus#FAILED} for the given job. */
+    private static JobResult createFailedJobResult(final JobID jobId) {
+        return createJobResult(jobId, JobStatus.FAILED);
+    }
+
+    /** Creates a job result with status {@link JobStatus#CANCELED} for the given job. */
+    private static JobResult createCanceledJobResult(final JobID jobId) {
+        return createJobResult(jobId, JobStatus.CANCELED);
+    }
+
+    /** Creates a job result without a job status ({@code null}) for the given job. */
+    private static JobResult createUnknownJobResult(final JobID jobId) {
+        return createJobResult(jobId, null);
+    }
+
+    /**
+     * Creates a job result for the given job and status with a net runtime of 2.
+     *
+     * <p>A {@code null} or {@link JobStatus#FAILED} status attaches a serialized {@link
+     * JobExecutionException}; {@link JobStatus#CANCELED} attaches a serialized {@link
+     * JobCancellationException}. Any other status carries no throwable.
+     */
+    private static JobResult createJobResult(
+            final JobID jobID, @Nullable final JobStatus jobStatus) {
+        final JobResult.Builder resultBuilder =
+                new JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
+        final boolean failedOrUnknown = jobStatus == null || jobStatus == JobStatus.FAILED;
+        if (failedOrUnknown) {
+            resultBuilder.serializedThrowable(
+                    new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla")));
+        } else if (jobStatus == JobStatus.CANCELED) {
+            resultBuilder.serializedThrowable(
+                    new SerializedThrowable(
+                            new JobCancellationException(jobID, "Hello", null)));
+        }
+        return resultBuilder.build();
+    }
+
+    /**
+     * Waits up to {@code TIMEOUT_SECONDS} for the future and expects it to fail.
+     *
+     * @return the throwable of type {@code exceptionClass} that {@code
+     *     ExceptionUtils.findThrowable} locates in the failure
+     * @throws Exception the original failure if it contains no such throwable, or a plain {@link
+     *     Exception} if the future completed normally
+     */
+    private static <T, E extends Throwable> E assertException(
+            CompletableFuture<T> future, Class<E> exceptionClass) throws Exception {
+
+        try {
+            future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (Throwable failure) {
+            final Optional<E> match = ExceptionUtils.findThrowable(failure, exceptionClass);
+            if (match.isPresent()) {
+                return match.get();
+            }
+            // Not the failure we were looking for: surface it unchanged.
+            throw failure;
+        }
+        final String expectedType = exceptionClass.getCanonicalName();
+        throw new Exception(
+                "Future should have completed exceptionally with " + expectedType + ".");
+    }
+
+    /** Returns a fresh configuration whose deployment target is the embedded executor. */
+    private Configuration getConfiguration() {
+        final Configuration embeddedConfig = new Configuration();
+        embeddedConfig.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        return embeddedConfig;
+    }
+
+    /** Asserts that the application's current status equals {@code expectedStatus}. */
+    private void assertApplicationStatus(
+            PackagedProgramApplication application, ApplicationState 
expectedStatus) {
+        
assertThat(application.getApplicationStatus()).isEqualTo(expectedStatus);
+    }
+
+    /** Asserts that the application is in {@link ApplicationState#CREATED}. */
+    private void assertApplicationCreated(PackagedProgramApplication 
application) {
+        assertApplicationStatus(application, ApplicationState.CREATED);
+    }
+
+    /** Asserts that the application is in {@link ApplicationState#FAILING}. */
+    private void assertApplicationFailing(PackagedProgramApplication 
application) {
+        assertApplicationStatus(application, ApplicationState.FAILING);
+    }
+
+    /** Asserts that the application is in {@link ApplicationState#CANCELED}. */
+    private void assertApplicationCanceled(PackagedProgramApplication 
application) {
+        assertApplicationStatus(application, ApplicationState.CANCELED);
+    }
+
+    /** Asserts that the application is in {@link ApplicationState#FAILED}. */
+    private void assertApplicationFailed(PackagedProgramApplication 
application) {
+        assertApplicationStatus(application, ApplicationState.FAILED);
+    }
+
+    /** Asserts that the application is in {@link ApplicationState#FINISHED}. */
+    private void assertApplicationFinished(PackagedProgramApplication 
application) {
+        assertApplicationStatus(application, ApplicationState.FINISHED);
+    }
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java
new file mode 100644
index 00000000000..2f2d684ec65
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PackagedProgramDescriptor}. */
+public class PackagedProgramDescriptorTest {
+
+    /**
+     * Round-trips a {@link PackagedProgram} through its {@link PackagedProgramDescriptor} and
+     * checks that the rebuilt program keeps the main class name, classpaths, savepoint settings
+     * and arguments.
+     *
+     * @param jarFileIsNull whether the source program is built without a jar file
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testPackagedProgramDescriptor(boolean jarFileIsNull) throws Exception {
+        File testJar = new File(CliFrontendTestUtils.getTestJarPath());
+        File jarFile = jarFileIsNull ? null : testJar;
+        SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
+        List<URL> userClassPaths = Collections.singletonList(testJar.toURI().toURL());
+        String[] programArgs = {"--input", "test", "--output", "result"};
+        String mainClassName = TEST_JAR_MAIN_CLASS;
+        PackagedProgram program =
+                PackagedProgram.newBuilder()
+                        .setJarFile(jarFile)
+                        .setEntryPointClassName(mainClassName)
+                        .setUserClassPaths(userClassPaths)
+                        .setArguments(programArgs)
+                        .setSavepointRestoreSettings(savepointSettings)
+                        .build();
+        try {
+            PackagedProgramDescriptor descriptor = program.getDescriptor();
+
+            assertThat(descriptor.getMainClassName()).isEqualTo(mainClassName);
+
+            PackagedProgram reconstructedProgram = descriptor.toPackageProgram();
+            try {
+                // Verify the reconstructed program has the expected properties
+                assertThat(reconstructedProgram.getUserCodeClassLoader()).isNotNull();
+                assertThat(reconstructedProgram.getClasspaths())
+                        .containsExactlyElementsOf(userClassPaths);
+                assertThat(reconstructedProgram.getSavepointSettings())
+                        .isEqualTo(savepointSettings);
+                assertThat(reconstructedProgram.getArguments()).containsExactly(programArgs);
+                assertThat(reconstructedProgram.getMainClassName()).isEqualTo(mainClassName);
+            } finally {
+                // Close even when an assertion above fails.
+                reconstructedProgram.close();
+            }
+        } finally {
+            // Close the source program too, not only the reconstructed one.
+            program.close();
+        }
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
index b5b7f5db04d..12e1ebfa144 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
@@ -21,6 +21,9 @@ package org.apache.flink.api.common;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.DeploymentOptions;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /** Possible states of an application. */
 @PublicEvolving
 public enum ApplicationState {
@@ -60,4 +63,28 @@ public enum ApplicationState {
     public boolean isTerminalState() {
         return terminalState;
     }
+
+    /** Lookup table from globally terminal {@link JobStatus} values to application states. */
+    private static final Map<JobStatus, ApplicationState> JOB_STATUS_APPLICATION_STATE_MAP =
+            new HashMap<>();
+
+    static {
+        // Only globally terminal job statuses get an entry; fromJobStatus rejects the rest.
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.FINISHED, ApplicationState.FINISHED);
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.CANCELED, ApplicationState.CANCELED);
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.FAILED, ApplicationState.FAILED);
+    }
+
+    /**
+     * Translates a globally terminal {@link JobStatus} into the matching ApplicationState.
+     *
+     * @param jobStatus the job status to translate
+     * @return the application state registered for {@code jobStatus}
+     * @throws IllegalArgumentException if no application state is registered for {@code
+     *     jobStatus}, i.e. it is not globally terminal (this includes {@code null})
+     */
+    public static ApplicationState fromJobStatus(JobStatus jobStatus) {
+        final ApplicationState mapped = JOB_STATUS_APPLICATION_STATE_MAP.get(jobStatus);
+        if (mapped != null) {
+            return mapped;
+        }
+        throw new IllegalArgumentException(
+                "JobStatus " + jobStatus + " does not have a corresponding ApplicationState.");
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index bd422a629ad..2e127c5cf2e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -155,4 +155,13 @@ public class DeploymentOptions {
                                                     + "regardless of this 
setting.",
                                             
TextElement.text(PROGRAM_CONFIG_WILDCARDS.key()))
                                     .build());
+
+    /**
+     * Controls how an application terminates: when {@code true} (the default) it completes
+     * exceptionally if any job fails or is canceled; when {@code false} it finishes after all
+     * jobs reach terminal states.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> 
TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION =
+            
ConfigOptions.key("execution.terminate-application-on-any-job-terminated-exceptionally")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When it is set to true, the application will 
complete exceptionally if any job fails or is canceled."
+                                    + " When it is set to false, the 
application will finish after all jobs reach terminal states.");
 }

Reply via email to