This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6da0b9d97dbe63b8964df0bcb647590b2a238ab
Author: Yi Zhang <[email protected]>
AuthorDate: Thu Mar 12 14:13:50 2026 +0800

    [FLINK-38976][runtime] Support multiple batch jobs in an application
---
 .../generated/deployment_configuration.html        |   6 -
 .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8           |   4 +-
 .../java/org/apache/flink/client/ClientUtils.java  |   9 +-
 ...ApplicationDispatcherGatewayServiceFactory.java |   8 +-
 .../application/ApplicationJobUtils.java           |  29 ++++-
 .../application/PackagedProgramApplication.java    |  48 ++++----
 .../PackagedProgramApplicationEntry.java           |  15 ++-
 .../client/program/StreamContextEnvironment.java   |  52 +++++---
 .../application/ApplicationJobUtilsTest.java       |  36 +++---
 .../PackagedProgramApplicationEntryTest.java       |  22 +++-
 .../PackagedProgramApplicationTest.java            | 137 +++++++++++++--------
 .../program/StreamContextEnvironmentTest.java      |   6 +-
 .../flink/client/testjar/MultiExecuteJob.java      |  14 ++-
 .../flink/configuration/DeploymentOptions.java     |   7 +-
 .../handlers/JarRunApplicationHandler.java         |   3 +-
 .../service/application/ScriptExecutorITCase.java  |   3 +-
 16 files changed, 259 insertions(+), 140 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html 
b/docs/layouts/shortcodes/generated/deployment_configuration.html
index e6790e81506..1823723dc2e 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -50,12 +50,6 @@
             <td>Boolean</td>
             <td>If the job is submitted in attached mode, perform a 
best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in 
response to a user interrupt, such as typing Ctrl + C.</td>
         </tr>
-        <tr>
-            <td><h5>execution.submit-failed-job-on-application-error</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>If a failed job should be submitted (in the application mode) 
when there is an error in the application driver before an actual job 
submission. This is intended for providing a clean way of reporting failures 
back to the user and is especially useful in combination with 
'execution.shutdown-on-application-finish'. This option only works when the 
single job submission is enforced ('high-availability.type' is enabled). Please 
note that this is an experimental option and may  [...]
-        </tr>
         <tr>
             <td><h5>execution.target</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
index 914108019f3..8c74dd37ded 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
@@ -21,8 +21,8 @@ 
org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
 
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object,
 org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument 
leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context 
does not satisfy: reside outside of package 'org.apache.flink..' or reside in 
any package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
-org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean, org.apache.flink.api.common.ApplicationID, 
org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf 
type org.apache.flink.client.program.JarInfo does not satisfy: reside outside 
of package 'org.apache.flink..' or reside in any package ['..shaded..'] o [...]
-org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean, org.apache.flink.api.common.ApplicationID, 
org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf 
type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any pa 
[...]
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, int, int, 
boolean, org.apache.flink.api.common.ApplicationID, 
org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf 
type org.apache.flink.client.program.JarInfo does not satisfy: reside outside 
of package 'org.apache.flink..' or reside in any package ['..shaded..']  [...]
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, int, int, 
boolean, org.apache.flink.api.common.ApplicationID, 
org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf 
type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any p [...]
 
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned 
leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of 
package 'org.apache.flink..' or reside in any package ['..shaded..'] or 
annotated with @Public or annotated with @PublicEvolving or annotated with 
@Deprecated
 
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration):
 Returned leaf type 
