This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4690f277efc [FLINK-38759][runtime] Refactor application mode with 
PackagedProgramApplication
4690f277efc is described below

commit 4690f277efcd84eafb94e32c72a3af13378ce8de
Author: Yi Zhang <[email protected]>
AuthorDate: Fri Sep 26 16:16:45 2025 +0800

    [FLINK-38759][runtime] Refactor application mode with 
PackagedProgramApplication
---
 .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8           |    2 +-
 .../java/org/apache/flink/client/ClientUtils.java  |   23 +-
 .../ApplicationDispatcherBootstrap.java            |  406 --------
 ...ApplicationDispatcherGatewayServiceFactory.java |   41 +-
 .../application/ApplicationJobUtils.java           |  109 +++
 .../application/PackagedProgramApplication.java    |    3 +-
 .../client/program/StreamContextEnvironment.java   |   39 +-
 .../ApplicationDispatcherBootstrapTest.java        | 1023 --------------------
 .../application/ApplicationJobUtilsTest.java       |  174 ++++
 ....java => PackagedProgramApplicationITCase.java} |    7 +-
 .../configuration/ApplicationOptionsInternal.java  |   32 +
 .../DuplicateApplicationSubmissionException.java   |   38 +
 .../runtime/dispatcher/ApplicationBootstrap.java   |   42 +
 .../flink/runtime/dispatcher/Dispatcher.java       |   82 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java    |   31 +-
 .../streaming/api/graph/AdaptiveGraphManager.java  |    6 +-
 .../flink/streaming/api/graph/ExecutionPlan.java   |    9 +
 .../flink/streaming/api/graph/StreamGraph.java     |   33 +-
 .../api/graph/StreamingJobGraphGenerator.java      |   22 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |   94 ++
 .../service/application/ScriptExecutorITCase.java  |    3 +-
 21 files changed, 746 insertions(+), 1473 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
index 05081f810d9..256be5208ff 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
@@ -21,7 +21,7 @@ 
org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
 
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object,
 org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument 
leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context 
does not satisfy: reside outside of package 'org.apache.flink..' or reside in 
any package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
-org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean): Argument leaf type 
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or 
annotated with @Deprecated
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type 
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated wit [...]
 
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned 
leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of 
package 'org.apache.flink..' or reside in any package ['..shaded..'] or 
annotated with @Public or annotated with @PublicEvolving or annotated with 
@Deprecated
 
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration):
 Returned leaf type 
