This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6da0b9d97dbe63b8964df0bcb647590b2a238ab Author: Yi Zhang <[email protected]> AuthorDate: Thu Mar 12 14:13:50 2026 +0800 [FLINK-38976][runtime] Support multiple batch jobs in an application --- .../generated/deployment_configuration.html | 6 - .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 4 +- .../java/org/apache/flink/client/ClientUtils.java | 9 +- ...ApplicationDispatcherGatewayServiceFactory.java | 8 +- .../application/ApplicationJobUtils.java | 29 ++++- .../application/PackagedProgramApplication.java | 48 ++++---- .../PackagedProgramApplicationEntry.java | 15 ++- .../client/program/StreamContextEnvironment.java | 52 +++++--- .../application/ApplicationJobUtilsTest.java | 36 +++--- .../PackagedProgramApplicationEntryTest.java | 22 +++- .../PackagedProgramApplicationTest.java | 137 +++++++++++++-------- .../program/StreamContextEnvironmentTest.java | 6 +- .../flink/client/testjar/MultiExecuteJob.java | 14 ++- .../flink/configuration/DeploymentOptions.java | 7 +- .../handlers/JarRunApplicationHandler.java | 3 +- .../service/application/ScriptExecutorITCase.java | 3 +- 16 files changed, 259 insertions(+), 140 deletions(-) diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html index e6790e81506..1823723dc2e 100644 --- a/docs/layouts/shortcodes/generated/deployment_configuration.html +++ b/docs/layouts/shortcodes/generated/deployment_configuration.html @@ -50,12 +50,6 @@ <td>Boolean</td> <td>If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.</td> </tr> - <tr> - <td><h5>execution.submit-failed-job-on-application-error</h5></td> - <td style="word-wrap: break-word;">false</td> - <td>Boolean</td> - <td>If a failed job should be submitted (in the application mode) when there is an error in the application driver before an actual job submission. This is intended for providing a clean way of reporting failures back to the user and is especially useful in combination with 'execution.shutdown-on-application-finish'. This option only works when the single job submission is enforced ('high-availability.type' is enabled). Please note that this is an experimental option and may [...] - </tr> <tr> <td><h5>execution.target</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 index 914108019f3..8c74dd37ded 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 @@ -21,8 +21,8 @@ org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object, org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated -org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID, org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf type org.apache.flink.client.program.JarInfo does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] o [...] -org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID, org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any pa [...] +org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, int, int, boolean, org.apache.flink.api.common.ApplicationID, org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf type org.apache.flink.client.program.JarInfo does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] [...] +org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, int, int, boolean, org.apache.flink.api.common.ApplicationID, org.apache.flink.client.program.JarInfo, java.util.Collection): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any p [...] org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration): Returned leaf type org.apache.flink.configuration.JobManagerOptions$SchedulerType does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 07b75b89d45..1e5b15f3133 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -90,7 +90,8 @@ public enum ClientUtils { executorServiceLoader, configuration, program, - enforceSingleJobExecution, + enforceSingleJobExecution ? 1 : Integer.MAX_VALUE, + enforceSingleJobExecution ? 1 : Integer.MAX_VALUE, suppressSysout, null, null, @@ -101,7 +102,8 @@ public enum ClientUtils { PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, PackagedProgram program, - boolean enforceSingleJobExecution, + int jobCountLimit, + int streamingJobCountLimit, boolean suppressSysout, @Nullable ApplicationID applicationId, @Nullable JarInfo userJarInfo, @@ -121,7 +123,8 @@ public enum ClientUtils { executorServiceLoader, configuration, userCodeClassLoader, - enforceSingleJobExecution, + jobCountLimit, + streamingJobCountLimit, suppressSysout, applicationId, userJarInfo, diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java index 579845cfeeb..4544bbccbdb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobInfo; import org.apache.flink.api.common.JobInfoImpl; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ApplicationOptionsInternal; import org.apache.flink.configuration.Configuration; @@ -105,8 +106,8 @@ public class ApplicationDispatcherGatewayServiceFactory final List<JobInfo> recoveredTerminalJobInfos = getRecoveredTerminalJobInfos(recoveredDirtyJobResults); - final boolean allowExecuteMultipleJobs = - ApplicationJobUtils.allowExecuteMultipleJobs(configuration); + final Tuple2<Integer, Integer> jobCountLimits = + ApplicationJobUtils.getJobCountLimits(configuration); ApplicationJobUtils.maybeFixIds(configuration); final ApplicationID applicationId = configuration @@ -121,8 +122,9 @@ public class ApplicationDispatcherGatewayServiceFactory recoveredJobInfos, recoveredTerminalJobInfos, configuration, + jobCountLimits.f0, + jobCountLimits.f1, true, - !allowExecuteMultipleJobs, configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR), configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH)); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java index bf8f0415390..759e0907ba7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java @@ -20,9 +20,11 @@ package org.apache.flink.client.deployment.application; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ApplicationOptionsInternal; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -125,10 +127,27 @@ public class ApplicationJobUtils { return str; } - public static boolean allowExecuteMultipleJobs(Configuration config) { - final Optional<String> configuredJobId = - config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); - return !HighAvailabilityMode.isHighAvailabilityModeActivated(config) - && configuredJobId.isEmpty(); + /** + * Returns the job count limits for the given configuration. + * + * @param config The configuration to get the job count limits from + * @return A tuple of (total job count limit, streaming job count limit) + */ + public static Tuple2<Integer, Integer> getJobCountLimits(Configuration config) { + if (config.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR)) { + // When enabled, this option ensures that if the application fails before any job is + // submitted, a synthetic failed job is submitted for diagnostics. To provide a stable + // and predictable job ID, only a single job is allowed in the main method. Therefore, + // we enforce single job execution when this option is enabled. + return new Tuple2<>(1, 1); + } + + int jobCountLimit = Integer.MAX_VALUE; + int streamingJobCountLimit = + HighAvailabilityMode.isHighAvailabilityModeActivated(config) + ? 1 + : Integer.MAX_VALUE; + + return new Tuple2<>(jobCountLimit, streamingJobCountLimit); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java index 71f9df60101..423274e908e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java @@ -71,6 +71,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -93,7 +94,9 @@ public class PackagedProgramApplication extends AbstractApplication { private final boolean handleFatalError; - private final boolean enforceSingleJobExecution; + private final int jobCountLimit; + + private final int streamingJobCountLimit; private final boolean submitFailedJobOnApplicationError; @@ -115,8 +118,9 @@ public class PackagedProgramApplication extends AbstractApplication { final ApplicationID applicationId, final PackagedProgram program, final Configuration configuration, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean handleFatalError, - final boolean enforceSingleJobExecution, final boolean submitFailedJobOnApplicationError, final boolean shutDownOnFinish) { this( @@ -125,8 +129,9 @@ public class PackagedProgramApplication extends AbstractApplication { Collections.emptyList(), Collections.emptyList(), configuration, + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, submitFailedJobOnApplicationError, shutDownOnFinish); } @@ -135,8 +140,9 @@ public class PackagedProgramApplication extends AbstractApplication { final ApplicationID applicationId, final PackagedProgram program, final Configuration configuration, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean handleFatalError, - final boolean enforceSingleJobExecution, final boolean submitFailedJobOnApplicationError, final boolean shutDownOnFinish, final @Nullable JarInfo userJarInfo) { @@ -146,8 +152,9 @@ public class PackagedProgramApplication extends AbstractApplication { Collections.emptyList(), Collections.emptyList(), configuration, + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, submitFailedJobOnApplicationError, shutDownOnFinish, userJarInfo); @@ -159,8 +166,9 @@ public class PackagedProgramApplication extends AbstractApplication { final Collection<JobInfo> recoveredJobInfos, final Collection<JobInfo> recoveredTerminalJobInfos, final Configuration configuration, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean handleFatalError, - final boolean enforceSingleJobExecution, final boolean submitFailedJobOnApplicationError, final boolean shutDownOnFinish) { this( @@ -169,8 +177,9 @@ public class PackagedProgramApplication extends AbstractApplication { recoveredJobInfos, recoveredTerminalJobInfos, configuration, + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, submitFailedJobOnApplicationError, shutDownOnFinish, null); @@ -182,8 +191,9 @@ public class PackagedProgramApplication extends AbstractApplication { final Collection<JobInfo> recoveredJobInfos, final Collection<JobInfo> recoveredTerminalJobInfos, final Configuration configuration, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean handleFatalError, - final boolean enforceSingleJobExecution, final boolean submitFailedJobOnApplicationError, final boolean shutDownOnFinish, final @Nullable JarInfo userJarInfo) { @@ -192,8 +202,11 @@ public class PackagedProgramApplication extends AbstractApplication { this.recoveredJobInfos = checkNotNull(recoveredJobInfos); this.recoveredTerminalJobInfos = checkNotNull(recoveredTerminalJobInfos); this.configuration = checkNotNull(configuration); + checkArgument(jobCountLimit > 0, "jobCountLimit must be positive"); + checkArgument(streamingJobCountLimit > 0, "streamingJobCountLimit must be positive"); + this.jobCountLimit = jobCountLimit; + this.streamingJobCountLimit = streamingJobCountLimit; this.handleFatalError = handleFatalError; - this.enforceSingleJobExecution = enforceSingleJobExecution; this.submitFailedJobOnApplicationError = submitFailedJobOnApplicationError; this.shutDownOnFinish = shutDownOnFinish; this.userJarInfo = userJarInfo; @@ -386,8 +399,9 @@ public class PackagedProgramApplication extends AbstractApplication { programDescriptor.getProgramArgs(), getApplicationId(), getName(), + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, submitFailedJobOnApplicationError, shutDownOnFinish)); } @@ -649,15 +663,6 @@ public class PackagedProgramApplication extends AbstractApplication { final Set<JobID> tolerateMissingResult, final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) { - if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) { - jobIdsFuture.completeExceptionally( - new ApplicationExecutionException( - String.format( - "Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.", - DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR - .key()))); - return; - } // applicationJobIds should contain all jobs involved in the current application execution // after ClientUtils.executeProgram completes, including: @@ -691,7 +696,8 @@ public class PackagedProgramApplication extends AbstractApplication { executorServiceLoader, configuration, program, - enforceSingleJobExecution, + jobCountLimit, + streamingJobCountLimit, true /* suppress sysout */, getApplicationId(), userJarInfo, @@ -709,7 +715,7 @@ public class PackagedProgramApplication extends AbstractApplication { // of an already finished a success. final Optional<DuplicateJobSubmissionException> maybeDuplicate = ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class); - if (enforceSingleJobExecution + if (jobCountLimit == 1 && maybeDuplicate.isPresent() && maybeDuplicate.get().isGloballyTerminated()) { final JobID jobId = maybeDuplicate.get().getJobID(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java index 931bd8b9d75..f548249ca0e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java @@ -50,9 +50,11 @@ public class PackagedProgramApplicationEntry implements ApplicationStoreEntry { private final String applicationName; - private final boolean handleFatalError; + private final int jobCountLimit; + + private final int streamingJobCountLimit; - private final boolean enforceSingleJobExecution; + private final boolean handleFatalError; private final boolean submitFailedJobOnApplicationError; @@ -65,8 +67,9 @@ public class PackagedProgramApplicationEntry implements ApplicationStoreEntry { String[] programArgs, ApplicationID applicationId, String applicationName, + int jobCountLimit, + int streamingJobCountLimit, boolean handleFatalError, - boolean enforceSingleJobExecution, boolean submitFailedJobOnApplicationError, boolean shutDownOnFinish) { this.configuration = configuration; @@ -75,8 +78,9 @@ public class PackagedProgramApplicationEntry implements ApplicationStoreEntry { this.programArgs = programArgs; this.applicationId = applicationId; this.applicationName = applicationName; + this.jobCountLimit = jobCountLimit; + this.streamingJobCountLimit = streamingJobCountLimit; this.handleFatalError = handleFatalError; - this.enforceSingleJobExecution = enforceSingleJobExecution; this.submitFailedJobOnApplicationError = submitFailedJobOnApplicationError; this.shutDownOnFinish = shutDownOnFinish; } @@ -120,8 +124,9 @@ public class PackagedProgramApplicationEntry implements ApplicationStoreEntry { recoveredJobInfos, recoveredTerminalJobInfos, configuration, + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, submitFailedJobOnApplicationError, shutDownOnFinish, userJarInfo); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java index aeffa48738e..4f51617ce58 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java @@ -33,6 +33,7 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.JobListener; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -71,10 +72,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { private final boolean suppressSysout; - private final boolean enforceSingleJobExecution; private final Configuration clusterConfiguration; - private int jobCounter; + private final int jobCountLimit; + private final int streamingJobCountLimit; + private int jobCount; + private int streamingJobCount; private final boolean programConfigEnabled; private final Collection<String> programConfigWildcards; @@ -96,18 +99,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { configuration, configuration, userCodeClassLoader, - enforceSingleJobExecution, + enforceSingleJobExecution ? 1 : Integer.MAX_VALUE, + enforceSingleJobExecution ? 1 : Integer.MAX_VALUE, suppressSysout, true, Collections.emptyList()); } + @Internal public StreamContextEnvironment( final PipelineExecutorServiceLoader executorServiceLoader, final Configuration clusterConfiguration, final Configuration configuration, final ClassLoader userCodeClassLoader, - final boolean enforceSingleJobExecution, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean suppressSysout, final boolean programConfigEnabled, final Collection<String> programConfigWildcards) { @@ -116,7 +122,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { clusterConfiguration, configuration, userCodeClassLoader, - enforceSingleJobExecution, + jobCountLimit, + streamingJobCountLimit, suppressSysout, programConfigEnabled, programConfigWildcards, @@ -131,7 +138,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { final Configuration clusterConfiguration, final Configuration configuration, final ClassLoader userCodeClassLoader, - final boolean enforceSingleJobExecution, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean suppressSysout, final boolean programConfigEnabled, final Collection<String> programConfigWildcards, @@ -140,9 +148,11 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { final Collection<JobInfo> allRecoveredJobInfos) { super(executorServiceLoader, configuration, userCodeClassLoader); this.suppressSysout = suppressSysout; - this.enforceSingleJobExecution = enforceSingleJobExecution; this.clusterConfiguration = clusterConfiguration; - this.jobCounter = 0; + this.jobCountLimit = jobCountLimit; + this.streamingJobCountLimit = streamingJobCountLimit; + this.jobCount = 0; + this.streamingJobCount = 0; this.programConfigEnabled = programConfigEnabled; this.programConfigWildcards = programConfigWildcards; this.applicationId = applicationId; @@ -242,7 +252,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { @Override public JobClient executeAsync(StreamGraph streamGraph) throws Exception { checkNotAllowedConfigurations(); - validateAllowedExecution(); + validateAllowedExecution(streamGraph); if (applicationId != null) { streamGraph.setApplicationId(applicationId); } @@ -260,12 +270,22 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { return jobClient; } - private void validateAllowedExecution() { - if (enforceSingleJobExecution && jobCounter > 0) { + private void validateAllowedExecution(StreamGraph streamGraph) { + if (streamGraph.getJobType() == JobType.STREAMING) { + streamingJobCount++; + } + jobCount++; + + if (streamingJobCount > streamingJobCountLimit) { + throw new FlinkRuntimeException( + "Cannot have more than " + + streamingJobCountLimit + + " streaming jobs in a single environment."); + } + if (jobCount > jobCountLimit) { throw new FlinkRuntimeException( - "Cannot have more than one execute() or executeAsync() call in a single environment."); + "Cannot have more than " + jobCountLimit + " jobs in a single environment."); } - jobCounter++; } // -------------------------------------------------------------------------------------------- @@ -274,7 +294,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { final PipelineExecutorServiceLoader executorServiceLoader, final Configuration clusterConfiguration, final ClassLoader userCodeClassLoader, - final boolean enforceSingleJobExecution, + final int jobCountLimit, + final int streamingJobCountLimit, final boolean suppressSysout, @Nullable final ApplicationID applicationId, @Nullable final JarInfo userJarInfo, @@ -293,7 +314,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { clusterConfiguration, mergedEnvConfig, userCodeClassLoader, - enforceSingleJobExecution, + jobCountLimit, + streamingJobCountLimit, suppressSysout, programConfigEnabled, programConfigWildcards, diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java index 1eed3295981..c28706c3520 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java @@ -18,10 +18,11 @@ package org.apache.flink.client.deployment.application; -import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ApplicationOptionsInternal; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -38,10 +39,8 @@ import javax.annotation.Nullable; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for {@link ApplicationJobUtils}. */ public class ApplicationJobUtilsTest { @@ -367,33 +366,34 @@ public class ApplicationJobUtilsTest { } @Test - void testAllowExecuteMultipleJobs_HADisabled_NoFixedJobId() { - assertEquals( - HighAvailabilityMode.NONE.name(), - configuration.get(HighAvailabilityOptions.HA_MODE)); - assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); + void testGetJobCountLimits_SubmitFailedJobEnabled() { + configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true); - assertTrue(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)); + assertEquals(new Tuple2<>(1, 1), ApplicationJobUtils.getJobCountLimits(configuration)); } @Test - void testAllowExecuteMultipleJobs_HAEnabled_NoFixedJobId() { - final String clusterId = "cluster"; - configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); - configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + void testGetJobCountLimits_HADisabled() { + assertEquals( + HighAvailabilityMode.NONE.name(), + configuration.get(HighAvailabilityOptions.HA_MODE)); assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); - assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)); + assertEquals( + new Tuple2<>(Integer.MAX_VALUE, Integer.MAX_VALUE), + ApplicationJobUtils.getJobCountLimits(configuration)); } @Test - void testAllowExecuteMultipleJobs_HAEnabled_FixedJobIdSet() { + void testGetJobCountLimits_HAEnabled() { final String clusterId = "cluster"; - final JobID testJobID = new JobID(0, 2); configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); - configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + assertNull(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); - assertFalse(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)); + // support multiple batch job execution + assertEquals( + new Tuple2<>(Integer.MAX_VALUE, 1), + ApplicationJobUtils.getJobCountLimits(configuration)); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java index 169c31cee5c..714255cb488 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntryTest.java @@ -91,7 +91,14 @@ class PackagedProgramApplicationEntryTest { void testConstructionWithoutJarBlob() { PackagedProgramApplication application = new PackagedProgramApplication( - applicationId, program, config, true, false, false, true); + applicationId, + program, + config, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + true, + false, + true); ApplicationStoreEntry entry = application.getApplicationStoreEntry().orElse(null); assertNull(entry); @@ -107,7 +114,15 @@ class PackagedProgramApplicationEntryTest { JarInfo userJarInfo = new JarInfo(jarFile.getName(), blobKey); PackagedProgramApplication application = new PackagedProgramApplication( - applicationId, program, config, true, false, false, true, userJarInfo); + applicationId, + program, + config, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + true, + false, + true, + userJarInfo); ApplicationStoreEntry entry = application.getApplicationStoreEntry().orElse(null); assertInstanceOf(PackagedProgramApplicationEntry.class, entry); @@ -178,9 +193,10 @@ class PackagedProgramApplicationEntryTest { programArgs, applicationId, applicationName, + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java index 4f04931b4a4..d4a799cab9e 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java @@ -73,7 +73,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Supplier; -import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -98,13 +97,62 @@ public class PackagedProgramApplicationTest { void testOnlyOneJobIsAllowedWhenEnforceSingleJobExecution() throws Throwable { final PackagedProgramApplication application = createAndExecuteApplication( - 2, getConfiguration(), finishedJobGatewayBuilder().build(), true); + 2, getConfiguration(), finishedJobGatewayBuilder().build(), 1, 1); assertException(application.getApplicationCompletionFuture(), FlinkRuntimeException.class); application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); assertApplicationFailed(application); } + @Test + void testOnlyOneStreamingJobIsAllowedWhenEnforceSingleStreamingJobExecution() throws Throwable { + final PackagedProgramApplication application = + createAndExecuteApplication( + 2, + getConfiguration(), + finishedJobGatewayBuilder().build(), + Integer.MAX_VALUE, + 1); + + assertException(application.getApplicationCompletionFuture(), FlinkRuntimeException.class); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + } + + @Test + void testMultiBatchJobIsAllowedWhenEnableMultiBatchJobExecution() throws Throwable { + final List<JobID> submittedJobIds = new ArrayList<>(); + + final PackagedProgram program = getProgram(2, true); + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + program, + getConfiguration(), + Integer.MAX_VALUE, + 1, + true, + false, + true); + + DispatcherGateway dispatcherGateway = + finishedJobGatewayBuilder() + .setSubmitFunction( + executionPlan -> { + submittedJobIds.add(executionPlan.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + + executeApplication(application, dispatcherGateway, scheduledExecutor, exception -> {}); + + application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + + assertThat(submittedJobIds.size()).isEqualTo(2); + } + @Test void testStaticJobId() throws Throwable { final JobID testJobID = new JobID(0, 2); @@ -180,9 +228,10 @@ public class PackagedProgramApplicationTest { new ApplicationID(), getProgram(2), getConfiguration(), + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); // change mainThreadExecutor to be manually triggered @@ -272,9 +321,10 @@ public class PackagedProgramApplicationTest { new ApplicationID(), getProgram(2), getConfiguration(), + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); // change mainThreadExecutor to be manually triggered @@ -483,9 +533,10 @@ public class PackagedProgramApplicationTest { new ApplicationID(), getProgram(1), getConfiguration(), + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); application.cancel(); @@ -861,7 +912,8 @@ public class PackagedProgramApplicationTest { dispatcherBuilder.build(), scheduledExecutor, exception -> errorHandlerCalled.set(true), - false, + Integer.MAX_VALUE, + Integer.MAX_VALUE, false, true); @@ -896,7 +948,7 @@ public class PackagedProgramApplicationTest { final PackagedProgramApplication application = createAndExecuteApplication( - 1, configurationUnderTest, dispatcherBuilder.build(), true); + 1, configurationUnderTest, dispatcherBuilder.build(), 1, 1); application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -934,7 +986,7 @@ public class PackagedProgramApplicationTest { final PackagedProgramApplication application = createAndExecuteApplication( - 1, configurationUnderTest, dispatcherBuilder.build(), true); + 1, configurationUnderTest, dispatcherBuilder.build(), 1, 1); application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -994,8 +1046,9 @@ public class PackagedProgramApplicationTest { dispatcherGateway, scheduledExecutor, exception -> {}, + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, - false, false); // Wait until application is finished to make sure cluster shutdown isn't called @@ -1050,8 +1103,9 @@ public class PackagedProgramApplicationTest { new ApplicationID(), FailingJob.getProgram(), configuration, + 1, + 1, true, - true /* enforceSingleJobExecution */, true /* submitFailedJobOnApplicationError */, true); @@ -1061,39 +1115,6 @@ public class PackagedProgramApplicationTest { assertApplicationFailed(application); } - @Test - void testSubmitFailedJobOnApplicationErrorWhenNotEnforceSingleJobExecution() throws Exception { - final PackagedProgramApplication application = - new PackagedProgramApplication( - new ApplicationID(), - FailingJob.getProgram(), - getConfiguration(), - true, - false /* enforceSingleJobExecution */, - true /* submitFailedJobOnApplicationError */, - true); - - executeApplication( - application, - TestingDispatcherGateway.newBuilder().build(), - scheduledExecutor, - exception -> {}); - - assertThatFuture(application.getApplicationCompletionFuture()) - .eventuallyFailsWith(ExecutionException.class) - .extracting(Throwable::getCause) - .satisfies( - e -> - assertThat(e) - .hasMessageContaining( - DeploymentOptions - .SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR - .key())); - - application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertApplicationFailed(application); - } - @Test void testExceptionHistoryWhenJobFails() throws Exception { final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); @@ -1193,9 +1214,10 @@ public class PackagedProgramApplicationTest { Collections.emptyList(), Collections.singletonList(recoveredTerminalJobInfo), configuration, + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); executeApplication( @@ -1243,9 +1265,10 @@ public class PackagedProgramApplicationTest { Collections.singletonList(recoveredRunningJobInfo), Collections.emptyList(), configuration, + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, false, - false, true); executeApplication( @@ -1300,14 +1323,16 @@ public class PackagedProgramApplicationTest { final Configuration configuration, final DispatcherGateway dispatcherGateway) throws FlinkException { - return createAndExecuteApplication(numJobs, configuration, dispatcherGateway, false); + return createAndExecuteApplication( + numJobs, configuration, dispatcherGateway, Integer.MAX_VALUE, Integer.MAX_VALUE); } private PackagedProgramApplication createAndExecuteApplication( final int numJobs, final Configuration configuration, final DispatcherGateway dispatcherGateway, - final boolean enforceSingleJobExecution) + final int jobCountLimit, + final int streamingJobCountLimit) throws FlinkException { return createAndExecuteApplication( numJobs, @@ -1315,8 +1340,9 @@ public class PackagedProgramApplicationTest { dispatcherGateway, scheduledExecutor, exception -> {}, + jobCountLimit, + streamingJobCountLimit, true, - enforceSingleJobExecution, true); } @@ -1343,8 +1369,9 @@ public class PackagedProgramApplicationTest { dispatcherGateway, scheduledExecutor, errorHandler, + Integer.MAX_VALUE, + Integer.MAX_VALUE, true, - false, true); } @@ -1354,8 +1381,9 @@ public class PackagedProgramApplicationTest { final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor, final FatalErrorHandler errorHandler, + final int jobCountLimit, + final int streamingJobCountLimit, boolean handleFatalError, - boolean enforceSingleJobExecution, boolean shutDownOnFinish) throws FlinkException { @@ -1366,8 +1394,9 @@ public class PackagedProgramApplicationTest { new ApplicationID(), program, configuration, + jobCountLimit, + streamingJobCountLimit, handleFatalError, - enforceSingleJobExecution, false, shutDownOnFinish); assertApplicationCreated(application); @@ -1394,7 +1423,11 @@ public class PackagedProgramApplicationTest { } private PackagedProgram getProgram(int numJobs) throws FlinkException { - return MultiExecuteJob.getProgram(numJobs, true); + return getProgram(numJobs, false); + } + + private PackagedProgram getProgram(int numJobs, boolean batchMode) throws FlinkException { + return MultiExecuteJob.getProgram(numJobs, true, batchMode); } private static JobResult createFailedJobResult(final JobID jobId) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java index d637e13cd1e..f486f272906 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java @@ -101,7 +101,8 @@ class StreamContextEnvironmentTest { clusterConfig, jobConfig, classLoader, - true, + 1, + 1, true, false, Collections.emptyList()); @@ -190,7 +191,8 @@ class StreamContextEnvironmentTest { clusterConfig, clusterConfig, classLoader, - true, + 1, + 1, true, false, programConfigWildcards); diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java index 0b65361bc05..61ea2840381 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/MultiExecuteJob.java @@ -18,6 +18,7 @@ package org.apache.flink.client.testjar; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; @@ -38,7 +39,8 @@ import java.util.List; */ public class MultiExecuteJob { - public static PackagedProgram getProgram(int noOfJobs, boolean attached) throws FlinkException { + public static PackagedProgram getProgram(int noOfJobs, boolean attached, boolean batchMode) + throws FlinkException { try { return PackagedProgram.newBuilder() .setUserClassPaths( @@ -47,7 +49,10 @@ public class MultiExecuteJob { .toURI() .toURL())) .setEntryPointClassName(MultiExecuteJob.class.getName()) - .setArguments(String.valueOf(noOfJobs), Boolean.toString(attached)) + .setArguments( + String.valueOf(noOfJobs), + Boolean.toString(attached), + Boolean.toString(batchMode)) .build(); } catch (ProgramInvocationException | FileNotFoundException | MalformedURLException e) { throw new FlinkException("Could not load the provided entrypoint class.", e); @@ -57,10 +62,15 @@ public class MultiExecuteJob { public static void main(String[] args) throws Exception { int noOfExecutes = Integer.parseInt(args[0]); boolean attached = args.length > 1 && Boolean.parseBoolean(args[1]); + boolean batchMode = args.length > 2 && Boolean.parseBoolean(args[2]); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); for (int i = 0; i < noOfExecutes; i++) { + if (batchMode) { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } + final List<Integer> input = new ArrayList<>(); input.add(1); input.add(2); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java index 2e127c5cf2e..60c3c4ebc21 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java @@ -20,6 +20,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; @@ -91,7 +92,11 @@ public class DeploymentOptions { "Whether a Flink Application cluster should shut down automatically after its application finishes" + " (either successfully or as result of a failure). Has no effect for other deployment modes."); - @Experimental + /** + * @deprecated Check application exceptions instead. + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption<Boolean> SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = ConfigOptions.key("execution.submit-failed-job-on-application-error") .booleanType() diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java index 0ca5b679712..3e42667321e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java @@ -131,8 +131,9 @@ public class JarRunApplicationHandler applicationId, program, effectiveConfiguration, + Integer.MAX_VALUE, + 1, false, - true, false, false, jarInfo); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java index 2771911abc5..906c9799e1d 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java @@ -83,7 +83,8 @@ public class ScriptExecutorITCase extends AbstractSqlGatewayStatementITCaseBase loader, miniCluster.getConfiguration(), ScriptExecutor.class.getClassLoader(), - false, + Integer.MAX_VALUE, + Integer.MAX_VALUE, false, null, null,