org.apache.flink.configuration.JobManagerOptions$SchedulerType does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 07b75b89d45..1e5b15f3133 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -90,7 +90,8 @@ public enum ClientUtils {
                 executorServiceLoader,
                 configuration,
                 program,
-                enforceSingleJobExecution,
+                enforceSingleJobExecution ? 1 : Integer.MAX_VALUE,
+                enforceSingleJobExecution ? 1 : Integer.MAX_VALUE,
                 suppressSysout,
                 null,
                 null,
@@ -101,7 +102,8 @@ public enum ClientUtils {
             PipelineExecutorServiceLoader executorServiceLoader,
             Configuration configuration,
             PackagedProgram program,
-            boolean enforceSingleJobExecution,
+            int jobCountLimit,
+            int streamingJobCountLimit,
             boolean suppressSysout,
             @Nullable ApplicationID applicationId,
             @Nullable JarInfo userJarInfo,
@@ -121,7 +123,8 @@ public enum ClientUtils {
                     executorServiceLoader,
                     configuration,
                     userCodeClassLoader,
-                    enforceSingleJobExecution,
+                    jobCountLimit,
+                    streamingJobCountLimit,
                     suppressSysout,
                     applicationId,
                     userJarInfo,
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index 579845cfeeb..4544bbccbdb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobInfo;
 import org.apache.flink.api.common.JobInfoImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ApplicationOptionsInternal;
 import org.apache.flink.configuration.Configuration;
@@ -105,8 +106,8 @@ public class ApplicationDispatcherGatewayServiceFactory
         final List<JobInfo> recoveredTerminalJobInfos =
                 getRecoveredTerminalJobInfos(recoveredDirtyJobResults);
 
-        final boolean allowExecuteMultipleJobs =
-                ApplicationJobUtils.allowExecuteMultipleJobs(configuration);
+        final Tuple2<Integer, Integer> jobCountLimits =
+                ApplicationJobUtils.getJobCountLimits(configuration);
         ApplicationJobUtils.maybeFixIds(configuration);
         final ApplicationID applicationId =
                 configuration
@@ -121,8 +122,9 @@ public class ApplicationDispatcherGatewayServiceFactory
                         recoveredJobInfos,
                         recoveredTerminalJobInfos,
                         configuration,
+                        jobCountLimits.f0,
+                        jobCountLimits.f1,
                         true,
-                        !allowExecuteMultipleJobs,
                         
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR),
                         
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH));
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
index bf8f0415390..759e0907ba7 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
@@ -20,9 +20,11 @@ package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ApplicationOptionsInternal;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -125,10 +127,27 @@ public class ApplicationJobUtils {
         return str;
     }
 
-    public static boolean allowExecuteMultipleJobs(Configuration config) {
-        final Optional<String> configuredJobId =
-                
config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
-        return !HighAvailabilityMode.isHighAvailabilityModeActivated(config)
-                && configuredJobId.isEmpty();
+    /**
+     * Returns the job count limits for the given configuration.
+     *
+     * @param config The configuration to get the job count limits from
+     * @return A tuple of (total job count limit, streaming job count limit)
+     */
+    public static Tuple2<Integer, Integer> getJobCountLimits(Configuration 
config) {
+        if 
(config.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR)) {
+            // When enabled, this option ensures that if the application fails 
before any job is
+            // submitted, a synthetic failed job is submitted for diagnostics. 
To provide a stable
+            // and predictable job ID, only a single job is allowed in the 
main method. Therefore,
+            // we enforce single job execution when this option is enabled.
+            return new Tuple2<>(1, 1);
+        }
+
+        int jobCountLimit = Integer.MAX_VALUE;
+        int streamingJobCountLimit =
+                HighAvailabilityMode.isHighAvailabilityModeActivated(config)
+                        ? 1
+                        : Integer.MAX_VALUE;
+
+        return new Tuple2<>(jobCountLimit, streamingJobCountLimit);
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index 71f9df60101..423274e908e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -71,6 +71,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -93,7 +94,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
 
     private final boolean handleFatalError;
 
-    private final boolean enforceSingleJobExecution;
+    private final int jobCountLimit;
+
+    private final int streamingJobCountLimit;
 
     private final boolean submitFailedJobOnApplicationError;
 
@@ -115,8 +118,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final ApplicationID applicationId,
             final PackagedProgram program,
             final Configuration configuration,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean handleFatalError,
-            final boolean enforceSingleJobExecution,
             final boolean submitFailedJobOnApplicationError,
             final boolean shutDownOnFinish) {
         this(
@@ -125,8 +129,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                 Collections.emptyList(),
                 Collections.emptyList(),
                 configuration,
+                jobCountLimit,
+                streamingJobCountLimit,
                 handleFatalError,
-                enforceSingleJobExecution,
                 submitFailedJobOnApplicationError,
                 shutDownOnFinish);
     }
@@ -135,8 +140,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final ApplicationID applicationId,
             final PackagedProgram program,
             final Configuration configuration,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean handleFatalError,
-            final boolean enforceSingleJobExecution,
             final boolean submitFailedJobOnApplicationError,
             final boolean shutDownOnFinish,
             final @Nullable JarInfo userJarInfo) {
@@ -146,8 +152,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                 Collections.emptyList(),
                 Collections.emptyList(),
                 configuration,
+                jobCountLimit,
+                streamingJobCountLimit,
                 handleFatalError,
-                enforceSingleJobExecution,
                 submitFailedJobOnApplicationError,
                 shutDownOnFinish,
                 userJarInfo);