org.apache.flink.configuration.JobManagerOptions$SchedulerType does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f8554ed3064..dd680883bc5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.cli.ClientOptions;
@@ -42,6 +43,8 @@ import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
@@ -80,6 +83,23 @@ public enum ClientUtils {
             boolean enforceSingleJobExecution,
             boolean suppressSysout)
             throws ProgramInvocationException {
+        executeProgram(
+                executorServiceLoader,
+                configuration,
+                program,
+                enforceSingleJobExecution,
+                suppressSysout,
+                null);
+    }
+
+    public static void executeProgram(
+            PipelineExecutorServiceLoader executorServiceLoader,
+            Configuration configuration,
+            PackagedProgram program,
+            boolean enforceSingleJobExecution,
+            boolean suppressSysout,
+            @Nullable ApplicationID applicationId)
+            throws ProgramInvocationException {
         checkNotNull(executorServiceLoader);
         final ClassLoader userCodeClassLoader = 
program.getUserCodeClassLoader();
         final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
@@ -95,7 +115,8 @@ public enum ClientUtils {
                     configuration,
                     userCodeClassLoader,
                     enforceSingleJobExecution,
-                    suppressSysout);
+                    suppressSysout,
+                    applicationId);
 
             // For DataStream v2.
             ExecutionContextEnvironment.setAsContext(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
deleted file mode 100644
index 4290dcd4748..00000000000
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.deployment.application;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.cli.ClientOptions;
-import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
-import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link DispatcherBootstrap} used for running the user's {@code main()} in 
"Application Mode"
- * (see FLIP-85).
- *
- * <p>This dispatcher bootstrap submits the recovered {@link JobGraph job 
graphs} for re-execution
- * (in case of recovery from a failure), and then submits the remaining jobs 
of the application for
- * execution.
- *
- * <p>To achieve this, it works in conjunction with the {@link 
EmbeddedExecutor EmbeddedExecutor}
- * which decides if it should submit a job for execution (in case of a new 
job) or the job was
- * already recovered and is running.
- */
-@Internal
-public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
-
-    @VisibleForTesting static final String FAILED_JOB_NAME = "(application 
driver)";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
-
-    private static boolean isCanceledOrFailed(ApplicationStatus 
applicationStatus) {
-        return applicationStatus == ApplicationStatus.CANCELED
-                || applicationStatus == ApplicationStatus.FAILED;
-    }
-
-    private final PackagedProgram application;
-
-    private final Collection<JobID> recoveredJobIds;
-
-    private final Configuration configuration;
-
-    private final FatalErrorHandler errorHandler;
-
-    private final CompletableFuture<Void> applicationCompletionFuture;
-
-    private final CompletableFuture<Acknowledge> bootstrapCompletionFuture;
-
-    private ScheduledFuture<?> applicationExecutionTask;
-
-    public ApplicationDispatcherBootstrap(
-            final PackagedProgram application,
-            final Collection<JobID> recoveredJobIds,
-            final Configuration configuration,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor,
-            final FatalErrorHandler errorHandler) {
-        this.configuration = checkNotNull(configuration);
-        this.recoveredJobIds = checkNotNull(recoveredJobIds);
-        this.application = checkNotNull(application);
-        this.errorHandler = checkNotNull(errorHandler);
-
-        this.applicationCompletionFuture =
-                fixJobIdAndRunApplicationAsync(dispatcherGateway, 
scheduledExecutor);
-
-        this.bootstrapCompletionFuture = 
finishBootstrapTasks(dispatcherGateway);
-    }
-
-    @Override
-    public void stop() {
-        if (applicationExecutionTask != null) {
-            applicationExecutionTask.cancel(true);
-        }
-
-        if (applicationCompletionFuture != null) {
-            applicationCompletionFuture.cancel(true);
-        }
-    }
-
-    @VisibleForTesting
-    ScheduledFuture<?> getApplicationExecutionFuture() {
-        return applicationExecutionTask;
-    }
-
-    @VisibleForTesting
-    CompletableFuture<Void> getApplicationCompletionFuture() {
-        return applicationCompletionFuture;
-    }
-
-    @VisibleForTesting
-    CompletableFuture<Acknowledge> getBootstrapCompletionFuture() {
-        return bootstrapCompletionFuture;
-    }
-
-    /**
-     * Logs final application status and invokes error handler in case of 
unexpected failures.
-     * Optionally shuts down the given dispatcherGateway when the application 
completes (either
-     * successfully or in case of failure), depending on the corresponding 
config option.
-     */
-    private CompletableFuture<Acknowledge> finishBootstrapTasks(
-            final DispatcherGateway dispatcherGateway) {
-        final CompletableFuture<Acknowledge> shutdownFuture =
-                applicationCompletionFuture
-                        .handle(
-                                (ignored, t) -> {
-                                    if (t == null) {
-                                        LOG.info("Application completed 
SUCCESSFULLY");
-                                        return finish(
-                                                dispatcherGateway, 
ApplicationStatus.SUCCEEDED);
-                                    }
-                                    final Optional<ApplicationStatus> 
maybeApplicationStatus =
-                                            extractApplicationStatus(t);
-                                    if (maybeApplicationStatus.isPresent()
-                                            && 
isCanceledOrFailed(maybeApplicationStatus.get())) {
-                                        final ApplicationStatus 
applicationStatus =
-                                                maybeApplicationStatus.get();
-                                        LOG.info("Application {}: ", 
applicationStatus, t);
-                                        return finish(dispatcherGateway, 
applicationStatus);
-                                    }
-                                    if (t instanceof CancellationException) {
-                                        LOG.warn(
-                                                "Application has been 
cancelled because the {} is being stopped.",
-                                                
ApplicationDispatcherBootstrap.class
-                                                        .getSimpleName());
-                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                    }
-                                    LOG.warn("Application failed unexpectedly: 
", t);
-                                    return 
FutureUtils.<Acknowledge>completedExceptionally(t);
-                                })
-                        .thenCompose(Function.identity());
-        FutureUtils.handleUncaughtException(shutdownFuture, (t, e) -> 
errorHandler.onFatalError(e));
-        return shutdownFuture;
-    }
-
-    private CompletableFuture<Acknowledge> finish(
-            DispatcherGateway dispatcherGateway, ApplicationStatus 
applicationStatus) {
-        boolean shouldShutDownOnFinish =
-                
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
-        return shouldShutDownOnFinish
-                ? dispatcherGateway.shutDownCluster(applicationStatus)
-                : CompletableFuture.completedFuture(Acknowledge.get());
-    }
-
-    private Optional<ApplicationStatus> extractApplicationStatus(Throwable t) {
-        final Optional<UnsuccessfulExecutionException> maybeException =
-                ExceptionUtils.findThrowable(t, 
UnsuccessfulExecutionException.class);
-        return maybeException.map(
-                exception -> 
ApplicationStatus.fromJobStatus(exception.getStatus().orElse(null)));
-    }
-
-    private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
-            final DispatcherGateway dispatcherGateway, final ScheduledExecutor 
scheduledExecutor) {
-        final Optional<String> configuredJobId =
-                
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
-        final boolean submitFailedJobOnApplicationError =
-                
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
-        if 
(!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
-                && !configuredJobId.isPresent()) {
-            return runApplicationAsync(
-                    dispatcherGateway, scheduledExecutor, false, 
submitFailedJobOnApplicationError);
-        }
-        if (!configuredJobId.isPresent()) {
-            // In HA mode, we only support single-execute jobs at the moment. 
Here, we manually
-            // generate the job id, if not configured, from the cluster id to 
keep it consistent
-            // across failover.
-            configuration.set(
-                    PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
-                    new JobID(
-                                    Preconditions.checkNotNull(
-                                                    configuration.get(
-                                                            
HighAvailabilityOptions.HA_CLUSTER_ID))
-                                            .hashCode(),
-                                    0)
-                            .toHexString());
-        }
-        return runApplicationAsync(
-                dispatcherGateway, scheduledExecutor, true, 
submitFailedJobOnApplicationError);
-    }
-
-    /**
-     * Runs the user program entrypoint by scheduling a task on the given 
{@code scheduledExecutor}.
-     * The returned {@link CompletableFuture} completes when all jobs of the 
user application
-     * succeeded. if any of them fails, or if job submission fails.
-     */
-    private CompletableFuture<Void> runApplicationAsync(
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor,
-            final boolean enforceSingleJobExecution,
-            final boolean submitFailedJobOnApplicationError) {
-        final CompletableFuture<List<JobID>> applicationExecutionFuture = new 
CompletableFuture<>();
-        final Set<JobID> tolerateMissingResult = 
Collections.synchronizedSet(new HashSet<>());
-
-        // we need to hand in a future as return value because we need to get 
those JobIs out
-        // from the scheduled task that executes the user program
-        applicationExecutionTask =
-                scheduledExecutor.schedule(
-                        () ->
-                                runApplicationEntryPoint(
-                                        applicationExecutionFuture,
-                                        tolerateMissingResult,
-                                        dispatcherGateway,
-                                        scheduledExecutor,
-                                        enforceSingleJobExecution,
-                                        submitFailedJobOnApplicationError),
-                        0L,
-                        TimeUnit.MILLISECONDS);
-
-        return applicationExecutionFuture.thenCompose(
-                jobIds ->
-                        getApplicationResult(
-                                dispatcherGateway,
-                                jobIds,
-                                tolerateMissingResult,
-                                scheduledExecutor));
-    }
-
-    /**
-     * Runs the user program entrypoint and completes the given {@code 
jobIdsFuture} with the {@link
-     * JobID JobIDs} of the submitted jobs.
-     *
-     * <p>This should be executed in a separate thread (or task).
-     */
-    private void runApplicationEntryPoint(
-            final CompletableFuture<List<JobID>> jobIdsFuture,
-            final Set<JobID> tolerateMissingResult,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor,
-            final boolean enforceSingleJobExecution,
-            final boolean submitFailedJobOnApplicationError) {
-        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException(
-                            String.format(
-                                    "Submission of failed job in case of an 
application error ('%s') is not supported in non-HA setups.",
-                                    
DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
-                                            .key())));
-            return;
-        }
-        final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
-        try {
-            final PipelineExecutorServiceLoader executorServiceLoader =
-                    new EmbeddedExecutorServiceLoader(
-                            applicationJobIds, dispatcherGateway, 
scheduledExecutor);
-
-            ClientUtils.executeProgram(
-                    executorServiceLoader,
-                    configuration,
-                    application,
-                    enforceSingleJobExecution,
-                    true /* suppress sysout */);
-
-            if (applicationJobIds.isEmpty()) {
-                jobIdsFuture.completeExceptionally(
-                        new ApplicationExecutionException(
-                                "The application contains no execute() 
calls."));
-            } else {
-                jobIdsFuture.complete(applicationJobIds);
-            }
-        } catch (Throwable t) {
-            // If we're running in a single job execution mode, it's safe to 
consider re-submission
-            // of an already finished a success.
-            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
-                    ExceptionUtils.findThrowable(t, 
DuplicateJobSubmissionException.class);
-            if (enforceSingleJobExecution
-                    && maybeDuplicate.isPresent()
-                    && maybeDuplicate.get().isGloballyTerminated()) {
-                final JobID jobId = maybeDuplicate.get().getJobID();
-                tolerateMissingResult.add(jobId);
-                jobIdsFuture.complete(Collections.singletonList(jobId));
-            } else if (submitFailedJobOnApplicationError && 
applicationJobIds.isEmpty()) {
-                final JobID failedJobId =
-                        JobID.fromHexString(
-                                
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
-                dispatcherGateway
-                        .submitFailedJob(failedJobId, FAILED_JOB_NAME, t)
-                        .thenAccept(
-                                ignored ->
-                                        jobIdsFuture.complete(
-                                                
Collections.singletonList(failedJobId)));
-            } else {
-                jobIdsFuture.completeExceptionally(
-                        new ApplicationExecutionException("Could not execute 
application.", t));
-            }
-        }
-    }
-
-    private CompletableFuture<Void> getApplicationResult(
-            final DispatcherGateway dispatcherGateway,
-            final Collection<JobID> applicationJobIds,
-            final Set<JobID> tolerateMissingResult,
-            final ScheduledExecutor executor) {
-        final List<CompletableFuture<?>> jobResultFutures =
-                applicationJobIds.stream()
-                        .map(
-                                jobId ->
-                                        unwrapJobResultException(
-                                                getJobResult(
-                                                        dispatcherGateway,
-                                                        jobId,
-                                                        executor,
-                                                        
tolerateMissingResult.contains(jobId))))
-                        .collect(Collectors.toList());
-        return FutureUtils.waitForAll(jobResultFutures);
-    }
-
-    private CompletableFuture<JobResult> getJobResult(
-            final DispatcherGateway dispatcherGateway,
-            final JobID jobId,
-            final ScheduledExecutor scheduledExecutor,
-            final boolean tolerateMissingResult) {
-        final Duration timeout = 
configuration.get(ClientOptions.CLIENT_TIMEOUT);
-        final Duration retryPeriod = 
configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
-        final CompletableFuture<JobResult> jobResultFuture =
-                JobStatusPollingUtils.getJobResult(
-                        dispatcherGateway, jobId, scheduledExecutor, timeout, 
retryPeriod);
-        if (tolerateMissingResult) {
-            // Return "unknown" job result if dispatcher no longer knows the 
actual result.
-            return FutureUtils.handleException(
-                    jobResultFuture,
-                    FlinkJobNotFoundException.class,
-                    exception ->
-                            new JobResult.Builder()
-                                    .jobId(jobId)
-                                    .jobStatus(null)
-                                    .netRuntime(Long.MAX_VALUE)
-                                    .build());
-        }
-        return jobResultFuture;
-    }
-
-    /**
-     * If the given {@link JobResult} indicates success, this passes through 
the {@link JobResult}.
-     * Otherwise, this returns a future that is finished exceptionally 
(potentially with an
-     * exception from the {@link JobResult}).
-     */
-    private CompletableFuture<JobResult> unwrapJobResultException(
-            final CompletableFuture<JobResult> jobResult) {
-        return jobResult.thenApply(
-                result -> {
-                    if (result.isSuccess()) {
-                        return result;
-                    }
-
-                    throw new CompletionException(
-                            UnsuccessfulExecutionException.fromJobResult(
-                                    result, 
application.getUserCodeClassLoader()));
-                });
-    }
-}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index b7e188ef7d4..6c443c4b649 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -19,9 +19,13 @@
 package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.runtime.dispatcher.ApplicationBootstrap;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -51,8 +55,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>It instantiates a {@link
  * 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.DispatcherGatewayService
- * DispatcherGatewayService} with an {@link ApplicationDispatcherBootstrap} 
containing the user's
- * program.
+ * DispatcherGatewayService} with an {@link ApplicationBootstrap} containing 
the user's program.
  */
 @Internal
 public class ApplicationDispatcherGatewayServiceFactory
@@ -62,7 +65,7 @@ public class ApplicationDispatcherGatewayServiceFactory
 
     private final DispatcherFactory dispatcherFactory;
 
-    private final PackagedProgram application;
+    private final PackagedProgram program;
 
     private final RpcService rpcService;
 
@@ -71,12 +74,12 @@ public class ApplicationDispatcherGatewayServiceFactory
     public ApplicationDispatcherGatewayServiceFactory(
             Configuration configuration,
             DispatcherFactory dispatcherFactory,
-            PackagedProgram application,
+            PackagedProgram program,
             RpcService rpcService,
             PartialDispatcherServices partialDispatcherServices) {
         this.configuration = configuration;
         this.dispatcherFactory = dispatcherFactory;
-        this.application = checkNotNull(application);
+        this.program = checkNotNull(program);
         this.rpcService = rpcService;
         this.partialDispatcherServices = partialDispatcherServices;
     }
@@ -91,6 +94,26 @@ public class ApplicationDispatcherGatewayServiceFactory
 
         final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
 
+        final boolean allowExecuteMultipleJobs =
+                ApplicationJobUtils.allowExecuteMultipleJobs(configuration);
+        ApplicationJobUtils.maybeFixIds(configuration);
+        final ApplicationID applicationId =
+                configuration
+                        
.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID)
+                        .map(ApplicationID::fromHexString)
+                        .orElseGet(ApplicationID::new);
+
+        PackagedProgramApplication bootstrapApplication =
+                new PackagedProgramApplication(
+                        applicationId,
+                        program,
+                        recoveredJobIds,
+                        configuration,
+                        true,
+                        !allowExecuteMultipleJobs,
+                        
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR),
+                        
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH));
+
         final Dispatcher dispatcher;
         try {
             dispatcher =
@@ -100,13 +123,7 @@ public class ApplicationDispatcherGatewayServiceFactory
                             recoveredJobs,
                             recoveredDirtyJobResults,
                             (dispatcherGateway, scheduledExecutor, 
errorHandler) ->
-                                    new ApplicationDispatcherBootstrap(
-                                            application,
-                                            recoveredJobIds,
-                                            configuration,
-                                            dispatcherGateway,
-                                            scheduledExecutor,
-                                            errorHandler),
+                                    new 
ApplicationBootstrap(bootstrapApplication),
                             
PartialDispatcherServicesWithJobPersistenceComponents.from(
                                     partialDispatcherServices,
                                     executionPlanWriter,
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
new file mode 100644
index 00000000000..ac2b565c917
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+
+/** Utility class to handle application/job related configuration options in 
application mode. */
+public class ApplicationJobUtils {
+
+    /**
+     * Ensures deterministic application and job IDs in high availability (HA) 
mode.
+     *
+     * <p>In HA mode, fixed IDs are required to maintain state consistency 
across JobManager
+     * failovers. This method guarantees that the application ID and job ID 
are properly set as
+     * follows:
+     *
+     * <ul>
+     *   <li>If no application ID is configured, it generates a fixed one from 
the HA cluster ID.
+     *   <li>If no job ID is configured, it generates a fixed one based on the 
application ID (or
+     *       the HA cluster ID if the application ID is also absent).
+     * </ul>
+     *
+     * <p>If HA mode is disabled, this method does nothing; and the system 
will assign random
+     * application/job IDs if none is configured.
+     *
+     * @param configuration The configuration the may be updated with fixed IDs
+     */
+    public static void maybeFixIds(Configuration configuration) {
+        if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
+            final Optional<String> configuredApplicationId =
+                    
configuration.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID);
+            if (configuredApplicationId.isEmpty()) {
+                // In HA mode, a fixed application id is required to ensure 
consistency across
+                // failovers. The application id is derived from the cluster 
id.
+                configuration.set(
+                        ApplicationOptionsInternal.FIXED_APPLICATION_ID,
+                        new ApplicationID(
+                                        Preconditions.checkNotNull(
+                                                        configuration.get(
+                                                                
HighAvailabilityOptions
+                                                                        
.HA_CLUSTER_ID))
+                                                .hashCode(),
+                                        0)
+                                .toHexString());
+            }
+            final Optional<String> configuredJobId =
+                    
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
+            if (configuredJobId.isEmpty()) {
+                // In HA mode, a fixed job id is required to ensure 
consistency across failovers.
+                // The job id is derived as follows:
+                // 1. If application id is configured, use the application id 
as the job id.
+                // 2. Otherwise, generate the job id based on the HA cluster 
id.
+                // Note that the second case is kept for backward 
compatibility and may be removed.
+                if (configuredApplicationId.isPresent()) {
+                    ApplicationID applicationId =
+                            ApplicationID.fromHexString(
+                                    configuration.get(
+                                            
ApplicationOptionsInternal.FIXED_APPLICATION_ID));
+                    configuration.set(
+                            PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
+                            applicationId.toHexString());
+                } else {
+                    configuration.set(
+                            PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
+                            new JobID(
+                                            Preconditions.checkNotNull(
+                                                            configuration.get(
+                                                                    
HighAvailabilityOptions
+                                                                            
.HA_CLUSTER_ID))
+                                                    .hashCode(),
+                                            0)
+                                    .toHexString());
+                }
+            }
+        }
+    }
+
+    public static boolean allowExecuteMultipleJobs(Configuration config) {
+        final Optional<String> configuredJobId =
+                
config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
+        return !HighAvailabilityMode.isHighAvailabilityModeActivated(config)
+                && configuredJobId.isEmpty();
+    }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index 5cc2f2cc047..de5de97b752 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -533,7 +533,8 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                     configuration,
                     program,
                     enforceSingleJobExecution,
-                    true /* suppress sysout */);
+                    true /* suppress sysout */,
+                    getApplicationId());
 
             if (applicationJobIds.isEmpty()) {
                 jobIdsFuture.completeExceptionally(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 7dde266cf85..2799ab63cb5 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.ClientOptions;
@@ -42,6 +43,8 @@ import 
org.apache.flink.shaded.guava33.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -72,6 +75,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
     private final boolean programConfigEnabled;
     private final Collection<String> programConfigWildcards;
 
+    @Nullable private final ApplicationID applicationId;
+
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration configuration,
@@ -89,7 +94,6 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                 Collections.emptyList());
     }
 
-    @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration clusterConfiguration,
@@ -99,6 +103,29 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final boolean suppressSysout,
             final boolean programConfigEnabled,
             final Collection<String> programConfigWildcards) {
+        this(
+                executorServiceLoader,
+                clusterConfiguration,
+                configuration,
+                userCodeClassLoader,
+                enforceSingleJobExecution,
+                suppressSysout,
+                programConfigEnabled,
+                programConfigWildcards,
+                null);
+    }
+
+    @Internal
+    public StreamContextEnvironment(
+            final PipelineExecutorServiceLoader executorServiceLoader,
+            final Configuration clusterConfiguration,
+            final Configuration configuration,
+            final ClassLoader userCodeClassLoader,
+            final boolean enforceSingleJobExecution,
+            final boolean suppressSysout,
+            final boolean programConfigEnabled,
+            final Collection<String> programConfigWildcards,
+            @Nullable final ApplicationID applicationId) {
         super(executorServiceLoader, configuration, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
@@ -106,6 +133,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
         this.jobCounter = 0;
         this.programConfigEnabled = programConfigEnabled;
         this.programConfigWildcards = programConfigWildcards;
+        this.applicationId = applicationId;
     }
 
     @Override
@@ -185,6 +213,9 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
     public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
         checkNotAllowedConfigurations();
         validateAllowedExecution();
+        if (applicationId != null) {
+            streamGraph.setApplicationId(applicationId);
+        }
         final JobClient jobClient = super.executeAsync(streamGraph);
 
         if (!suppressSysout) {
@@ -209,7 +240,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final Configuration clusterConfiguration,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
-            final boolean suppressSysout) {
+            final boolean suppressSysout,
+            @Nullable final ApplicationID applicationId) {
         final StreamExecutionEnvironmentFactory factory =
                 envInitConfig -> {
                     final boolean programConfigEnabled =
@@ -227,7 +259,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                             enforceSingleJobExecution,
                             suppressSysout,
                             programConfigEnabled,
-                            programConfigWildcards);
+                            programConfigWildcards,
+                            applicationId);
                 };
         initializeContextEnvironment(factory);
     }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