@@ -159,8 +166,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final Collection<JobInfo> recoveredJobInfos,
             final Collection<JobInfo> recoveredTerminalJobInfos,
             final Configuration configuration,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean handleFatalError,
-            final boolean enforceSingleJobExecution,
             final boolean submitFailedJobOnApplicationError,
             final boolean shutDownOnFinish) {
         this(
@@ -169,8 +177,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                 recoveredJobInfos,
                 recoveredTerminalJobInfos,
                 configuration,
+                jobCountLimit,
+                streamingJobCountLimit,
                 handleFatalError,
-                enforceSingleJobExecution,
                 submitFailedJobOnApplicationError,
                 shutDownOnFinish,
                 null);
@@ -182,8 +191,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final Collection<JobInfo> recoveredJobInfos,
             final Collection<JobInfo> recoveredTerminalJobInfos,
             final Configuration configuration,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean handleFatalError,
-            final boolean enforceSingleJobExecution,
             final boolean submitFailedJobOnApplicationError,
             final boolean shutDownOnFinish,
             final @Nullable JarInfo userJarInfo) {
@@ -192,8 +202,11 @@ public class PackagedProgramApplication extends 
AbstractApplication {
         this.recoveredJobInfos = checkNotNull(recoveredJobInfos);
         this.recoveredTerminalJobInfos = 
checkNotNull(recoveredTerminalJobInfos);
         this.configuration = checkNotNull(configuration);
+        checkArgument(jobCountLimit > 0, "jobCountLimit must be positive");
+        checkArgument(streamingJobCountLimit > 0, "streamingJobCountLimit must 
be positive");
+        this.jobCountLimit = jobCountLimit;
+        this.streamingJobCountLimit = streamingJobCountLimit;
         this.handleFatalError = handleFatalError;
-        this.enforceSingleJobExecution = enforceSingleJobExecution;
         this.submitFailedJobOnApplicationError = 
submitFailedJobOnApplicationError;
         this.shutDownOnFinish = shutDownOnFinish;
         this.userJarInfo = userJarInfo;
@@ -386,8 +399,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                         programDescriptor.getProgramArgs(),
                         getApplicationId(),
                         getName(),
+                        jobCountLimit,
+                        streamingJobCountLimit,
                         handleFatalError,
-                        enforceSingleJobExecution,
                         submitFailedJobOnApplicationError,
                         shutDownOnFinish));
     }
@@ -649,15 +663,6 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final Set<JobID> tolerateMissingResult,
             final DispatcherGateway dispatcherGateway,
             final ScheduledExecutor scheduledExecutor) {
-        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
-            jobIdsFuture.completeExceptionally(
-                    new ApplicationExecutionException(
-                            String.format(
-                                    "Submission of failed job in case of an 
application error ('%s') is not supported in non-HA setups.",
-                                    
DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
-                                            .key())));
-            return;
-        }
 
         // applicationJobIds should contain all jobs involved in the current 
application execution
         // after ClientUtils.executeProgram completes, including:
@@ -691,7 +696,8 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                     executorServiceLoader,
                     configuration,
                     program,
-                    enforceSingleJobExecution,
+                    jobCountLimit,
+                    streamingJobCountLimit,
                     true /* suppress sysout */,
                     getApplicationId(),
                     userJarInfo,