deleted file mode 100644
index 47c5e13fe7b..00000000000
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ /dev/null
@@ -1,1023 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.deployment.application;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.testjar.FailingJob;
-import org.apache.flink.client.testjar.MultiExecuteJob;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-
-import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/** Tests for the {@link ApplicationDispatcherBootstrap}. */
-class ApplicationDispatcherBootstrapTest {
-
-    private static final int TIMEOUT_SECONDS = 10;
-
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(4);
-    private final ScheduledExecutor scheduledExecutor =
-            new ScheduledExecutorServiceAdapter(executor);
-
-    @AfterEach
-    void cleanup() {
-        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
-    }
-
-    @Test
-    void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()));
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 0);
-
-        assertException(applicationFuture, 
ApplicationExecutionException.class);
-    }
-
-    @Test
-    void testOnlyOneJobIsAllowedWithHa() throws Throwable {
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(configurationUnderTest, 2);
-
-        assertException(applicationFuture, FlinkRuntimeException.class);
-    }
-
-    @Test
-    void testOnlyOneJobAllowedWithStaticJobId() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(configurationUnderTest, 2);
-
-        assertException(applicationFuture, FlinkRuntimeException.class);
-    }
-
-    @Test
-    void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(configurationUnderTest, 2);
-
-        assertException(applicationFuture, FlinkRuntimeException.class);
-    }
-
-    @Test
-    void testJobIdDefaultsToClusterIdWithHa() throws Throwable {
-        final Configuration configurationUnderTest = getConfiguration();
-        final String clusterId = "cluster";
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        configurationUnderTest.set(HighAvailabilityOptions.HA_CLUSTER_ID, 
clusterId);
-
-        final CompletableFuture<JobID> submittedJobId = new 
CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                finishedJobGatewayBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> {
-                                    
submittedJobId.complete(jobGraph.getJobID());
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isEqualTo(new JobID(clusterId.hashCode(), 0L));
-    }
-
-    @Test
-    void testStaticJobId() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-
-        final CompletableFuture<JobID> submittedJobId = new 
CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                finishedJobGatewayBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> {
-                                    
submittedJobId.complete(jobGraph.getJobID());
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isEqualTo(new JobID(0L, 2L));
-    }
-
-    @Test
-    void testStaticJobIdWithHa() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-
-        final CompletableFuture<JobID> submittedJobId = new 
CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                finishedJobGatewayBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> {
-                                    
submittedJobId.complete(jobGraph.getJobID());
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isEqualTo(new JobID(0L, 2L));
-    }
-
-    @Test
-    void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
-        final ConcurrentLinkedDeque<JobID> submittedJobIds = new 
ConcurrentLinkedDeque<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> {
-                                    submittedJobIds.add(jobGraph.getJobID());
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                })
-                        .setRequestJobStatusFunction(
-                                jobId -> {
-                                    // we only fail one of the jobs, the first 
one, the others will
-                                    // "keep" running
-                                    // indefinitely
-                                    if (jobId.equals(submittedJobIds.peek())) {
-                                        return 
CompletableFuture.completedFuture(JobStatus.FAILED);
-                                    }
-                                    // never finish the other jobs
-                                    return 
CompletableFuture.completedFuture(JobStatus.RUNNING);
-                                })
-                        .setRequestJobResultFunction(
-                                jobId -> {
-                                    // we only fail one of the jobs, the first 
one, the other will
-                                    // "keep" running
-                                    // indefinitely. If we didn't have this 
the test would hang
-                                    // forever.
-                                    if (jobId.equals(submittedJobIds.peek())) {
-                                        return 
CompletableFuture.completedFuture(
-                                                createFailedJobResult(jobId));
-                                    }
-                                    // never finish the other jobs
-                                    return new CompletableFuture<>();
-                                });
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 2);
-        final UnsuccessfulExecutionException exception =
-                assertException(applicationFuture, 
UnsuccessfulExecutionException.class);
-        
assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
-    }
-
-    @Test
-    void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
-        final TestingDispatcherGateway.Builder dispatcherBuilder = 
finishedJobGatewayBuilder();
-
-        final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 3);
-
-        // this would block indefinitely if the applications don't finish
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    @Test
-    void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
-        final CompletableFuture<ApplicationStatus> clusterShutdownStatus =
-                new CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                canceledJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    clusterShutdownStatus.complete(status);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3, dispatcherBuilder.build(), scheduledExecutor);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // wait until the bootstrap "thinks" it's done, also makes sure that 
we don't
-        // fail the future exceptionally with a JobCancelledException
-        completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(clusterShutdownStatus.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))
-                .isEqualTo(ApplicationStatus.CANCELED);
-    }
-
-    @Test
-    void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception 
{
-        final TestingDispatcherGateway.Builder dispatcherBuilder = 
finishedJobGatewayBuilder();
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3, dispatcherBuilder.build(), scheduledExecutor);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        ScheduledFuture<?> applicationExecutionFuture = 
bootstrap.getApplicationExecutionFuture();
-
-        // wait until the bootstrap "thinks" it's done
-        completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        // make sure the task finishes
-        applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    @Test
-    void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
-        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                runningJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    shutdownCalled.set(true);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
-                new ManuallyTriggeredScheduledExecutor();
-        // we're "listening" on this to be completed to verify that the error 
handler is called.
-        // In production, this will shut down the cluster with an exception.
-        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
-        final ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3,
-                        dispatcherBuilder.build(),
-                        manuallyTriggeredExecutor,
-                        errorHandlerFuture::completeExceptionally);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        ScheduledFuture<?> applicationExecutionFuture = 
bootstrap.getApplicationExecutionFuture();
-
-        bootstrap.stop();
-
-        // Triggers the scheduled ApplicationDispatcherBootstrap process after 
calling stop. This
-        // ensures that the bootstrap task isn't completed before the stop 
method is called which
-        // would prevent the stop call from cancelling the task's future.
-        manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask();
-
-        // we didn't call the error handler
-        assertThat(errorHandlerFuture.isDone()).isFalse();
-
-        // completion future gets completed normally
-        completionFuture.get();
-
-        // verify that we didn't shut down the cluster
-        assertThat(shutdownCalled.get()).isFalse();
-
-        // verify that the application task is being cancelled
-        assertThat(applicationExecutionFuture.isCancelled()).isTrue();
-        assertThat(applicationExecutionFuture.isDone()).isTrue();
-    }
-
-    @Test
-    void testErrorHandlerIsCalledWhenSubmissionThrowsAnException() throws 
Exception {
-        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                runningJobGatewayBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> {
-                                    throw new FlinkRuntimeException("Nope!");
-                                })
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    shutdownCalled.set(true);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        // we're "listening" on this to be completed to verify that the error 
handler is called.
-        // In production, this will shut down the cluster with an exception.
-        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
-        final ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        2,
-                        dispatcherBuilder.build(),
-                        scheduledExecutor,
-                        errorHandlerFuture::completeExceptionally);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // we call the error handler
-        assertException(errorHandlerFuture, FlinkRuntimeException.class);
-
-        // we return a future that is completed exceptionally
-        assertException(completionFuture, FlinkRuntimeException.class);
-
-        // and cluster shutdown didn't get called
-        assertThat(shutdownCalled.get()).isFalse();
-    }
-
-    @Test
-    void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws 
Exception {
-        testErrorHandlerIsCalled(
-                () ->
-                        FutureUtils.completedExceptionally(
-                                new FlinkRuntimeException("Test exception.")));
-    }
-
-    @Test
-    void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws 
Exception {
-        testErrorHandlerIsCalled(
-                () -> {
-                    throw new FlinkRuntimeException("Test exception.");
-                });
-    }
-
-    private void 
testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> 
shutdownFunction)
-            throws Exception {
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
-                        .setRequestJobStatusFunction(
-                                jobId -> 
CompletableFuture.completedFuture(JobStatus.FINISHED))
-                        .setRequestJobResultFunction(
-                                jobId ->
-                                        CompletableFuture.completedFuture(
-                                                createJobResult(jobId, 
JobStatus.FINISHED)))
-                        .setClusterShutdownFunction(status -> 
shutdownFunction.get());
-
-        // we're "listening" on this to be completed to verify that the error 
handler is called.
-        // In production, this will shut down the cluster with an exception.
-        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
-        final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
-        final ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3,
-                        dispatcherGateway,
-                        scheduledExecutor,
-                        errorHandlerFuture::completeExceptionally);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // we call the error handler
-        assertException(errorHandlerFuture, FlinkRuntimeException.class);
-
-        // we return a future that is completed exceptionally
-        assertException(completionFuture, FlinkRuntimeException.class);
-    }
-
-    @Test
-    void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws 
Exception {
-        final CompletableFuture<ApplicationStatus> clusterShutdown = new 
CompletableFuture<>();
-
-        final TestingDispatcherGateway dispatcherGateway =
-                canceledJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    clusterShutdown.complete(status);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                })
-                        .build();
-
-        final PackagedProgram program = getProgram(2);
-
-        final Configuration configuration = getConfiguration();
-        configuration.set(DeploymentOptions.ATTACHED, true);
-
-        final ApplicationDispatcherBootstrap bootstrap =
-                new ApplicationDispatcherBootstrap(
-                        program,
-                        Collections.emptyList(),
-                        configuration,
-                        dispatcherGateway,
-                        scheduledExecutor,
-                        e -> {});
-
-        final CompletableFuture<Void> applicationFuture =
-                bootstrap.getApplicationCompletionFuture();
-        assertException(applicationFuture, 
UnsuccessfulExecutionException.class);
-
-        
assertThat(clusterShutdown.get()).isEqualTo(ApplicationStatus.CANCELED);
-    }
-
-    @Test
-    void testClusterShutdownWhenApplicationSucceeds() throws Exception {
-        // we're "listening" on this to be completed to verify that the cluster
-        // is being shut down from the ApplicationDispatcherBootstrap
-        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
-                new CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                finishedJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    externalShutdownFuture.complete(status);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3, dispatcherBuilder.build(), scheduledExecutor);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // wait until the bootstrap "thinks" it's done
-        completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        // verify that the dispatcher is actually being shut down
-        assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))
-                .isEqualTo(ApplicationStatus.SUCCEEDED);
-    }
-
-    @Test
-    void testClusterShutdownWhenApplicationFails() throws Exception {
-        // we're "listening" on this to be completed to verify that the cluster
-        // is being shut down from the ApplicationDispatcherBootstrap
-        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
-                new CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                failedJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    externalShutdownFuture.complete(status);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3, dispatcherBuilder.build(), scheduledExecutor);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // wait until the bootstrap "thinks" it's done
-        completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        // verify that the dispatcher is actually being shut down
-        assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))
-                .isEqualTo(ApplicationStatus.FAILED);
-    }
-
-    @Test
-    void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
-        // we're "listening" on this to be completed to verify that the cluster
-        // is being shut down from the ApplicationDispatcherBootstrap
-        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
-                new CompletableFuture<>();
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                canceledJobGatewayBuilder()
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    externalShutdownFuture.complete(status);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3, dispatcherBuilder.build(), scheduledExecutor);
-
-        final CompletableFuture<Acknowledge> completionFuture =
-                bootstrap.getBootstrapCompletionFuture();
-
-        // wait until the bootstrap "thinks" it's done
-        completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        // verify that the dispatcher is actually being shut down
-        assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))
-                .isEqualTo(ApplicationStatus.CANCELED);
-    }
-
-    @Test
-    void testErrorHandlerIsCalledWhenApplicationStatusIsUnknown() throws 
Exception {
-        // we're "listening" on this to be completed to verify that the cluster
-        // is being shut down from the ApplicationDispatcherBootstrap
-        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                canceledJobGatewayBuilder()
-                        .setRequestJobResultFunction(
-                                jobID ->
-                                        CompletableFuture.completedFuture(
-                                                createUnknownJobResult(jobID)))
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    shutdownCalled.set(true);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
-        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
-        final ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        3,
-                        dispatcherGateway,
-                        scheduledExecutor,
-                        errorHandlerFuture::completeExceptionally);
-
-        // check that bootstrap shutdown completes exceptionally
-        assertException(
-                bootstrap.getApplicationCompletionFuture(), 
UnsuccessfulExecutionException.class);
-        // and exception gets propagated to error handler
-        assertException(
-                bootstrap.getApplicationCompletionFuture(), 
UnsuccessfulExecutionException.class);
-        // and cluster didn't shut down
-        assertThat(shutdownCalled.get()).isFalse();
-    }
-
-    @Test
-    void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                finishedJobGatewayBuilder()
-                        .setSubmitFunction(
-                                jobGraph ->
-                                        FutureUtils.completedExceptionally(
-                                                DuplicateJobSubmissionException
-                                                        
.ofGloballyTerminated(testJobID)));
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    /**
-     * In this scenario, job result is no longer present in the {@link
-     * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has 
terminated and job
-     * manager failed over), but we know that job has already terminated from 
{@link
-     * org.apache.flink.runtime.highavailability.JobResultStore}.
-     */
-    @Test
-    void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() 
throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph ->
-                                        FutureUtils.completedExceptionally(
-                                                DuplicateJobSubmissionException
-                                                        
.ofGloballyTerminated(testJobID)))
-                        .setRequestJobStatusFunction(
-                                jobId ->
-                                        FutureUtils.completedExceptionally(
-                                                new 
FlinkJobNotFoundException(jobId)))
-                        .setRequestJobResultFunction(
-                                jobId ->
-                                        FutureUtils.completedExceptionally(
-                                                new 
FlinkJobNotFoundException(jobId)));
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    /**
-     * In this scenario, job result is no longer present in the {@link
-     * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has 
terminated and job
-     * manager failed over), but we know that job has already terminated from 
{@link
-     * org.apache.flink.runtime.highavailability.JobResultStore}.
-     */
-    @Test
-    void 
testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttached() throws 
Throwable {
-        final JobID testJobID = new JobID(0, 2);
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph ->
-                                        FutureUtils.completedExceptionally(
-                                                DuplicateJobSubmissionException
-                                                        
.ofGloballyTerminated(testJobID)))
-                        .setRequestJobStatusFunction(
-                                jobId ->
-                                        FutureUtils.completedExceptionally(
-                                                new 
FlinkJobNotFoundException(jobId)))
-                        .setRequestJobResultFunction(
-                                jobId ->
-                                        FutureUtils.completedExceptionally(
-                                                new 
FlinkJobNotFoundException(jobId)));
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    @Test
-    void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
-        final JobID testJobID = new JobID(0, 2);
-        final Configuration configurationUnderTest = getConfiguration();
-        configurationUnderTest.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
-        configurationUnderTest.set(
-                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph ->
-                                        FutureUtils.completedExceptionally(
-                                                
DuplicateJobSubmissionException.of(testJobID)));
-        final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-        final ExecutionException executionException =
-                assertThrows(
-                        ExecutionException.class,
-                        () -> applicationFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS));
-        final Optional<DuplicateJobSubmissionException> maybeDuplicate =
-                ExceptionUtils.findThrowable(
-                        executionException, 
DuplicateJobSubmissionException.class);
-        assertThat(maybeDuplicate).isPresent();
-        assertThat(maybeDuplicate.get().isGloballyTerminated()).isFalse();
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = JobStatus.class,
-            names = {"FINISHED", "CANCELED", "FAILED"})
-    void testShutdownDisabled(JobStatus jobStatus) throws Exception {
-        final Configuration configurationUnderTest = getConfiguration();
-        
configurationUnderTest.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, 
false);
-
-        final TestingDispatcherGateway dispatcherGateway =
-                dispatcherGatewayBuilder(jobStatus)
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    fail("Cluster shutdown should not be 
called");
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                })
-                        .build();
-
-        ApplicationDispatcherBootstrap bootstrap =
-                createApplicationDispatcherBootstrap(
-                        configurationUnderTest, dispatcherGateway, 
scheduledExecutor);
-
-        // Wait until bootstrap is finished to make sure cluster shutdown 
isn't called
-        bootstrap.getBootstrapCompletionFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-    }
-
-    @Test
-    void testSubmitFailedJobOnApplicationErrorInHASetup() throws Exception {
-        final Configuration configuration = getConfiguration();
-        final JobID jobId = new JobID();
-        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, 
true);
-        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
jobId.toHexString());
-        testSubmitFailedJobOnApplicationError(
-                configuration,
-                (id, t) -> {
-                    assertThat(id).isEqualTo(jobId);
-                    assertThat(t)
-                            .isInstanceOf(ProgramInvocationException.class)
-                            .hasRootCauseInstanceOf(RuntimeException.class)
-                            .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE);
-                });
-    }
-
-    @Test
-    void testSubmitFailedJobOnApplicationErrorInHASetupWithCustomFixedJobId() 
throws Exception {
-        final Configuration configuration = getConfiguration();
-        final JobID customFixedJobId = new JobID();
-        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, 
true);
-        configuration.set(
-                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
customFixedJobId.toHexString());
-        testSubmitFailedJobOnApplicationError(
-                configuration,
-                (jobId, t) -> {
-                    assertThat(jobId).isEqualTo(customFixedJobId);
-                    assertThat(t)
-                            .isInstanceOf(ProgramInvocationException.class)
-                            .hasRootCauseInstanceOf(RuntimeException.class)
-                            .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE);
-                });
-    }
-
-    private void testSubmitFailedJobOnApplicationError(
-            Configuration configuration, BiConsumer<JobID, Throwable> 
failedJobAssertion)
-            throws Exception {
-        final CompletableFuture<Void> submitted = new CompletableFuture<>();
-        final TestingDispatcherGateway dispatcherGateway =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFailedFunction(
-                                (jobId, jobName, t) -> {
-                                    try {
-                                        failedJobAssertion.accept(jobId, t);
-                                        submitted.complete(null);
-                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                    } catch (Throwable assertion) {
-                                        
submitted.completeExceptionally(assertion);
-                                        return 
FutureUtils.completedExceptionally(assertion);
-                                    }
-                                })
-                        .setRequestJobStatusFunction(
-                                jobId -> submitted.thenApply(ignored -> 
JobStatus.FAILED))
-                        .setRequestJobResultFunction(
-                                jobId ->
-                                        submitted.thenApply(
-                                                ignored ->
-                                                        createJobResult(jobId, 
JobStatus.FAILED)))
-                        .build();
-
-        final ApplicationDispatcherBootstrap bootstrap =
-                new ApplicationDispatcherBootstrap(
-                        FailingJob.getProgram(),
-                        Collections.emptyList(),
-                        configuration,
-                        dispatcherGateway,
-                        scheduledExecutor,
-                        exception -> {});
-
-        bootstrap.getBootstrapCompletionFuture().get();
-    }
-
-    @Test
-    void testSubmitFailedJobOnApplicationErrorInNonHASetup() throws Exception {
-        final Configuration configuration = getConfiguration();
-        
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, 
true);
-        final ApplicationDispatcherBootstrap bootstrap =
-                new ApplicationDispatcherBootstrap(
-                        FailingJob.getProgram(),
-                        Collections.emptyList(),
-                        configuration,
-                        TestingDispatcherGateway.newBuilder().build(),
-                        scheduledExecutor,
-                        exception -> {});
-        assertThatFuture(bootstrap.getBootstrapCompletionFuture())
-                .eventuallyFailsWith(ExecutionException.class)
-                .extracting(Throwable::getCause)
-                .satisfies(
-                        e ->
-                                assertThat(e)
-                                        
.isInstanceOf(ApplicationExecutionException.class)
-                                        .hasMessageContaining(
-                                                DeploymentOptions
-                                                        
.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
-                                                        .key()));
-    }
-
-    private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() {
-        return dispatcherGatewayBuilder(JobStatus.FINISHED);
-    }
-
-    private TestingDispatcherGateway.Builder failedJobGatewayBuilder() {
-        return dispatcherGatewayBuilder(JobStatus.FAILED);
-    }
-
-    private TestingDispatcherGateway.Builder canceledJobGatewayBuilder() {
-        return dispatcherGatewayBuilder(JobStatus.CANCELED);
-    }
-
-    private TestingDispatcherGateway.Builder runningJobGatewayBuilder() {
-        return dispatcherGatewayBuilder(JobStatus.RUNNING);
-    }
-
-    private TestingDispatcherGateway.Builder 
dispatcherGatewayBuilder(JobStatus jobStatus) {
-        TestingDispatcherGateway.Builder builder =
-                TestingDispatcherGateway.newBuilder()
-                        .setSubmitFunction(
-                                jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
-                        .setRequestJobStatusFunction(
-                                jobId -> 
CompletableFuture.completedFuture(jobStatus));
-        if (jobStatus != JobStatus.RUNNING) {
-            builder.setRequestJobResultFunction(
-                    jobID -> 
CompletableFuture.completedFuture(createJobResult(jobID, jobStatus)));
-        }
-        return builder;
-    }
-
-    private CompletableFuture<Void> runApplication(
-            TestingDispatcherGateway.Builder dispatcherBuilder, int noOfJobs)
-            throws FlinkException {
-
-        return runApplication(dispatcherBuilder, getConfiguration(), noOfJobs);
-    }
-
-    private CompletableFuture<Void> runApplication(
-            final Configuration configuration, final int noOfJobs) throws 
Throwable {
-
-        final TestingDispatcherGateway.Builder dispatcherBuilder = 
finishedJobGatewayBuilder();
-
-        return runApplication(dispatcherBuilder, configuration, noOfJobs);
-    }
-
-    private CompletableFuture<Void> runApplication(
-            TestingDispatcherGateway.Builder dispatcherBuilder,
-            Configuration configuration,
-            int noOfJobs)
-            throws FlinkException {
-
-        final PackagedProgram program = getProgram(noOfJobs);
-
-        final ApplicationDispatcherBootstrap bootstrap =
-                new ApplicationDispatcherBootstrap(
-                        program,
-                        Collections.emptyList(),
-                        configuration,
-                        dispatcherBuilder.build(),
-                        scheduledExecutor,
-                        exception -> {});
-
-        return bootstrap.getApplicationCompletionFuture();
-    }
-
-    private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-            final int noOfJobs,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor)
-            throws FlinkException {
-        return createApplicationDispatcherBootstrap(
-                noOfJobs, dispatcherGateway, scheduledExecutor, exception -> 
{});
-    }
-
-    private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-            final int noOfJobs,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor,
-            final FatalErrorHandler errorHandler)
-            throws FlinkException {
-        return createApplicationDispatcherBootstrap(
-                noOfJobs, getConfiguration(), dispatcherGateway, 
scheduledExecutor, errorHandler);
-    }
-
-    private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-            final Configuration configuration,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor)
-            throws FlinkException {
-        return createApplicationDispatcherBootstrap(
-                1, configuration, dispatcherGateway, scheduledExecutor, 
exception -> {});
-    }
-
-    private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-            final int noOfJobs,
-            final Configuration configuration,
-            final DispatcherGateway dispatcherGateway,
-            final ScheduledExecutor scheduledExecutor,
-            final FatalErrorHandler errorHandler)
-            throws FlinkException {
-        final PackagedProgram program = getProgram(noOfJobs);
-        return new ApplicationDispatcherBootstrap(
-                program,
-                Collections.emptyList(),
-                configuration,
-                dispatcherGateway,
-                scheduledExecutor,
-                errorHandler);
-    }
-
-    private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
-        return MultiExecuteJob.getProgram(noOfJobs, true);
-    }
-
-    private static JobResult createFailedJobResult(final JobID jobId) {
-        return createJobResult(jobId, JobStatus.FAILED);
-    }
-
-    private static JobResult createUnknownJobResult(final JobID jobId) {
-        return createJobResult(jobId, null);
-    }
-
-    private static JobResult createJobResult(
-            final JobID jobID, @Nullable final JobStatus jobStatus) {
-        JobResult.Builder builder =
-                new 
JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
-        if (jobStatus == JobStatus.CANCELED) {
-            builder.serializedThrowable(
-                    new SerializedThrowable(new 
JobCancellationException(jobID, "Hello", null)));
-        } else if (jobStatus == JobStatus.FAILED || jobStatus == null) {
-            builder.serializedThrowable(
-                    new SerializedThrowable(new JobExecutionException(jobID, 
"bla bla bla")));
-        }
-        return builder.build();
-    }
-
-    private static <T, E extends Throwable> E assertException(
-            CompletableFuture<T> future, Class<E> exceptionClass) throws 
Exception {
-
-        try {
-            future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        } catch (Throwable e) {
-            Optional<E> maybeException = ExceptionUtils.findThrowable(e, 
exceptionClass);
-            if (!maybeException.isPresent()) {
-                throw e;
-            }
-            return maybeException.get();
-        }
-        throw new Exception(
-                "Future should have completed exceptionally with "
-                        + exceptionClass.getCanonicalName()
-                        + ".");
-    }
-
-    private Configuration getConfiguration() {
-        final Configuration configuration = new Configuration();
-        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
-        return configuration;
-    }
-}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
new file mode 100644
index 00000000000..958850c9d41
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link ApplicationJobUtils}. */
+public class ApplicationJobUtilsTest {
+
+    private static final String TEST_HA_CLUSTER_ID = "cluster";
+    private static final String TEST_APPLICATION_ID = 
"ca0eb040022fbccd4cf05d1e274ae25e";
+    private static final String TEST_JOB_ID = 
"e79b6d171acd4baa6f421e3631168810";
+
+    private Configuration configuration;
+
+    @BeforeEach
+    void setUp() {
+        configuration = new Configuration();
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideParametersForMaybeFixIds")
+    void testMaybeFixIds(
+            boolean isHAEnabled,
+            boolean isHaClusterIdSet,
+            boolean isApplicationIdSet,
+            boolean isJobIdSet,
+            @Nullable String expectedApplicationId,
+            @Nullable String expectedJobId) {
+        if (isHAEnabled) {
+            configuration.set(
+                    HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        }
+        if (isHaClusterIdSet) {
+            configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, 
TEST_HA_CLUSTER_ID);
+        }
+        if (isApplicationIdSet) {
+            configuration.set(ApplicationOptionsInternal.FIXED_APPLICATION_ID, 
TEST_APPLICATION_ID);
+        }
+        if (isJobIdSet) {
+            configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
TEST_JOB_ID);
+        }
+
+        ApplicationJobUtils.maybeFixIds(configuration);
+
+        assertEquals(
+                expectedApplicationId,
+                
configuration.get(ApplicationOptionsInternal.FIXED_APPLICATION_ID));
+
+        assertEquals(
+                expectedJobId, 
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+    }
+
+    private static Stream<Arguments> provideParametersForMaybeFixIds() {
+        // all combinations for the four input: (isHAEnabled, 
isHaClusterIdSet, isApplicationIdSet,
+        // isJobIdSet)
+        return Stream.of(
+                Arguments.of(false, false, false, false, null, null),
+                Arguments.of(false, false, false, true, null, TEST_JOB_ID),
+                Arguments.of(false, false, true, false, TEST_APPLICATION_ID, 
null),
+                Arguments.of(false, false, true, true, TEST_APPLICATION_ID, 
TEST_JOB_ID),
+                Arguments.of(false, true, false, false, null, null),
+                Arguments.of(false, true, false, true, null, TEST_JOB_ID),
+                Arguments.of(false, true, true, false, TEST_APPLICATION_ID, 
null),
+                Arguments.of(false, true, true, true, TEST_APPLICATION_ID, 
TEST_JOB_ID),
+                Arguments.of(
+                        true,
+                        false,
+                        false,
+                        false,
+                        getAbstractIdFromString(
+                                
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue()),
+                        getAbstractIdFromString(
+                                
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue())),
+                Arguments.of(
+                        true,
+                        false,
+                        false,
+                        true,
+                        getAbstractIdFromString(
+                                
HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue()),
+                        TEST_JOB_ID),
+                Arguments.of(true, false, true, false, TEST_APPLICATION_ID, 
TEST_APPLICATION_ID),
+                Arguments.of(true, false, true, true, TEST_APPLICATION_ID, 
TEST_JOB_ID),
+                Arguments.of(
+                        true,
+                        true,
+                        false,
+                        false,
+                        getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+                        getAbstractIdFromString(TEST_HA_CLUSTER_ID)),
+                Arguments.of(
+                        true,
+                        true,
+                        false,
+                        true,
+                        getAbstractIdFromString(TEST_HA_CLUSTER_ID),
+                        TEST_JOB_ID),
+                Arguments.of(true, true, true, false, TEST_APPLICATION_ID, 
TEST_APPLICATION_ID),
+                Arguments.of(true, true, true, true, TEST_APPLICATION_ID, 
TEST_JOB_ID));
+    }
+
+    private static String getAbstractIdFromString(String str) {
+        return (new AbstractID(str.hashCode(), 0)).toHexString();
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HADisabled_NoFixedJobId() {
+        assertEquals(
+                HighAvailabilityMode.NONE.name(),
+                configuration.get(HighAvailabilityOptions.HA_MODE));
+        
assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+
+        
assertTrue(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HAEnabled_NoFixedJobId() {
+        final String clusterId = "cluster";
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+        
assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+
+        
assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HAEnabled_FixedJobIdSet() {
+        final String clusterId = "cluster";
+        final JobID testJobID = new JobID(0, 2);
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
+
+        
assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+    }
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
similarity index 98%
rename from 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
rename to 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
index b5fb78bf94d..03570d12095 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
@@ -69,8 +69,8 @@ import java.util.function.Supplier;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Integration tests related to {@link ApplicationDispatcherBootstrap}. */
-class ApplicationDispatcherBootstrapITCase {
+/** Integration tests related to {@link PackagedProgramApplication}. */
+class PackagedProgramApplicationITCase {
 
     @RegisterExtension
     static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
@@ -238,8 +238,7 @@ class ApplicationDispatcherBootstrapITCase {
             final ArchivedExecutionGraph graph = 
cluster.getArchivedExecutionGraph(jobId).get();
 
             assertThat(graph.getJobID()).isEqualTo(jobId);
-            assertThat(graph.getJobName())
-                    .isEqualTo(ApplicationDispatcherBootstrap.FAILED_JOB_NAME);
+            
assertThat(graph.getJobName()).isEqualTo(PackagedProgramApplication.FAILED_JOB_NAME);
             assertThat(graph.getFailureInfo())
                     .isNotNull()
                     .extracting(ErrorInfo::getException)
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
new file mode 100644
index 00000000000..7191a21e683
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Application options that are not meant to be used by the user. */
+public class ApplicationOptionsInternal {
+    public static final ConfigOption<String> FIXED_APPLICATION_ID =
+            key("$internal.application.id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "**DO NOT USE** The static ApplicationId to be 
used for the application. "
+                                    + "For fault-tolerance, this value needs 
to stay the same across runs.");
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
new file mode 100644
index 00000000000..f5b0ecc9dd4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.util.FlinkException;
+
+public class DuplicateApplicationSubmissionException extends FlinkException {
+
+    private static final long serialVersionUID = 2818087325120827524L;
+
+    private final ApplicationID applicationId;
+
+    public DuplicateApplicationSubmissionException(ApplicationID 
applicationId) {
+        super("Application has already been submitted.");
+        this.applicationId = applicationId;
+    }
+
+    public ApplicationID getApplicationId() {
+        return applicationId;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
new file mode 100644
index 00000000000..f781dedff1e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.application.AbstractApplication;
+
+/**
+ * A {@link DispatcherBootstrap} which wraps an {@link AbstractApplication} 
for execution upon
+ * dispatcher initialization.
+ */
+public class ApplicationBootstrap implements DispatcherBootstrap {
+    private final AbstractApplication application;
+
+    public ApplicationBootstrap(AbstractApplication application) {
+        this.application = application;
+    }
+
+    @Override
+    public void stop() throws Exception {
+        application.cancel();
+    }
+
+    public AbstractApplication getApplication() {
+        return application;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 63e10ec5cb7..596167a0d8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,10 +37,12 @@ import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.application.AbstractApplication;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.client.DuplicateApplicationSubmissionException;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -133,6 +136,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for the Dispatcher component. The Dispatcher component is 
responsible for receiving
@@ -212,6 +216,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
      */
     private final Set<JobID> pendingJobResourceRequirementsUpdates = new 
HashSet<>();
 
+    private final Map<ApplicationID, AbstractApplication> applications = new 
HashMap<>();
+
+    private final Map<ApplicationID, Set<JobID>> recoveredApplicationJobIds = 
new HashMap<>();
+
     /** Enum to distinguish between initial job submission and re-submission 
for recovery. */
     protected enum ExecutionType {
         SUBMISSION,
@@ -366,6 +374,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                         getSelfGateway(DispatcherGateway.class),
                         this.getRpcService().getScheduledExecutor(),
                         this::onFatalError);
+
+        if (dispatcherBootstrap instanceof ApplicationBootstrap) {
+            submitApplication(((ApplicationBootstrap) 
dispatcherBootstrap).getApplication()).get();
+        }
     }
 
     private void startDispatcherServices() throws Exception {
@@ -410,6 +422,13 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     private void runRecoveredJob(final ExecutionPlan recoveredJob) {
         checkNotNull(recoveredJob);
 
+        if (recoveredJob.getApplicationId().isPresent()) {
+            recoveredApplicationJobIds
+                    .computeIfAbsent(
+                            recoveredJob.getApplicationId().get(), ignored -> 
new HashSet<>())
+                    .add(recoveredJob.getJobID());
+        }
+
         initJobClientExpiredTime(recoveredJob);
 
         try (MdcCloseable ignored =
@@ -497,6 +516,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                 () -> {
                     dispatcherBootstrap.stop();
 
+                    for (AbstractApplication application : 
applications.values()) {
+                        application.dispose();
+                    }
+
                     stopDispatcherServices();
 
                     log.info("Stopped dispatcher {}.", getAddress());
@@ -579,6 +602,34 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         return archiveExecutionGraphToHistoryServer(executionGraphInfo);
     }
 
+    /** This method must be called from the main thread. */
+    private CompletableFuture<Acknowledge> 
submitApplication(AbstractApplication application) {
+        final ApplicationID applicationId = application.getApplicationId();
+        log.info(
+                "Received application submission '{}' ({}).", 
application.getName(), applicationId);
+
+        if (applications.containsKey(applicationId)) {
+            log.warn("Application with id {} already exists.", applicationId);
+            throw new CompletionException(
+                    new 
DuplicateApplicationSubmissionException(applicationId));
+        }
+        applications.put(applicationId, application);
+        Set<JobID> jobs = recoveredApplicationJobIds.remove(applicationId);
+        if (jobs != null) {
+            jobs.forEach(application::addJob);
+        }
+        return application.execute(
+                getSelfGateway(DispatcherGateway.class),
+                getRpcService().getScheduledExecutor(),
+                getMainThreadExecutor(),
+                this::onFatalError);
+    }
+
+    @VisibleForTesting
+    Map<ApplicationID, AbstractApplication> getApplications() {
+        return applications;
+    }
+
     /**
      * Checks whether the given job has already been executed.
      *
@@ -595,7 +646,28 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
             applyParallelismOverrides((JobGraph) executionPlan);
         }
 
-        log.info("Submitting job '{}' ({}).", executionPlan.getName(), 
executionPlan.getJobID());
+        final JobID jobId = executionPlan.getJobID();
+        final String jobName = executionPlan.getName();
+
+        if (executionPlan.getApplicationId().isPresent()) {
+            ApplicationID applicationId = 
executionPlan.getApplicationId().get();
+            log.info(
+                    "Submitting job '{}' ({}) with associated application 
({}).",
+                    jobName,
+                    jobId,
+                    applicationId);
+            checkState(
+                    applications.containsKey(applicationId),
+                    "Application %s not found.",
+                    applicationId);
+            applications.get(applicationId).addJob(jobId);
+        } else {
+            // TODO update the message after SingleJobApplication is 
implemented
+            // This can occur in two cases:
+            // 1. CLI/REST submissions of jobs without an application
+            // 2. Tests for submitJob that submit jobs without an application
+            log.info("Submitting job '{}' ({}) without associated 
application.", jobName, jobId);
+        }
 
         // track as an outstanding job
         submittedAndWaitingTerminationJobIDs.add(executionPlan.getJobID());
@@ -649,7 +721,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     }
 
     private JobManagerRunner createJobMasterRunner(ExecutionPlan 
executionPlan) throws Exception {
-        
Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
+        
checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
         return jobManagerRunnerFactory.createJobManagerRunner(
                 executionPlan,
                 configuration,
@@ -664,7 +736,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     }
 
     private JobManagerRunner createJobCleanupRunner(JobResult dirtyJobResult) 
throws Exception {
-        
Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(dirtyJobResult.getJobId()));
+        
checkState(!jobManagerRunnerRegistry.isRegistered(dirtyJobResult.getJobId()));
         return cleanupRunnerFactory.create(
                 dirtyJobResult,
                 highAvailabilityServices.getCheckpointRecoveryFactory(),
@@ -684,7 +756,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                         .getResultFuture()
                         .handleAsync(
                                 (jobManagerRunnerResult, throwable) -> {
-                                    Preconditions.checkState(
+                                    checkState(
                                             
jobManagerRunnerRegistry.isRegistered(jobId)
                                                     && 
jobManagerRunnerRegistry.get(jobId)
                                                             == 
jobManagerRunner,
@@ -1277,7 +1349,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     @VisibleForTesting
     void registerJobManagerRunnerTerminationFuture(
             JobID jobId, CompletableFuture<Void> 
jobManagerRunnerTerminationFuture) {
-        
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
+        checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
         jobManagerRunnerTerminationFutures.put(jobId, 
jobManagerRunnerTerminationFuture);
 
         // clean up the pending termination future
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 551168ea4ef..3506526d1df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
@@ -49,6 +50,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -83,6 +85,9 @@ public class JobGraph implements ExecutionPlan {
     /** ID of this job. May be set if specific job id is desired (e.g. session 
management) */
     private JobID jobID;
 
+    /** ID of the application this job belongs to. */
+    @Nullable private ApplicationID applicationId;
+
     /** Name of this job. */
     private final String jobName;
 
@@ -134,20 +139,21 @@ public class JobGraph implements ExecutionPlan {
      * @param jobName The name of the job.
      */
     public JobGraph(String jobName) {
-        this(null, jobName);
+        this(null, null, jobName);
     }
 
     /**
      * Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
-     * the given name and the given execution configuration (see {@link 
ExecutionConfig}). The
-     * ExecutionConfig will be serialized and can't be modified afterwards.
+     * the given application ID, the given name and the given execution 
configuration (see {@link
+     * ExecutionConfig}). The ExecutionConfig will be serialized and can't be 
modified afterwards.
      *
      * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
      * @param jobName The name of the job.
      */
-    public JobGraph(@Nullable JobID jobId, String jobName) {
+    public JobGraph(@Nullable JobID jobId, @Nullable ApplicationID 
applicationId, String jobName) {
         this.jobID = jobId == null ? new JobID() : jobId;
         this.jobName = jobName == null ? "(unnamed job)" : jobName;
+        this.applicationId = applicationId;
 
         try {
             setExecutionConfig(new ExecutionConfig());
@@ -167,7 +173,7 @@ public class JobGraph implements ExecutionPlan {
      * @param vertices The vertices to add to the graph.
      */
     public JobGraph(@Nullable JobID jobId, String jobName, JobVertex... 
vertices) {
-        this(jobId, jobName);
+        this(jobId, null, jobName);
 
         for (JobVertex vertex : vertices) {
             addVertex(vertex);
@@ -191,6 +197,21 @@ public class JobGraph implements ExecutionPlan {
         this.jobID = jobID;
     }
 
+    /**
+     * Returns the ID of the application this job belongs to.
+     *
+     * @return the ID of the application
+     */
+    @Override
+    public Optional<ApplicationID> getApplicationId() {
+        return Optional.ofNullable(applicationId);
+    }
+
+    /** Sets the ID of the application. */
+    public void setApplicationId(ApplicationID applicationId) {
+        this.applicationId = checkNotNull(applicationId);
+    }
+
     /**
      * Returns the name assigned to the job graph.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
index 7e91e123ca1..05d430ff8de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
@@ -188,7 +188,11 @@ public class AdaptiveGraphManager
                         userClassloader,
                         this);
 
-        this.jobGraph = createAndInitializeJobGraph(streamGraph, 
streamGraph.getJobID());
+        this.jobGraph =
+                createAndInitializeJobGraph(
+                        streamGraph,
+                        streamGraph.getJobID(),
+                        streamGraph.getApplicationId().orElse(null));
 
         this.defaultSlotSharingGroup = new SlotSharingGroup();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
index 8b4216fae18..48e34f049a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -35,6 +36,7 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * An interface representing a general execution plan, which can be 
implemented by different types
@@ -199,4 +201,11 @@ public interface ExecutionPlan extends Serializable {
      * @return The serialized execution configuration object
      */
     SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
+
+    /**
+     * Gets the unique identifier of the application this job belongs to.
+     *
+     * @return the application id, or empty if not associated with an 
application
+     */
+    Optional<ApplicationID> getApplicationId();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a64d534ba93..349bf0a16cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
@@ -134,6 +135,9 @@ public class StreamGraph implements Pipeline, ExecutionPlan 
{
 
     private JobID jobId;
 
+    /** ID of the application this job belongs to. */
+    @Nullable private ApplicationID applicationId;
+
     private final Configuration jobConfiguration;
     private transient ExecutionConfig executionConfig;
     private final CheckpointConfig checkpointConfig;
@@ -1160,16 +1164,28 @@ public class StreamGraph implements Pipeline, 
ExecutionPlan {
     /** Gets the assembled {@link JobGraph} with a random {@link JobID}. */
     @VisibleForTesting
     public JobGraph getJobGraph() {
-        return getJobGraph(Thread.currentThread().getContextClassLoader(), 
jobId);
+        return getJobGraph(Thread.currentThread().getContextClassLoader(), 
jobId, applicationId);
     }
 
     public JobGraph getJobGraph(ClassLoader userClassLoader) {
-        return getJobGraph(userClassLoader, jobId);
+        return getJobGraph(userClassLoader, jobId, applicationId);
     }
 
     /** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */
-    public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID 
jobID) {
-        return StreamingJobGraphGenerator.createJobGraph(userClassLoader, 
this, jobID);
+    public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID 
jobId) {
+        return getJobGraph(userClassLoader, jobId, applicationId);
+    }
+
+    /**
+     * Gets the assembled {@link JobGraph} with a specified {@link JobID} and 
a specified {@link
+     * ApplicationID}.
+     */
+    public JobGraph getJobGraph(
+            ClassLoader userClassLoader,
+            @Nullable JobID jobId,
+            @Nullable ApplicationID applicationId) {
+        return StreamingJobGraphGenerator.createJobGraph(
+                userClassLoader, this, jobId, applicationId);
     }
 
     public String getStreamingPlanAsJSON() {
@@ -1266,6 +1282,15 @@ public class StreamGraph implements Pipeline, 
ExecutionPlan {
         return jobId;
     }
 
+    public void setApplicationId(ApplicationID applicationId) {
+        this.applicationId = checkNotNull(applicationId);
+    }
+
+    @Override
+    public Optional<ApplicationID> getApplicationId() {
+        return Optional.ofNullable(applicationId);
+    }
+
     /**
      * Sets the classpath required to run the job on a task manager.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ae948dc3852..14ee3fdffc1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -134,12 +135,16 @@ public class StreamingJobGraphGenerator {
                         Thread.currentThread().getContextClassLoader(),
                         streamGraph,
                         streamGraph.getJobID(),
+                        streamGraph.getApplicationId().orElse(null),
                         Runnable::run)
                 .createJobGraph();
     }
 
     public static JobGraph createJobGraph(
-            ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable 
JobID jobID) {
+            ClassLoader userClassLoader,
+            StreamGraph streamGraph,
+            @Nullable JobID jobId,
+            @Nullable ApplicationID applicationId) {
         // TODO Currently, we construct a new thread pool for the compilation 
of each job. In the
         // future, we may refactor the job submission framework and make it 
reusable across jobs.
         final ExecutorService serializationExecutor =
@@ -152,7 +157,11 @@ public class StreamingJobGraphGenerator {
                         new 
ExecutorThreadFactory("flink-operator-serialization-io"));
         try {
             return new StreamingJobGraphGenerator(
-                            userClassLoader, streamGraph, jobID, 
serializationExecutor)
+                            userClassLoader,
+                            streamGraph,
+                            jobId,
+                            applicationId,
+                            serializationExecutor)
                     .createJobGraph();
         } finally {
             serializationExecutor.shutdown();
@@ -177,14 +186,15 @@ public class StreamingJobGraphGenerator {
     private StreamingJobGraphGenerator(
             ClassLoader userClassloader,
             StreamGraph streamGraph,
-            @Nullable JobID jobID,
+            @Nullable JobID jobId,
+            @Nullable ApplicationID applicationId,
             Executor serializationExecutor) {
         this.userClassloader = userClassloader;
         this.streamGraph = streamGraph;
         this.defaultStreamGraphHasher = new StreamGraphHasherV2();
         this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
         this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
-        jobGraph = createAndInitializeJobGraph(streamGraph, jobID);
+        jobGraph = createAndInitializeJobGraph(streamGraph, jobId, 
applicationId);
 
         // Generate deterministic hashes for the nodes in order to identify 
them across
         // submission iff they didn't change.
@@ -886,8 +896,8 @@ public class StreamingJobGraphGenerator {
     }
 
     public static JobGraph createAndInitializeJobGraph(
-            StreamGraph streamGraph, @Nullable JobID jobId) {
-        JobGraph jobGraph = new JobGraph(jobId, streamGraph.getJobName());
+            StreamGraph streamGraph, @Nullable JobID jobId, @Nullable 
ApplicationID applicationId) {
+        JobGraph jobGraph = new JobGraph(jobId, applicationId, 
streamGraph.getJobName());
         jobGraph.setJobType(streamGraph.getJobType());
         jobGraph.setDynamic(streamGraph.isDynamic());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ab25cf02dd0..4e53397a46a 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -28,6 +29,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.application.AbstractApplication;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -101,6 +103,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -137,6 +140,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -935,6 +939,43 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
                 .isZero();
     }
 
+    @Test
+    public void testApplicationBootstrap() throws Exception {
+        final OneShotLatch bootstrapLatch = new OneShotLatch();
+        final ApplicationID applicationId = new ApplicationID();
+        final AbstractApplication application =
+                new TestingApplication(
+                        applicationId,
+                        (ignored -> {
+                            bootstrapLatch.trigger();
+                            return 
CompletableFuture.completedFuture(Acknowledge.get());
+                        }));
+        dispatcher =
+                createTestingDispatcherBuilder()
+                        .setDispatcherBootstrapFactory(
+                                (ignoredDispatcherGateway,
+                                        ignoredScheduledExecutor,
+                                        ignoredFatalErrorHandler) ->
+                                        new ApplicationBootstrap(application))
+                        .build(rpcService);
+
+        dispatcher.start();
+
+        // ensure that the application execution is triggered
+        bootstrapLatch.await();
+
+        assertThat(dispatcher.getApplications().size()).isEqualTo(1);
+        
assertThat(dispatcher.getApplications().keySet()).contains(applicationId);
+
+        jobGraph.setApplicationId(applicationId);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        assertThat(application.getJobs().size()).isEqualTo(1);
+        assertThat(application.getJobs()).contains(jobGraph.getJobID());
+    }
+
     @Test
     public void testPersistedJobGraphWhenDispatcherIsShutDown() throws 
Exception {
         final TestingExecutionPlanStore submittedExecutionPlanStore =
@@ -2104,4 +2145,57 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
             return runner;
         }
     }
+
+    private static class TestingApplication extends AbstractApplication {
+        private final Function<ExecuteParams, CompletableFuture<Acknowledge>> 
executeFunction;
+
+        public TestingApplication(
+                ApplicationID applicationId,
+                Function<ExecuteParams, CompletableFuture<Acknowledge>> 
executeFunction) {
+            super(applicationId);
+            this.executeFunction = executeFunction;
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> execute(
+                DispatcherGateway dispatcherGateway,
+                ScheduledExecutor scheduledExecutor,
+                Executor mainThreadExecutor,
+                FatalErrorHandler errorHandler) {
+
+            ExecuteParams params =
+                    new ExecuteParams(
+                            dispatcherGateway, scheduledExecutor, 
mainThreadExecutor, errorHandler);
+            return executeFunction.apply(params);
+        }
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void dispose() {}
+
+        @Override
+        public String getName() {
+            return "TestingApplication";
+        }
+
+        public static class ExecuteParams {
+            public final DispatcherGateway dispatcherGateway;
+            public final ScheduledExecutor scheduledExecutor;
+            public final Executor mainThreadExecutor;
+            public final FatalErrorHandler errorHandler;
+
+            public ExecuteParams(
+                    DispatcherGateway dispatcherGateway,
+                    ScheduledExecutor scheduledExecutor,
+                    Executor mainThreadExecutor,
+                    FatalErrorHandler errorHandler) {
+                this.dispatcherGateway = dispatcherGateway;
+                this.scheduledExecutor = scheduledExecutor;
+                this.mainThreadExecutor = mainThreadExecutor;
+                this.errorHandler = errorHandler;
+            }
+        }
+    }
 }
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
index 0103c6ec81f..4fe639c6c0d 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
@@ -84,7 +84,8 @@ public class ScriptExecutorITCase extends 
AbstractSqlGatewayStatementITCaseBase
                     miniCluster.getConfiguration(),
                     ScriptExecutor.class.getClassLoader(),
                     false,
-                    false);
+                    false,
+                    null);
 
             executor =
                     new TestScriptExecutor(


Reply via email to