@@ -709,7 +715,7 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             // of an already finished a success.
             final Optional<DuplicateJobSubmissionException> maybeDuplicate =
                     ExceptionUtils.findThrowable(t, 
DuplicateJobSubmissionException.class);
-            if (enforceSingleJobExecution
+            if (jobCountLimit == 1
                     && maybeDuplicate.isPresent()
                     && maybeDuplicate.get().isGloballyTerminated()) {
                 final JobID jobId = maybeDuplicate.get().getJobID();
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java
index 931bd8b9d75..f548249ca0e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java
@@ -50,9 +50,11 @@ public class PackagedProgramApplicationEntry implements 
ApplicationStoreEntry {
 
     private final String applicationName;
 
-    private final boolean handleFatalError;
+    private final int jobCountLimit;
+
+    private final int streamingJobCountLimit;
 
-    private final boolean enforceSingleJobExecution;
+    private final boolean handleFatalError;
 
     private final boolean submitFailedJobOnApplicationError;
 
@@ -65,8 +67,9 @@ public class PackagedProgramApplicationEntry implements 
ApplicationStoreEntry {
             String[] programArgs,
             ApplicationID applicationId,
             String applicationName,
+            int jobCountLimit,
+            int streamingJobCountLimit,
             boolean handleFatalError,
-            boolean enforceSingleJobExecution,
             boolean submitFailedJobOnApplicationError,
             boolean shutDownOnFinish) {
         this.configuration = configuration;
@@ -75,8 +78,9 @@ public class PackagedProgramApplicationEntry implements 
ApplicationStoreEntry {
         this.programArgs = programArgs;
         this.applicationId = applicationId;
         this.applicationName = applicationName;
+        this.jobCountLimit = jobCountLimit;
+        this.streamingJobCountLimit = streamingJobCountLimit;
         this.handleFatalError = handleFatalError;
-        this.enforceSingleJobExecution = enforceSingleJobExecution;
         this.submitFailedJobOnApplicationError = 
submitFailedJobOnApplicationError;
         this.shutDownOnFinish = shutDownOnFinish;
     }
@@ -120,8 +124,9 @@ public class PackagedProgramApplicationEntry implements 
ApplicationStoreEntry {
                 recoveredJobInfos,
                 recoveredTerminalJobInfos,
                 configuration,
+                jobCountLimit,
+                streamingJobCountLimit,
                 handleFatalError,
-                enforceSingleJobExecution,
                 submitFailedJobOnApplicationError,
                 shutDownOnFinish,
                 userJarInfo);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index aeffa48738e..4f51617ce58 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -71,10 +72,12 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
 
     private final boolean suppressSysout;
 
-    private final boolean enforceSingleJobExecution;
     private final Configuration clusterConfiguration;
 
-    private int jobCounter;
+    private final int jobCountLimit;
+    private final int streamingJobCountLimit;
+    private int jobCount;
+    private int streamingJobCount;
 
     private final boolean programConfigEnabled;
     private final Collection<String> programConfigWildcards;
@@ -96,18 +99,21 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                 configuration,
                 configuration,
                 userCodeClassLoader,
-                enforceSingleJobExecution,
+                enforceSingleJobExecution ? 1 : Integer.MAX_VALUE,
+                enforceSingleJobExecution ? 1 : Integer.MAX_VALUE,
                 suppressSysout,
                 true,
                 Collections.emptyList());
     }
 
+    @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration clusterConfiguration,
             final Configuration configuration,
             final ClassLoader userCodeClassLoader,
-            final boolean enforceSingleJobExecution,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean suppressSysout,
             final boolean programConfigEnabled,
             final Collection<String> programConfigWildcards) {
@@ -116,7 +122,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                 clusterConfiguration,
                 configuration,
                 userCodeClassLoader,
-                enforceSingleJobExecution,
+                jobCountLimit,
+                streamingJobCountLimit,
                 suppressSysout,
                 programConfigEnabled,
                 programConfigWildcards,
@@ -131,7 +138,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final Configuration clusterConfiguration,
             final Configuration configuration,
             final ClassLoader userCodeClassLoader,
-            final boolean enforceSingleJobExecution,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean suppressSysout,
             final boolean programConfigEnabled,
             final Collection<String> programConfigWildcards,
@@ -140,9 +148,11 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final Collection<JobInfo> allRecoveredJobInfos) {
         super(executorServiceLoader, configuration, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
-        this.enforceSingleJobExecution = enforceSingleJobExecution;
         this.clusterConfiguration = clusterConfiguration;
-        this.jobCounter = 0;
+        this.jobCountLimit = jobCountLimit;
+        this.streamingJobCountLimit = streamingJobCountLimit;
+        this.jobCount = 0;
+        this.streamingJobCount = 0;
         this.programConfigEnabled = programConfigEnabled;
         this.programConfigWildcards = programConfigWildcards;
         this.applicationId = applicationId;
@@ -242,7 +252,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
     @Override
     public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
         checkNotAllowedConfigurations();
-        validateAllowedExecution();
+        validateAllowedExecution(streamGraph);
         if (applicationId != null) {
             streamGraph.setApplicationId(applicationId);
         }
@@ -260,12 +270,22 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
         return jobClient;
     }
 
-    private void validateAllowedExecution() {
-        if (enforceSingleJobExecution && jobCounter > 0) {
+    private void validateAllowedExecution(StreamGraph streamGraph) {
+        if (streamGraph.getJobType() == JobType.STREAMING) {
+            streamingJobCount++;
+        }
+        jobCount++;
+
+        if (streamingJobCount > streamingJobCountLimit) {
+            throw new FlinkRuntimeException(
+                    "Cannot have more than "
+                            + streamingJobCountLimit
+                            + " streaming jobs in a single environment.");
+        }
+        if (jobCount > jobCountLimit) {
             throw new FlinkRuntimeException(
-                    "Cannot have more than one execute() or executeAsync() 
call in a single environment.");
+                    "Cannot have more than " + jobCountLimit + " jobs in a 
single environment.");
         }
-        jobCounter++;
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -274,7 +294,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration clusterConfiguration,
             final ClassLoader userCodeClassLoader,
-            final boolean enforceSingleJobExecution,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             final boolean suppressSysout,
             @Nullable final ApplicationID applicationId,
             @Nullable final JarInfo userJarInfo,
@@ -293,7 +314,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                             clusterConfiguration,
                             mergedEnvConfig,
                             userCodeClassLoader,
-                            enforceSingleJobExecution,
+                            jobCountLimit,
+                            streamingJobCountLimit,
                             suppressSysout,
                             programConfigEnabled,
                             programConfigWildcards,
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
index 1eed3295981..c28706c3520 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.client.deployment.application;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ApplicationOptionsInternal;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -38,10 +39,8 @@ import javax.annotation.Nullable;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link ApplicationJobUtils}. */
 public class ApplicationJobUtilsTest {
@@ -367,33 +366,34 @@ public class ApplicationJobUtilsTest {
     }
 
     @Test
-    void testAllowExecuteMultipleJobs_HADisabled_NoFixedJobId() {
-        assertEquals(
-                HighAvailabilityMode.NONE.name(),
-                configuration.get(HighAvailabilityOptions.HA_MODE));
-        
assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+    void testGetJobCountLimits_SubmitFailedJobEnabled() {
+        
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, 
true);
 
-        
assertTrue(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+        assertEquals(new Tuple2<>(1, 1), 
ApplicationJobUtils.getJobCountLimits(configuration));
     }
 
     @Test
-    void testAllowExecuteMultipleJobs_HAEnabled_NoFixedJobId() {
-        final String clusterId = "cluster";
-        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
-        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+    void testGetJobCountLimits_HADisabled() {
+        assertEquals(
+                HighAvailabilityMode.NONE.name(),
+                configuration.get(HighAvailabilityOptions.HA_MODE));
         
assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
 
-        
assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+        assertEquals(
+                new Tuple2<>(Integer.MAX_VALUE, Integer.MAX_VALUE),
+                ApplicationJobUtils.getJobCountLimits(configuration));
     }
 
     @Test
-    void testAllowExecuteMultipleJobs_HAEnabled_FixedJobIdSet() {
+    void testGetJobCountLimits_HAEnabled() {
         final String clusterId = "cluster";
-        final JobID testJobID = new JobID(0, 2);
         configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
         configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
-        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
+        
assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
 
-        
assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration));
+        // support multiple batch job execution
+        assertEquals(
+                new Tuple2<>(Integer.MAX_VALUE, 1),
+                ApplicationJobUtils.getJobCountLimits(configuration));
     }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java
index 169c31cee5c..714255cb488 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java
@@ -91,7 +91,14 @@ class PackagedProgramApplicationEntryTest {
     void testConstructionWithoutJarBlob() {
         PackagedProgramApplication application =
                 new PackagedProgramApplication(
-                        applicationId, program, config, true, false, false, 
true);
+                        applicationId,
+                        program,
+                        config,
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
+                        true,
+                        false,
+                        true);
         ApplicationStoreEntry entry = 
application.getApplicationStoreEntry().orElse(null);
 
         assertNull(entry);
@@ -107,7 +114,15 @@ class PackagedProgramApplicationEntryTest {
         JarInfo userJarInfo = new JarInfo(jarFile.getName(), blobKey);
         PackagedProgramApplication application =
                 new PackagedProgramApplication(
-                        applicationId, program, config, true, false, false, 
true, userJarInfo);
+                        applicationId,
+                        program,
+                        config,
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
+                        true,
+                        false,
+                        true,
+                        userJarInfo);
         ApplicationStoreEntry entry = 
application.getApplicationStoreEntry().orElse(null);
 
         assertInstanceOf(PackagedProgramApplicationEntry.class, entry);
@@ -178,9 +193,10 @@ class PackagedProgramApplicationEntryTest {
                 programArgs,
                 applicationId,
                 applicationName,
+                Integer.MAX_VALUE,
+                Integer.MAX_VALUE,
                 true,
                 false,
-                false,
                 true);
     }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
index 4f04931b4a4..d4a799cab9e 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
@@ -73,7 +73,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
-import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -98,13 +97,62 @@ public class PackagedProgramApplicationTest {
     void testOnlyOneJobIsAllowedWhenEnforceSingleJobExecution() throws 
Throwable {
         final PackagedProgramApplication application =
                 createAndExecuteApplication(
-                        2, getConfiguration(), 
finishedJobGatewayBuilder().build(), true);
+                        2, getConfiguration(), 
finishedJobGatewayBuilder().build(), 1, 1);
 
         assertException(application.getApplicationCompletionFuture(), 
FlinkRuntimeException.class);
         application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
         assertApplicationFailed(application);
     }
 
+    @Test
+    void 
testOnlyOneStreamingJobIsAllowedWhenEnforceSingleStreamingJobExecution() throws 
Throwable {
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(
+                        2,
+                        getConfiguration(),
+                        finishedJobGatewayBuilder().build(),
+                        Integer.MAX_VALUE,
+                        1);
+
+        assertException(application.getApplicationCompletionFuture(), 
FlinkRuntimeException.class);
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+    }
+
+    @Test
+    void testMultiBatchJobIsAllowedWhenEnableMultiBatchJobExecution() throws 
Throwable {
+        final List<JobID> submittedJobIds = new ArrayList<>();
+
+        final PackagedProgram program = getProgram(2, true);
+        final PackagedProgramApplication application =
+                new PackagedProgramApplication(
+                        new ApplicationID(),
+                        program,
+                        getConfiguration(),
+                        Integer.MAX_VALUE,
+                        1,
+                        true,
+                        false,
+                        true);
+
+        DispatcherGateway dispatcherGateway =
+                finishedJobGatewayBuilder()
+                        .setSubmitFunction(
+                                executionPlan -> {
+                                    
submittedJobIds.add(executionPlan.getJobID());
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .build();
+
+        executeApplication(application, dispatcherGateway, scheduledExecutor, 
exception -> {});
+
+        application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertApplicationFinished(application);
+
+        assertThat(submittedJobIds.size()).isEqualTo(2);
+    }
+
     @Test
     void testStaticJobId() throws Throwable {
         final JobID testJobID = new JobID(0, 2);
@@ -180,9 +228,10 @@ public class PackagedProgramApplicationTest {
                         new ApplicationID(),
                         getProgram(2),
                         getConfiguration(),
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
                         false,
-                        false,
                         true);
 
         // change mainThreadExecutor to be manually triggered
@@ -272,9 +321,10 @@ public class PackagedProgramApplicationTest {
                         new ApplicationID(),
                         getProgram(2),
                         getConfiguration(),
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
                         false,
-                        false,
                         true);
 
         // change mainThreadExecutor to be manually triggered
@@ -483,9 +533,10 @@ public class PackagedProgramApplicationTest {
                         new ApplicationID(),
                         getProgram(1),
                         getConfiguration(),
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
                         false,
-                        false,
                         true);
 
         application.cancel();
@@ -861,7 +912,8 @@ public class PackagedProgramApplicationTest {
                         dispatcherBuilder.build(),
                         scheduledExecutor,
                         exception -> errorHandlerCalled.set(true),
-                        false,
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         false,
                         true);
 
@@ -896,7 +948,7 @@ public class PackagedProgramApplicationTest {
 
         final PackagedProgramApplication application =
                 createAndExecuteApplication(
-                        1, configurationUnderTest, dispatcherBuilder.build(), 
true);
+                        1, configurationUnderTest, dispatcherBuilder.build(), 
1, 1);
 
         application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
 
@@ -934,7 +986,7 @@ public class PackagedProgramApplicationTest {
 
         final PackagedProgramApplication application =
                 createAndExecuteApplication(
-                        1, configurationUnderTest, dispatcherBuilder.build(), 
true);
+                        1, configurationUnderTest, dispatcherBuilder.build(), 
1, 1);
 
         application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
 
@@ -994,8 +1046,9 @@ public class PackagedProgramApplicationTest {
                         dispatcherGateway,
                         scheduledExecutor,
                         exception -> {},
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
-                        false,
                         false);
 
         // Wait until application is finished to make sure cluster shutdown 
isn't called
@@ -1050,8 +1103,9 @@ public class PackagedProgramApplicationTest {
                         new ApplicationID(),
                         FailingJob.getProgram(),
                         configuration,
+                        1,
+                        1,
                         true,
-                        true /* enforceSingleJobExecution */,
                         true /* submitFailedJobOnApplicationError */,
                         true);
 
@@ -1061,39 +1115,6 @@ public class PackagedProgramApplicationTest {
         assertApplicationFailed(application);
     }
 
-    @Test
-    void 
testSubmitFailedJobOnApplicationErrorWhenNotEnforceSingleJobExecution() throws 
Exception {
-        final PackagedProgramApplication application =
-                new PackagedProgramApplication(
-                        new ApplicationID(),
-                        FailingJob.getProgram(),
-                        getConfiguration(),
-                        true,
-                        false /* enforceSingleJobExecution */,
-                        true /* submitFailedJobOnApplicationError */,
-                        true);
-
-        executeApplication(
-                application,
-                TestingDispatcherGateway.newBuilder().build(),
-                scheduledExecutor,
-                exception -> {});
-
-        assertThatFuture(application.getApplicationCompletionFuture())
-                .eventuallyFailsWith(ExecutionException.class)
-                .extracting(Throwable::getCause)
-                .satisfies(
-                        e ->
-                                assertThat(e)
-                                        .hasMessageContaining(
-                                                DeploymentOptions
-                                                        
.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
-                                                        .key()));
-
-        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-        assertApplicationFailed(application);
-    }
-
     @Test
     void testExceptionHistoryWhenJobFails() throws Exception {
         final ConcurrentLinkedDeque<JobID> submittedJobIds = new 
ConcurrentLinkedDeque<>();
@@ -1193,9 +1214,10 @@ public class PackagedProgramApplicationTest {
                         Collections.emptyList(),
                         Collections.singletonList(recoveredTerminalJobInfo),
                         configuration,
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
                         false,
-                        false,
                         true);
 
         executeApplication(
@@ -1243,9 +1265,10 @@ public class PackagedProgramApplicationTest {
                         Collections.singletonList(recoveredRunningJobInfo),
                         Collections.emptyList(),
                         configuration,
+                        Integer.MAX_VALUE,
+                        Integer.MAX_VALUE,
                         true,
                         false,
-                        false,
                         true);
 
         executeApplication(
@@ -1300,14 +1323,16 @@ public class PackagedProgramApplicationTest {
             final Configuration configuration,
             final DispatcherGateway dispatcherGateway)
             throws FlinkException {
-        return createAndExecuteApplication(numJobs, configuration, 
dispatcherGateway, false);
+        return createAndExecuteApplication(
+                numJobs, configuration, dispatcherGateway, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
     }
 
     private PackagedProgramApplication createAndExecuteApplication(
             final int numJobs,
             final Configuration configuration,
             final DispatcherGateway dispatcherGateway,
-            final boolean enforceSingleJobExecution)
+            final int jobCountLimit,
+            final int streamingJobCountLimit)
             throws FlinkException {
         return createAndExecuteApplication(
                 numJobs,
@@ -1315,8 +1340,9 @@ public class PackagedProgramApplicationTest {
                 dispatcherGateway,
                 scheduledExecutor,
                 exception -> {},
+                jobCountLimit,
+                streamingJobCountLimit,
                 true,
-                enforceSingleJobExecution,
                 true);
     }
 
@@ -1343,8 +1369,9 @@ public class PackagedProgramApplicationTest {
                 dispatcherGateway,
                 scheduledExecutor,
                 errorHandler,
+                Integer.MAX_VALUE,
+                Integer.MAX_VALUE,
                 true,
-                false,
                 true);
     }
 
@@ -1354,8 +1381,9 @@ public class PackagedProgramApplicationTest {
             final DispatcherGateway dispatcherGateway,
             final ScheduledExecutor scheduledExecutor,
             final FatalErrorHandler errorHandler,
+            final int jobCountLimit,
+            final int streamingJobCountLimit,
             boolean handleFatalError,
-            boolean enforceSingleJobExecution,
             boolean shutDownOnFinish)
             throws FlinkException {
 
@@ -1366,8 +1394,9 @@ public class PackagedProgramApplicationTest {
                         new ApplicationID(),
                         program,
                         configuration,
+                        jobCountLimit,
+                        streamingJobCountLimit,
                         handleFatalError,
-                        enforceSingleJobExecution,
                         false,
                         shutDownOnFinish);
         assertApplicationCreated(application);
@@ -1394,7 +1423,11 @@ public class PackagedProgramApplicationTest {
     }
 
     private PackagedProgram getProgram(int numJobs) throws FlinkException {
-        return MultiExecuteJob.getProgram(numJobs, true);
+        return getProgram(numJobs, false);
+    }
+
+    private PackagedProgram getProgram(int numJobs, boolean batchMode) throws 
FlinkException {
+        return MultiExecuteJob.getProgram(numJobs, true, batchMode);
     }
 
     private static JobResult createFailedJobResult(final JobID jobId) {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index d637e13cd1e..f486f272906 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -101,7 +101,8 @@ class StreamContextEnvironmentTest {
                         clusterConfig,
                         jobConfig,
                         classLoader,
-                        true,
+                        1,
+                        1,
                         true,
                         false,
                         Collections.emptyList());
@@ -190,7 +191,8 @@ class StreamContextEnvironmentTest {
                 clusterConfig,
                 clusterConfig,
                 classLoader,
-                true,
+                1,
+                1,
                 true,
                 false,
                 programConfigWildcards);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java
 
b/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java
index 0b65361bc05..61ea2840381 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.testjar;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -38,7 +39,8 @@ import java.util.List;
  */
 public class MultiExecuteJob {
 
-    public static PackagedProgram getProgram(int noOfJobs, boolean attached) 
throws FlinkException {
+    public static PackagedProgram getProgram(int noOfJobs, boolean attached, 
boolean batchMode)
+            throws FlinkException {
         try {
             return PackagedProgram.newBuilder()
                     .setUserClassPaths(
@@ -47,7 +49,10 @@ public class MultiExecuteJob {
                                             .toURI()
                                             .toURL()))
                     .setEntryPointClassName(MultiExecuteJob.class.getName())
-                    .setArguments(String.valueOf(noOfJobs), 
Boolean.toString(attached))
+                    .setArguments(
+                            String.valueOf(noOfJobs),
+                            Boolean.toString(attached),
+                            Boolean.toString(batchMode))
                     .build();
         } catch (ProgramInvocationException | FileNotFoundException | 
MalformedURLException e) {
             throw new FlinkException("Could not load the provided entrypoint 
class.", e);
@@ -57,10 +62,15 @@ public class MultiExecuteJob {
     public static void main(String[] args) throws Exception {
         int noOfExecutes = Integer.parseInt(args[0]);
         boolean attached = args.length > 1 && Boolean.parseBoolean(args[1]);
+        boolean batchMode = args.length > 2 && Boolean.parseBoolean(args[2]);
 
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         for (int i = 0; i < noOfExecutes; i++) {
+            if (batchMode) {
+                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+            }
+
             final List<Integer> input = new ArrayList<>();
             input.add(1);
             input.add(2);
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index 2e127c5cf2e..60c3c4ebc21 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -20,6 +20,7 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.TextElement;
 
@@ -91,7 +92,11 @@ public class DeploymentOptions {
                             "Whether a Flink Application cluster should shut 
down automatically after its application finishes"
                                     + " (either successfully or as result of a 
failure). Has no effect for other deployment modes.");
 
-    @Experimental
+    /**
+     * @deprecated Check application exceptions instead.
+     */
+    @Deprecated
+    @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
     public static final ConfigOption<Boolean> 
SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR =
             
ConfigOptions.key("execution.submit-failed-job-on-application-error")
                     .booleanType()
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
index 0ca5b679712..3e42667321e 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
@@ -131,8 +131,9 @@ public class JarRunApplicationHandler
                                             applicationId,
                                             program,
                                             effectiveConfiguration,
+                                            Integer.MAX_VALUE,
+                                            1,
                                             false,
-                                            true,
                                             false,
                                             false,
                                             jarInfo);
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
index 2771911abc5..906c9799e1d 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
@@ -83,7 +83,8 @@ public class ScriptExecutorITCase extends 
AbstractSqlGatewayStatementITCaseBase
                     loader,
                     miniCluster.getConfiguration(),
                     ScriptExecutor.class.getClassLoader(),
-                    false,
+                    Integer.MAX_VALUE,
+                    Integer.MAX_VALUE,
                     false,
                     null,
                     null,

Reply via email to