This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4690f277efc [FLINK-38759][runtime] Refactor application mode with
PackagedProgramApplication
4690f277efc is described below
commit 4690f277efcd84eafb94e32c72a3af13378ce8de
Author: Yi Zhang <[email protected]>
AuthorDate: Fri Sep 26 16:16:45 2025 +0800
[FLINK-38759][runtime] Refactor application mode with
PackagedProgramApplication
---
.../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 2 +-
.../java/org/apache/flink/client/ClientUtils.java | 23 +-
.../ApplicationDispatcherBootstrap.java | 406 --------
...ApplicationDispatcherGatewayServiceFactory.java | 41 +-
.../application/ApplicationJobUtils.java | 109 +++
.../application/PackagedProgramApplication.java | 3 +-
.../client/program/StreamContextEnvironment.java | 39 +-
.../ApplicationDispatcherBootstrapTest.java | 1023 --------------------
.../application/ApplicationJobUtilsTest.java | 174 ++++
....java => PackagedProgramApplicationITCase.java} | 7 +-
.../configuration/ApplicationOptionsInternal.java | 32 +
.../DuplicateApplicationSubmissionException.java | 38 +
.../runtime/dispatcher/ApplicationBootstrap.java | 42 +
.../flink/runtime/dispatcher/Dispatcher.java | 82 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 31 +-
.../streaming/api/graph/AdaptiveGraphManager.java | 6 +-
.../flink/streaming/api/graph/ExecutionPlan.java | 9 +
.../flink/streaming/api/graph/StreamGraph.java | 33 +-
.../api/graph/StreamingJobGraphGenerator.java | 22 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 94 ++
.../service/application/ScriptExecutorITCase.java | 3 +-
21 files changed, 746 insertions(+), 1473 deletions(-)
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
index 05081f810d9..256be5208ff 100644
---
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
+++
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
@@ -21,7 +21,7 @@
org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object,
org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument
leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context
does not satisfy: reside outside of package 'org.apache.flink..' or reside in
any package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph):
Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
-org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean,
boolean): Argument leaf type
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy:
reside outside of package 'org.apache.flink..' or reside in any package
['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or
annotated with @Deprecated
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean,
boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy:
reside outside of package 'org.apache.flink..' or reside in any package
['..shaded..'] or annotated with @Public or annotated wit [...]
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned
leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of
package 'org.apache.flink..' or reside in any package ['..shaded..'] or
annotated with @Public or annotated with @PublicEvolving or annotated with
@Deprecated
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration):
Returned leaf type
org.apache.flink.configuration.JobManagerOptions$SchedulerType does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f8554ed3064..dd680883bc5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.client;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.ClientOptions;
@@ -42,6 +43,8 @@ import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
@@ -80,6 +83,23 @@ public enum ClientUtils {
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
+ executeProgram(
+ executorServiceLoader,
+ configuration,
+ program,
+ enforceSingleJobExecution,
+ suppressSysout,
+ null);
+ }
+
+ public static void executeProgram(
+ PipelineExecutorServiceLoader executorServiceLoader,
+ Configuration configuration,
+ PackagedProgram program,
+ boolean enforceSingleJobExecution,
+ boolean suppressSysout,
+ @Nullable ApplicationID applicationId)
+ throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader =
program.getUserCodeClassLoader();
final ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
@@ -95,7 +115,8 @@ public enum ClientUtils {
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
- suppressSysout);
+ suppressSysout,
+ applicationId);
// For DataStream v2.
ExecutionContextEnvironment.setAsContext(
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
deleted file mode 100644
index 4290dcd4748..00000000000
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.deployment.application;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.cli.ClientOptions;
-import
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
-import
org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link DispatcherBootstrap} used for running the user's {@code main()} in
"Application Mode"
- * (see FLIP-85).
- *
- * <p>This dispatcher bootstrap submits the recovered {@link JobGraph job
graphs} for re-execution
- * (in case of recovery from a failure), and then submits the remaining jobs
of the application for
- * execution.
- *
- * <p>To achieve this, it works in conjunction with the {@link
EmbeddedExecutor EmbeddedExecutor}
- * which decides if it should submit a job for execution (in case of a new
job) or the job was
- * already recovered and is running.
- */
-@Internal
-public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
-
- @VisibleForTesting static final String FAILED_JOB_NAME = "(application
driver)";
-
- private static final Logger LOG =
LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
-
- private static boolean isCanceledOrFailed(ApplicationStatus
applicationStatus) {
- return applicationStatus == ApplicationStatus.CANCELED
- || applicationStatus == ApplicationStatus.FAILED;
- }
-
- private final PackagedProgram application;
-
- private final Collection<JobID> recoveredJobIds;
-
- private final Configuration configuration;
-
- private final FatalErrorHandler errorHandler;
-
- private final CompletableFuture<Void> applicationCompletionFuture;
-
- private final CompletableFuture<Acknowledge> bootstrapCompletionFuture;
-
- private ScheduledFuture<?> applicationExecutionTask;
-
- public ApplicationDispatcherBootstrap(
- final PackagedProgram application,
- final Collection<JobID> recoveredJobIds,
- final Configuration configuration,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor,
- final FatalErrorHandler errorHandler) {
- this.configuration = checkNotNull(configuration);
- this.recoveredJobIds = checkNotNull(recoveredJobIds);
- this.application = checkNotNull(application);
- this.errorHandler = checkNotNull(errorHandler);
-
- this.applicationCompletionFuture =
- fixJobIdAndRunApplicationAsync(dispatcherGateway,
scheduledExecutor);
-
- this.bootstrapCompletionFuture =
finishBootstrapTasks(dispatcherGateway);
- }
-
- @Override
- public void stop() {
- if (applicationExecutionTask != null) {
- applicationExecutionTask.cancel(true);
- }
-
- if (applicationCompletionFuture != null) {
- applicationCompletionFuture.cancel(true);
- }
- }
-
- @VisibleForTesting
- ScheduledFuture<?> getApplicationExecutionFuture() {
- return applicationExecutionTask;
- }
-
- @VisibleForTesting
- CompletableFuture<Void> getApplicationCompletionFuture() {
- return applicationCompletionFuture;
- }
-
- @VisibleForTesting
- CompletableFuture<Acknowledge> getBootstrapCompletionFuture() {
- return bootstrapCompletionFuture;
- }
-
- /**
- * Logs final application status and invokes error handler in case of
unexpected failures.
- * Optionally shuts down the given dispatcherGateway when the application
completes (either
- * successfully or in case of failure), depending on the corresponding
config option.
- */
- private CompletableFuture<Acknowledge> finishBootstrapTasks(
- final DispatcherGateway dispatcherGateway) {
- final CompletableFuture<Acknowledge> shutdownFuture =
- applicationCompletionFuture
- .handle(
- (ignored, t) -> {
- if (t == null) {
- LOG.info("Application completed
SUCCESSFULLY");
- return finish(
- dispatcherGateway,
ApplicationStatus.SUCCEEDED);
- }
- final Optional<ApplicationStatus>
maybeApplicationStatus =
- extractApplicationStatus(t);
- if (maybeApplicationStatus.isPresent()
- &&
isCanceledOrFailed(maybeApplicationStatus.get())) {
- final ApplicationStatus
applicationStatus =
- maybeApplicationStatus.get();
- LOG.info("Application {}: ",
applicationStatus, t);
- return finish(dispatcherGateway,
applicationStatus);
- }
- if (t instanceof CancellationException) {
- LOG.warn(
- "Application has been
cancelled because the {} is being stopped.",
-
ApplicationDispatcherBootstrap.class
- .getSimpleName());
- return
CompletableFuture.completedFuture(Acknowledge.get());
- }
- LOG.warn("Application failed unexpectedly:
", t);
- return
FutureUtils.<Acknowledge>completedExceptionally(t);
- })
- .thenCompose(Function.identity());
- FutureUtils.handleUncaughtException(shutdownFuture, (t, e) ->
errorHandler.onFatalError(e));
- return shutdownFuture;
- }
-
- private CompletableFuture<Acknowledge> finish(
- DispatcherGateway dispatcherGateway, ApplicationStatus
applicationStatus) {
- boolean shouldShutDownOnFinish =
-
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
- return shouldShutDownOnFinish
- ? dispatcherGateway.shutDownCluster(applicationStatus)
- : CompletableFuture.completedFuture(Acknowledge.get());
- }
-
- private Optional<ApplicationStatus> extractApplicationStatus(Throwable t) {
- final Optional<UnsuccessfulExecutionException> maybeException =
- ExceptionUtils.findThrowable(t,
UnsuccessfulExecutionException.class);
- return maybeException.map(
- exception ->
ApplicationStatus.fromJobStatus(exception.getStatus().orElse(null)));
- }
-
- private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
- final DispatcherGateway dispatcherGateway, final ScheduledExecutor
scheduledExecutor) {
- final Optional<String> configuredJobId =
-
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
- final boolean submitFailedJobOnApplicationError =
-
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
- if
(!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
- && !configuredJobId.isPresent()) {
- return runApplicationAsync(
- dispatcherGateway, scheduledExecutor, false,
submitFailedJobOnApplicationError);
- }
- if (!configuredJobId.isPresent()) {
- // In HA mode, we only support single-execute jobs at the moment.
Here, we manually
- // generate the job id, if not configured, from the cluster id to
keep it consistent
- // across failover.
- configuration.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
- new JobID(
- Preconditions.checkNotNull(
- configuration.get(
-
HighAvailabilityOptions.HA_CLUSTER_ID))
- .hashCode(),
- 0)
- .toHexString());
- }
- return runApplicationAsync(
- dispatcherGateway, scheduledExecutor, true,
submitFailedJobOnApplicationError);
- }
-
- /**
- * Runs the user program entrypoint by scheduling a task on the given
{@code scheduledExecutor}.
- * The returned {@link CompletableFuture} completes when all jobs of the
user application
- * succeeded. if any of them fails, or if job submission fails.
- */
- private CompletableFuture<Void> runApplicationAsync(
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor,
- final boolean enforceSingleJobExecution,
- final boolean submitFailedJobOnApplicationError) {
- final CompletableFuture<List<JobID>> applicationExecutionFuture = new
CompletableFuture<>();
- final Set<JobID> tolerateMissingResult =
Collections.synchronizedSet(new HashSet<>());
-
- // we need to hand in a future as return value because we need to get
those JobIs out
- // from the scheduled task that executes the user program
- applicationExecutionTask =
- scheduledExecutor.schedule(
- () ->
- runApplicationEntryPoint(
- applicationExecutionFuture,
- tolerateMissingResult,
- dispatcherGateway,
- scheduledExecutor,
- enforceSingleJobExecution,
- submitFailedJobOnApplicationError),
- 0L,
- TimeUnit.MILLISECONDS);
-
- return applicationExecutionFuture.thenCompose(
- jobIds ->
- getApplicationResult(
- dispatcherGateway,
- jobIds,
- tolerateMissingResult,
- scheduledExecutor));
- }
-
- /**
- * Runs the user program entrypoint and completes the given {@code
jobIdsFuture} with the {@link
- * JobID JobIDs} of the submitted jobs.
- *
- * <p>This should be executed in a separate thread (or task).
- */
- private void runApplicationEntryPoint(
- final CompletableFuture<List<JobID>> jobIdsFuture,
- final Set<JobID> tolerateMissingResult,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor,
- final boolean enforceSingleJobExecution,
- final boolean submitFailedJobOnApplicationError) {
- if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException(
- String.format(
- "Submission of failed job in case of an
application error ('%s') is not supported in non-HA setups.",
-
DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
- .key())));
- return;
- }
- final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
- try {
- final PipelineExecutorServiceLoader executorServiceLoader =
- new EmbeddedExecutorServiceLoader(
- applicationJobIds, dispatcherGateway,
scheduledExecutor);
-
- ClientUtils.executeProgram(
- executorServiceLoader,
- configuration,
- application,
- enforceSingleJobExecution,
- true /* suppress sysout */);
-
- if (applicationJobIds.isEmpty()) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException(
- "The application contains no execute()
calls."));
- } else {
- jobIdsFuture.complete(applicationJobIds);
- }
- } catch (Throwable t) {
- // If we're running in a single job execution mode, it's safe to
consider re-submission
- // of an already finished a success.
- final Optional<DuplicateJobSubmissionException> maybeDuplicate =
- ExceptionUtils.findThrowable(t,
DuplicateJobSubmissionException.class);
- if (enforceSingleJobExecution
- && maybeDuplicate.isPresent()
- && maybeDuplicate.get().isGloballyTerminated()) {
- final JobID jobId = maybeDuplicate.get().getJobID();
- tolerateMissingResult.add(jobId);
- jobIdsFuture.complete(Collections.singletonList(jobId));
- } else if (submitFailedJobOnApplicationError &&
applicationJobIds.isEmpty()) {
- final JobID failedJobId =
- JobID.fromHexString(
-
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
- dispatcherGateway
- .submitFailedJob(failedJobId, FAILED_JOB_NAME, t)
- .thenAccept(
- ignored ->
- jobIdsFuture.complete(
-
Collections.singletonList(failedJobId)));
- } else {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException("Could not execute
application.", t));
- }
- }
- }
-
- private CompletableFuture<Void> getApplicationResult(
- final DispatcherGateway dispatcherGateway,
- final Collection<JobID> applicationJobIds,
- final Set<JobID> tolerateMissingResult,
- final ScheduledExecutor executor) {
- final List<CompletableFuture<?>> jobResultFutures =
- applicationJobIds.stream()
- .map(
- jobId ->
- unwrapJobResultException(
- getJobResult(
- dispatcherGateway,
- jobId,
- executor,
-
tolerateMissingResult.contains(jobId))))
- .collect(Collectors.toList());
- return FutureUtils.waitForAll(jobResultFutures);
- }
-
- private CompletableFuture<JobResult> getJobResult(
- final DispatcherGateway dispatcherGateway,
- final JobID jobId,
- final ScheduledExecutor scheduledExecutor,
- final boolean tolerateMissingResult) {
- final Duration timeout =
configuration.get(ClientOptions.CLIENT_TIMEOUT);
- final Duration retryPeriod =
configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
- final CompletableFuture<JobResult> jobResultFuture =
- JobStatusPollingUtils.getJobResult(
- dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
- if (tolerateMissingResult) {
- // Return "unknown" job result if dispatcher no longer knows the
actual result.
- return FutureUtils.handleException(
- jobResultFuture,
- FlinkJobNotFoundException.class,
- exception ->
- new JobResult.Builder()
- .jobId(jobId)
- .jobStatus(null)
- .netRuntime(Long.MAX_VALUE)
- .build());
- }
- return jobResultFuture;
- }
-
- /**
- * If the given {@link JobResult} indicates success, this passes through
the {@link JobResult}.
- * Otherwise, this returns a future that is finished exceptionally
(potentially with an
- * exception from the {@link JobResult}).
- */
- private CompletableFuture<JobResult> unwrapJobResultException(
- final CompletableFuture<JobResult> jobResult) {
- return jobResult.thenApply(
- result -> {
- if (result.isSuccess()) {
- return result;
- }
-
- throw new CompletionException(
- UnsuccessfulExecutionException.fromJobResult(
- result,
application.getUserCodeClassLoader()));
- });
- }
-}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index b7e188ef7d4..6c443c4b649 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -19,9 +19,13 @@
package org.apache.flink.client.deployment.application;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.runtime.dispatcher.ApplicationBootstrap;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -51,8 +55,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>It instantiates a {@link
*
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.DispatcherGatewayService
- * DispatcherGatewayService} with an {@link ApplicationDispatcherBootstrap}
containing the user's
- * program.
+ * DispatcherGatewayService} with an {@link ApplicationBootstrap} containing
the user's program.
*/
@Internal
public class ApplicationDispatcherGatewayServiceFactory
@@ -62,7 +65,7 @@ public class ApplicationDispatcherGatewayServiceFactory
private final DispatcherFactory dispatcherFactory;
- private final PackagedProgram application;
+ private final PackagedProgram program;
private final RpcService rpcService;
@@ -71,12 +74,12 @@ public class ApplicationDispatcherGatewayServiceFactory
public ApplicationDispatcherGatewayServiceFactory(
Configuration configuration,
DispatcherFactory dispatcherFactory,
- PackagedProgram application,
+ PackagedProgram program,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices) {
this.configuration = configuration;
this.dispatcherFactory = dispatcherFactory;
- this.application = checkNotNull(application);
+ this.program = checkNotNull(program);
this.rpcService = rpcService;
this.partialDispatcherServices = partialDispatcherServices;
}
@@ -91,6 +94,26 @@ public class ApplicationDispatcherGatewayServiceFactory
final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
+ final boolean allowExecuteMultipleJobs =
+ ApplicationJobUtils.allowExecuteMultipleJobs(configuration);
+ ApplicationJobUtils.maybeFixIds(configuration);
+ final ApplicationID applicationId =
+ configuration
+
.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID)
+ .map(ApplicationID::fromHexString)
+ .orElseGet(ApplicationID::new);
+
+ PackagedProgramApplication bootstrapApplication =
+ new PackagedProgramApplication(
+ applicationId,
+ program,
+ recoveredJobIds,
+ configuration,
+ true,
+ !allowExecuteMultipleJobs,
+
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR),
+
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH));
+
final Dispatcher dispatcher;
try {
dispatcher =
@@ -100,13 +123,7 @@ public class ApplicationDispatcherGatewayServiceFactory
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor,
errorHandler) ->
- new ApplicationDispatcherBootstrap(
- application,
- recoveredJobIds,
- configuration,
- dispatcherGateway,
- scheduledExecutor,
- errorHandler),
+ new
ApplicationBootstrap(bootstrapApplication),
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices,
executionPlanWriter,
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
new file mode 100644
index 00000000000..ac2b565c917
--- /dev/null
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationJobUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+
+/** Utility class to handle application/job related configuration options in application mode. */
+public class ApplicationJobUtils {
+
+    /**
+     * Ensures deterministic application and job IDs in high availability (HA) mode.
+     *
+     * <p>In HA mode, fixed IDs are required to maintain state consistency across JobManager
+     * failovers. This method guarantees that the application ID and job ID are properly set as
+     * follows:
+     *
+     * <ul>
+     *   <li>If no application ID is configured, it generates a fixed one from the HA cluster ID.
+     *   <li>If no job ID is configured, it reuses the configured application ID, or derives one
+     *       from the HA cluster ID if no application ID was configured.
+     * </ul>
+     *
+     * <p>If HA mode is disabled, this method does nothing, and the system will assign random
+     * application/job IDs if none are configured.
+     *
+     * @param configuration the configuration that may be updated in place with fixed IDs
+     */
+    public static void maybeFixIds(Configuration configuration) {
+        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
+            return;
+        }
+
+        final Optional<String> configuredApplicationId =
+                configuration.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID);
+        if (configuredApplicationId.isEmpty()) {
+            // A fixed application id is required to ensure consistency across failovers; it is
+            // derived from the HA cluster id.
+            configuration.set(
+                    ApplicationOptionsInternal.FIXED_APPLICATION_ID,
+                    new ApplicationID(clusterIdHashCode(configuration), 0).toHexString());
+        }
+
+        final Optional<String> configuredJobId =
+                configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
+        if (configuredJobId.isPresent()) {
+            return;
+        }
+
+        // A fixed job id is required as well to ensure consistency across failovers.
+        final String jobId;
+        if (configuredApplicationId.isPresent()) {
+            // Reuse the user-configured application id as the job id.
+            jobId = ApplicationID.fromHexString(configuredApplicationId.get()).toHexString();
+        } else {
+            // Derive the job id from the HA cluster id (the same seed as the generated
+            // application id above). Kept for backward compatibility and may be removed.
+            jobId = new JobID(clusterIdHashCode(configuration), 0).toHexString();
+        }
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
+    }
+
+    /**
+     * Returns whether an application may execute multiple jobs.
+     *
+     * <p>Multiple jobs are only allowed if HA mode is disabled and no fixed job ID is configured.
+     * The result is unaffected by {@link #maybeFixIds(Configuration)}, which only sets IDs in HA
+     * mode.
+     *
+     * @param config the configuration to inspect
+     * @return {@code true} if the application may execute multiple jobs
+     */
+    public static boolean allowExecuteMultipleJobs(Configuration config) {
+        final Optional<String> configuredJobId =
+                config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
+        return !HighAvailabilityMode.isHighAvailabilityModeActivated(config)
+                && configuredJobId.isEmpty();
+    }
+
+    /** Returns the hash code of the configured HA cluster ID, which seeds derived fixed IDs. */
+    private static int clusterIdHashCode(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration.get(HighAvailabilityOptions.HA_CLUSTER_ID))
+                .hashCode();
+    }
+}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index 5cc2f2cc047..de5de97b752 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -533,7 +533,8 @@ public class PackagedProgramApplication extends
AbstractApplication {
configuration,
program,
enforceSingleJobExecution,
- true /* suppress sysout */);
+ true /* suppress sysout */,
+ getApplicationId());
if (applicationJobIds.isEmpty()) {
jobIdsFuture.completeExceptionally(
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 7dde266cf85..2799ab63cb5 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.flink.client.program;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
@@ -42,6 +43,8 @@ import
org.apache.flink.shaded.guava33.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -72,6 +75,8 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
private final boolean programConfigEnabled;
private final Collection<String> programConfigWildcards;
+ @Nullable private final ApplicationID applicationId;
+
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
@@ -89,7 +94,6 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
Collections.emptyList());
}
- @Internal
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration clusterConfiguration,
@@ -99,6 +103,29 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
final boolean suppressSysout,
final boolean programConfigEnabled,
final Collection<String> programConfigWildcards) {
+ this(
+ executorServiceLoader,
+ clusterConfiguration,
+ configuration,
+ userCodeClassLoader,
+ enforceSingleJobExecution,
+ suppressSysout,
+ programConfigEnabled,
+ programConfigWildcards,
+ null);
+ }
+
+ @Internal
+ public StreamContextEnvironment(
+ final PipelineExecutorServiceLoader executorServiceLoader,
+ final Configuration clusterConfiguration,
+ final Configuration configuration,
+ final ClassLoader userCodeClassLoader,
+ final boolean enforceSingleJobExecution,
+ final boolean suppressSysout,
+ final boolean programConfigEnabled,
+ final Collection<String> programConfigWildcards,
+ @Nullable final ApplicationID applicationId) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.suppressSysout = suppressSysout;
this.enforceSingleJobExecution = enforceSingleJobExecution;
@@ -106,6 +133,7 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
this.jobCounter = 0;
this.programConfigEnabled = programConfigEnabled;
this.programConfigWildcards = programConfigWildcards;
+ this.applicationId = applicationId;
}
@Override
@@ -185,6 +213,9 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotAllowedConfigurations();
validateAllowedExecution();
+ if (applicationId != null) {
+ streamGraph.setApplicationId(applicationId);
+ }
final JobClient jobClient = super.executeAsync(streamGraph);
if (!suppressSysout) {
@@ -209,7 +240,8 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
final Configuration clusterConfiguration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
- final boolean suppressSysout) {
+ final boolean suppressSysout,
+ @Nullable final ApplicationID applicationId) {
final StreamExecutionEnvironmentFactory factory =
envInitConfig -> {
final boolean programConfigEnabled =
@@ -227,7 +259,8 @@ public class StreamContextEnvironment extends
StreamExecutionEnvironment {
enforceSingleJobExecution,
suppressSysout,
programConfigEnabled,
- programConfigWildcards);
+ programConfigWildcards,
+ applicationId);
};
initializeContextEnvironment(factory);
}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
deleted file mode 100644
index 47c5e13fe7b..00000000000
---
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ /dev/null
@@ -1,1023 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.deployment.application;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.testjar.FailingJob;
-import org.apache.flink.client.testjar.MultiExecuteJob;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-
-import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/** Tests for the {@link ApplicationDispatcherBootstrap}. */
-class ApplicationDispatcherBootstrapTest {
-
- private static final int TIMEOUT_SECONDS = 10;
-
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(4);
- private final ScheduledExecutor scheduledExecutor =
- new ScheduledExecutorServiceAdapter(executor);
-
- @AfterEach
- void cleanup() {
- ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
- }
-
- @Test
- void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()));
-
- final CompletableFuture<Void> applicationFuture =
runApplication(dispatcherBuilder, 0);
-
- assertException(applicationFuture,
ApplicationExecutionException.class);
- }
-
- @Test
- void testOnlyOneJobIsAllowedWithHa() throws Throwable {
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
-
- final CompletableFuture<Void> applicationFuture =
runApplication(configurationUnderTest, 2);
-
- assertException(applicationFuture, FlinkRuntimeException.class);
- }
-
- @Test
- void testOnlyOneJobAllowedWithStaticJobId() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
-
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
-
- final CompletableFuture<Void> applicationFuture =
runApplication(configurationUnderTest, 2);
-
- assertException(applicationFuture, FlinkRuntimeException.class);
- }
-
- @Test
- void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
-
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
-
- final CompletableFuture<Void> applicationFuture =
runApplication(configurationUnderTest, 2);
-
- assertException(applicationFuture, FlinkRuntimeException.class);
- }
-
- @Test
- void testJobIdDefaultsToClusterIdWithHa() throws Throwable {
- final Configuration configurationUnderTest = getConfiguration();
- final String clusterId = "cluster";
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
- configurationUnderTest.set(HighAvailabilityOptions.HA_CLUSTER_ID,
clusterId);
-
- final CompletableFuture<JobID> submittedJobId = new
CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- finishedJobGatewayBuilder()
- .setSubmitFunction(
- jobGraph -> {
-
submittedJobId.complete(jobGraph.getJobID());
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
- .isEqualTo(new JobID(clusterId.hashCode(), 0L));
- }
-
- @Test
- void testStaticJobId() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
-
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
-
- final CompletableFuture<JobID> submittedJobId = new
CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- finishedJobGatewayBuilder()
- .setSubmitFunction(
- jobGraph -> {
-
submittedJobId.complete(jobGraph.getJobID());
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
- .isEqualTo(new JobID(0L, 2L));
- }
-
- @Test
- void testStaticJobIdWithHa() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
-
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
-
- final CompletableFuture<JobID> submittedJobId = new
CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- finishedJobGatewayBuilder()
- .setSubmitFunction(
- jobGraph -> {
-
submittedJobId.complete(jobGraph.getJobID());
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
-
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
- .isEqualTo(new JobID(0L, 2L));
- }
-
- @Test
- void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
- final ConcurrentLinkedDeque<JobID> submittedJobIds = new
ConcurrentLinkedDeque<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph -> {
- submittedJobIds.add(jobGraph.getJobID());
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
- .setRequestJobStatusFunction(
- jobId -> {
- // we only fail one of the jobs, the first
one, the others will
- // "keep" running
- // indefinitely
- if (jobId.equals(submittedJobIds.peek())) {
- return
CompletableFuture.completedFuture(JobStatus.FAILED);
- }
- // never finish the other jobs
- return
CompletableFuture.completedFuture(JobStatus.RUNNING);
- })
- .setRequestJobResultFunction(
- jobId -> {
- // we only fail one of the jobs, the first
one, the other will
- // "keep" running
- // indefinitely. If we didn't have this
the test would hang
- // forever.
- if (jobId.equals(submittedJobIds.peek())) {
- return
CompletableFuture.completedFuture(
- createFailedJobResult(jobId));
- }
- // never finish the other jobs
- return new CompletableFuture<>();
- });
-
- final CompletableFuture<Void> applicationFuture =
runApplication(dispatcherBuilder, 2);
- final UnsuccessfulExecutionException exception =
- assertException(applicationFuture,
UnsuccessfulExecutionException.class);
-
assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
- }
-
- @Test
- void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
- final TestingDispatcherGateway.Builder dispatcherBuilder =
finishedJobGatewayBuilder();
-
- final CompletableFuture<Void> applicationFuture =
runApplication(dispatcherBuilder, 3);
-
- // this would block indefinitely if the applications don't finish
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- @Test
- void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
- final CompletableFuture<ApplicationStatus> clusterShutdownStatus =
- new CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- canceledJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- clusterShutdownStatus.complete(status);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3, dispatcherBuilder.build(), scheduledExecutor);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // wait until the bootstrap "thinks" it's done, also makes sure that
we don't
- // fail the future exceptionally with a JobCancelledException
- completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- assertThat(clusterShutdownStatus.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS))
- .isEqualTo(ApplicationStatus.CANCELED);
- }
-
- @Test
- void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception
{
- final TestingDispatcherGateway.Builder dispatcherBuilder =
finishedJobGatewayBuilder();
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3, dispatcherBuilder.build(), scheduledExecutor);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- ScheduledFuture<?> applicationExecutionFuture =
bootstrap.getApplicationExecutionFuture();
-
- // wait until the bootstrap "thinks" it's done
- completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // make sure the task finishes
- applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- @Test
- void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
- final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- runningJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- shutdownCalled.set(true);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
- new ManuallyTriggeredScheduledExecutor();
- // we're "listening" on this to be completed to verify that the error
handler is called.
- // In production, this will shut down the cluster with an exception.
- final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
- final ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3,
- dispatcherBuilder.build(),
- manuallyTriggeredExecutor,
- errorHandlerFuture::completeExceptionally);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- ScheduledFuture<?> applicationExecutionFuture =
bootstrap.getApplicationExecutionFuture();
-
- bootstrap.stop();
-
- // Triggers the scheduled ApplicationDispatcherBootstrap process after
calling stop. This
- // ensures that the bootstrap task isn't completed before the stop
method is called which
- // would prevent the stop call from cancelling the task's future.
- manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask();
-
- // we didn't call the error handler
- assertThat(errorHandlerFuture.isDone()).isFalse();
-
- // completion future gets completed normally
- completionFuture.get();
-
- // verify that we didn't shut down the cluster
- assertThat(shutdownCalled.get()).isFalse();
-
- // verify that the application task is being cancelled
- assertThat(applicationExecutionFuture.isCancelled()).isTrue();
- assertThat(applicationExecutionFuture.isDone()).isTrue();
- }
-
- @Test
- void testErrorHandlerIsCalledWhenSubmissionThrowsAnException() throws
Exception {
- final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- runningJobGatewayBuilder()
- .setSubmitFunction(
- jobGraph -> {
- throw new FlinkRuntimeException("Nope!");
- })
- .setClusterShutdownFunction(
- status -> {
- shutdownCalled.set(true);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- // we're "listening" on this to be completed to verify that the error
handler is called.
- // In production, this will shut down the cluster with an exception.
- final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
- final ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 2,
- dispatcherBuilder.build(),
- scheduledExecutor,
- errorHandlerFuture::completeExceptionally);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // we call the error handler
- assertException(errorHandlerFuture, FlinkRuntimeException.class);
-
- // we return a future that is completed exceptionally
- assertException(completionFuture, FlinkRuntimeException.class);
-
- // and cluster shutdown didn't get called
- assertThat(shutdownCalled.get()).isFalse();
- }
-
- @Test
- void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws
Exception {
- testErrorHandlerIsCalled(
- () ->
- FutureUtils.completedExceptionally(
- new FlinkRuntimeException("Test exception.")));
- }
-
- @Test
- void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws
Exception {
- testErrorHandlerIsCalled(
- () -> {
- throw new FlinkRuntimeException("Test exception.");
- });
- }
-
- private void
testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>>
shutdownFunction)
- throws Exception {
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
- .setRequestJobStatusFunction(
- jobId ->
CompletableFuture.completedFuture(JobStatus.FINISHED))
- .setRequestJobResultFunction(
- jobId ->
- CompletableFuture.completedFuture(
- createJobResult(jobId,
JobStatus.FINISHED)))
- .setClusterShutdownFunction(status ->
shutdownFunction.get());
-
- // we're "listening" on this to be completed to verify that the error
handler is called.
- // In production, this will shut down the cluster with an exception.
- final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
- final TestingDispatcherGateway dispatcherGateway =
dispatcherBuilder.build();
- final ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3,
- dispatcherGateway,
- scheduledExecutor,
- errorHandlerFuture::completeExceptionally);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // we call the error handler
- assertException(errorHandlerFuture, FlinkRuntimeException.class);
-
- // we return a future that is completed exceptionally
- assertException(completionFuture, FlinkRuntimeException.class);
- }
-
- @Test
- void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws
Exception {
- final CompletableFuture<ApplicationStatus> clusterShutdown = new
CompletableFuture<>();
-
- final TestingDispatcherGateway dispatcherGateway =
- canceledJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- clusterShutdown.complete(status);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
- .build();
-
- final PackagedProgram program = getProgram(2);
-
- final Configuration configuration = getConfiguration();
- configuration.set(DeploymentOptions.ATTACHED, true);
-
- final ApplicationDispatcherBootstrap bootstrap =
- new ApplicationDispatcherBootstrap(
- program,
- Collections.emptyList(),
- configuration,
- dispatcherGateway,
- scheduledExecutor,
- e -> {});
-
- final CompletableFuture<Void> applicationFuture =
- bootstrap.getApplicationCompletionFuture();
- assertException(applicationFuture,
UnsuccessfulExecutionException.class);
-
-
assertThat(clusterShutdown.get()).isEqualTo(ApplicationStatus.CANCELED);
- }
-
- @Test
- void testClusterShutdownWhenApplicationSucceeds() throws Exception {
- // we're "listening" on this to be completed to verify that the cluster
- // is being shut down from the ApplicationDispatcherBootstrap
- final CompletableFuture<ApplicationStatus> externalShutdownFuture =
- new CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- finishedJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- externalShutdownFuture.complete(status);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3, dispatcherBuilder.build(), scheduledExecutor);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // wait until the bootstrap "thinks" it's done
- completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // verify that the dispatcher is actually being shut down
- assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS))
- .isEqualTo(ApplicationStatus.SUCCEEDED);
- }
-
- @Test
- void testClusterShutdownWhenApplicationFails() throws Exception {
- // we're "listening" on this to be completed to verify that the cluster
- // is being shut down from the ApplicationDispatcherBootstrap
- final CompletableFuture<ApplicationStatus> externalShutdownFuture =
- new CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- failedJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- externalShutdownFuture.complete(status);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3, dispatcherBuilder.build(), scheduledExecutor);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // wait until the bootstrap "thinks" it's done
- completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // verify that the dispatcher is actually being shut down
- assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS))
- .isEqualTo(ApplicationStatus.FAILED);
- }
-
- @Test
- void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
- // we're "listening" on this to be completed to verify that the cluster
- // is being shut down from the ApplicationDispatcherBootstrap
- final CompletableFuture<ApplicationStatus> externalShutdownFuture =
- new CompletableFuture<>();
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- canceledJobGatewayBuilder()
- .setClusterShutdownFunction(
- status -> {
- externalShutdownFuture.complete(status);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3, dispatcherBuilder.build(), scheduledExecutor);
-
- final CompletableFuture<Acknowledge> completionFuture =
- bootstrap.getBootstrapCompletionFuture();
-
- // wait until the bootstrap "thinks" it's done
- completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // verify that the dispatcher is actually being shut down
- assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS))
- .isEqualTo(ApplicationStatus.CANCELED);
- }
-
- @Test
- void testErrorHandlerIsCalledWhenApplicationStatusIsUnknown() throws
Exception {
- // we're "listening" on this to be completed to verify that the cluster
- // is being shut down from the ApplicationDispatcherBootstrap
- final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- canceledJobGatewayBuilder()
- .setRequestJobResultFunction(
- jobID ->
- CompletableFuture.completedFuture(
- createUnknownJobResult(jobID)))
- .setClusterShutdownFunction(
- status -> {
- shutdownCalled.set(true);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- });
-
- final TestingDispatcherGateway dispatcherGateway =
dispatcherBuilder.build();
- final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
- final ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- 3,
- dispatcherGateway,
- scheduledExecutor,
- errorHandlerFuture::completeExceptionally);
-
- // check that bootstrap shutdown completes exceptionally
- assertException(
- bootstrap.getApplicationCompletionFuture(),
UnsuccessfulExecutionException.class);
- // and exception gets propagated to error handler
- assertException(
- bootstrap.getApplicationCompletionFuture(),
UnsuccessfulExecutionException.class);
- // and cluster didn't shut down
- assertThat(shutdownCalled.get()).isFalse();
- }
-
- @Test
- void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- finishedJobGatewayBuilder()
- .setSubmitFunction(
- jobGraph ->
- FutureUtils.completedExceptionally(
- DuplicateJobSubmissionException
-
.ofGloballyTerminated(testJobID)));
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- /**
- * In this scenario, job result is no longer present in the {@link
- * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has
terminated and job
- * manager failed over), but we know that job has already terminated from
{@link
- * org.apache.flink.runtime.highavailability.JobResultStore}.
- */
- @Test
- void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult()
throws Throwable {
- final JobID testJobID = new JobID(0, 2);
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
- FutureUtils.completedExceptionally(
- DuplicateJobSubmissionException
-
.ofGloballyTerminated(testJobID)))
- .setRequestJobStatusFunction(
- jobId ->
- FutureUtils.completedExceptionally(
- new
FlinkJobNotFoundException(jobId)))
- .setRequestJobResultFunction(
- jobId ->
- FutureUtils.completedExceptionally(
- new
FlinkJobNotFoundException(jobId)));
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- /**
- * In this scenario, job result is no longer present in the {@link
- * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has
terminated and job
- * manager failed over), but we know that job has already terminated from
{@link
- * org.apache.flink.runtime.highavailability.JobResultStore}.
- */
- @Test
- void
testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttached() throws
Throwable {
- final JobID testJobID = new JobID(0, 2);
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
- FutureUtils.completedExceptionally(
- DuplicateJobSubmissionException
-
.ofGloballyTerminated(testJobID)))
- .setRequestJobStatusFunction(
- jobId ->
- FutureUtils.completedExceptionally(
- new
FlinkJobNotFoundException(jobId)))
- .setRequestJobResultFunction(
- jobId ->
- FutureUtils.completedExceptionally(
- new
FlinkJobNotFoundException(jobId)));
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
- applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- @Test
- void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
- final JobID testJobID = new JobID(0, 2);
- final Configuration configurationUnderTest = getConfiguration();
- configurationUnderTest.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
- configurationUnderTest.set(
- HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
- final TestingDispatcherGateway.Builder dispatcherBuilder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
- FutureUtils.completedExceptionally(
-
DuplicateJobSubmissionException.of(testJobID)));
- final CompletableFuture<Void> applicationFuture =
- runApplication(dispatcherBuilder, configurationUnderTest, 1);
- final ExecutionException executionException =
- assertThrows(
- ExecutionException.class,
- () -> applicationFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
- final Optional<DuplicateJobSubmissionException> maybeDuplicate =
- ExceptionUtils.findThrowable(
- executionException,
DuplicateJobSubmissionException.class);
- assertThat(maybeDuplicate).isPresent();
- assertThat(maybeDuplicate.get().isGloballyTerminated()).isFalse();
- }
-
- @ParameterizedTest
- @EnumSource(
- value = JobStatus.class,
- names = {"FINISHED", "CANCELED", "FAILED"})
- void testShutdownDisabled(JobStatus jobStatus) throws Exception {
- final Configuration configurationUnderTest = getConfiguration();
-
configurationUnderTest.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH,
false);
-
- final TestingDispatcherGateway dispatcherGateway =
- dispatcherGatewayBuilder(jobStatus)
- .setClusterShutdownFunction(
- status -> {
- fail("Cluster shutdown should not be
called");
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
- .build();
-
- ApplicationDispatcherBootstrap bootstrap =
- createApplicationDispatcherBootstrap(
- configurationUnderTest, dispatcherGateway,
scheduledExecutor);
-
- // Wait until bootstrap is finished to make sure cluster shutdown
isn't called
- bootstrap.getBootstrapCompletionFuture().get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
- }
-
- @Test
- void testSubmitFailedJobOnApplicationErrorInHASetup() throws Exception {
- final Configuration configuration = getConfiguration();
- final JobID jobId = new JobID();
- configuration.set(HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
-
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR,
true);
- configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
jobId.toHexString());
- testSubmitFailedJobOnApplicationError(
- configuration,
- (id, t) -> {
- assertThat(id).isEqualTo(jobId);
- assertThat(t)
- .isInstanceOf(ProgramInvocationException.class)
- .hasRootCauseInstanceOf(RuntimeException.class)
- .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE);
- });
- }
-
- @Test
- void testSubmitFailedJobOnApplicationErrorInHASetupWithCustomFixedJobId()
throws Exception {
- final Configuration configuration = getConfiguration();
- final JobID customFixedJobId = new JobID();
- configuration.set(HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
-
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR,
true);
- configuration.set(
- PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
customFixedJobId.toHexString());
- testSubmitFailedJobOnApplicationError(
- configuration,
- (jobId, t) -> {
- assertThat(jobId).isEqualTo(customFixedJobId);
- assertThat(t)
- .isInstanceOf(ProgramInvocationException.class)
- .hasRootCauseInstanceOf(RuntimeException.class)
- .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE);
- });
- }
-
- private void testSubmitFailedJobOnApplicationError(
- Configuration configuration, BiConsumer<JobID, Throwable>
failedJobAssertion)
- throws Exception {
- final CompletableFuture<Void> submitted = new CompletableFuture<>();
- final TestingDispatcherGateway dispatcherGateway =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFailedFunction(
- (jobId, jobName, t) -> {
- try {
- failedJobAssertion.accept(jobId, t);
- submitted.complete(null);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- } catch (Throwable assertion) {
-
submitted.completeExceptionally(assertion);
- return
FutureUtils.completedExceptionally(assertion);
- }
- })
- .setRequestJobStatusFunction(
- jobId -> submitted.thenApply(ignored ->
JobStatus.FAILED))
- .setRequestJobResultFunction(
- jobId ->
- submitted.thenApply(
- ignored ->
- createJobResult(jobId,
JobStatus.FAILED)))
- .build();
-
- final ApplicationDispatcherBootstrap bootstrap =
- new ApplicationDispatcherBootstrap(
- FailingJob.getProgram(),
- Collections.emptyList(),
- configuration,
- dispatcherGateway,
- scheduledExecutor,
- exception -> {});
-
- bootstrap.getBootstrapCompletionFuture().get();
- }
-
- @Test
- void testSubmitFailedJobOnApplicationErrorInNonHASetup() throws Exception {
- final Configuration configuration = getConfiguration();
-
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR,
true);
- final ApplicationDispatcherBootstrap bootstrap =
- new ApplicationDispatcherBootstrap(
- FailingJob.getProgram(),
- Collections.emptyList(),
- configuration,
- TestingDispatcherGateway.newBuilder().build(),
- scheduledExecutor,
- exception -> {});
- assertThatFuture(bootstrap.getBootstrapCompletionFuture())
- .eventuallyFailsWith(ExecutionException.class)
- .extracting(Throwable::getCause)
- .satisfies(
- e ->
- assertThat(e)
-
.isInstanceOf(ApplicationExecutionException.class)
- .hasMessageContaining(
- DeploymentOptions
-
.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
- .key()));
- }
-
- private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() {
- return dispatcherGatewayBuilder(JobStatus.FINISHED);
- }
-
- private TestingDispatcherGateway.Builder failedJobGatewayBuilder() {
- return dispatcherGatewayBuilder(JobStatus.FAILED);
- }
-
- private TestingDispatcherGateway.Builder canceledJobGatewayBuilder() {
- return dispatcherGatewayBuilder(JobStatus.CANCELED);
- }
-
- private TestingDispatcherGateway.Builder runningJobGatewayBuilder() {
- return dispatcherGatewayBuilder(JobStatus.RUNNING);
- }
-
- private TestingDispatcherGateway.Builder
dispatcherGatewayBuilder(JobStatus jobStatus) {
- TestingDispatcherGateway.Builder builder =
- TestingDispatcherGateway.newBuilder()
- .setSubmitFunction(
- jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
- .setRequestJobStatusFunction(
- jobId ->
CompletableFuture.completedFuture(jobStatus));
- if (jobStatus != JobStatus.RUNNING) {
- builder.setRequestJobResultFunction(
- jobID ->
CompletableFuture.completedFuture(createJobResult(jobID, jobStatus)));
- }
- return builder;
- }
-
- private CompletableFuture<Void> runApplication(
- TestingDispatcherGateway.Builder dispatcherBuilder, int noOfJobs)
- throws FlinkException {
-
- return runApplication(dispatcherBuilder, getConfiguration(), noOfJobs);
- }
-
- private CompletableFuture<Void> runApplication(
- final Configuration configuration, final int noOfJobs) throws
Throwable {
-
- final TestingDispatcherGateway.Builder dispatcherBuilder =
finishedJobGatewayBuilder();
-
- return runApplication(dispatcherBuilder, configuration, noOfJobs);
- }
-
- private CompletableFuture<Void> runApplication(
- TestingDispatcherGateway.Builder dispatcherBuilder,
- Configuration configuration,
- int noOfJobs)
- throws FlinkException {
-
- final PackagedProgram program = getProgram(noOfJobs);
-
- final ApplicationDispatcherBootstrap bootstrap =
- new ApplicationDispatcherBootstrap(
- program,
- Collections.emptyList(),
- configuration,
- dispatcherBuilder.build(),
- scheduledExecutor,
- exception -> {});
-
- return bootstrap.getApplicationCompletionFuture();
- }
-
- private ApplicationDispatcherBootstrap
createApplicationDispatcherBootstrap(
- final int noOfJobs,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor)
- throws FlinkException {
- return createApplicationDispatcherBootstrap(
- noOfJobs, dispatcherGateway, scheduledExecutor, exception ->
{});
- }
-
- private ApplicationDispatcherBootstrap
createApplicationDispatcherBootstrap(
- final int noOfJobs,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor,
- final FatalErrorHandler errorHandler)
- throws FlinkException {
- return createApplicationDispatcherBootstrap(
- noOfJobs, getConfiguration(), dispatcherGateway,
scheduledExecutor, errorHandler);
- }
-
- private ApplicationDispatcherBootstrap
createApplicationDispatcherBootstrap(
- final Configuration configuration,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor)
- throws FlinkException {
- return createApplicationDispatcherBootstrap(
- 1, configuration, dispatcherGateway, scheduledExecutor,
exception -> {});
- }
-
- private ApplicationDispatcherBootstrap
createApplicationDispatcherBootstrap(
- final int noOfJobs,
- final Configuration configuration,
- final DispatcherGateway dispatcherGateway,
- final ScheduledExecutor scheduledExecutor,
- final FatalErrorHandler errorHandler)
- throws FlinkException {
- final PackagedProgram program = getProgram(noOfJobs);
- return new ApplicationDispatcherBootstrap(
- program,
- Collections.emptyList(),
- configuration,
- dispatcherGateway,
- scheduledExecutor,
- errorHandler);
- }
-
- private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
- return MultiExecuteJob.getProgram(noOfJobs, true);
- }
-
- private static JobResult createFailedJobResult(final JobID jobId) {
- return createJobResult(jobId, JobStatus.FAILED);
- }
-
- private static JobResult createUnknownJobResult(final JobID jobId) {
- return createJobResult(jobId, null);
- }
-
- private static JobResult createJobResult(
- final JobID jobID, @Nullable final JobStatus jobStatus) {
- JobResult.Builder builder =
- new
JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
- if (jobStatus == JobStatus.CANCELED) {
- builder.serializedThrowable(
- new SerializedThrowable(new
JobCancellationException(jobID, "Hello", null)));
- } else if (jobStatus == JobStatus.FAILED || jobStatus == null) {
- builder.serializedThrowable(
- new SerializedThrowable(new JobExecutionException(jobID,
"bla bla bla")));
- }
- return builder.build();
- }
-
- private static <T, E extends Throwable> E assertException(
- CompletableFuture<T> future, Class<E> exceptionClass) throws
Exception {
-
- try {
- future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- } catch (Throwable e) {
- Optional<E> maybeException = ExceptionUtils.findThrowable(e,
exceptionClass);
- if (!maybeException.isPresent()) {
- throw e;
- }
- return maybeException.get();
- }
- throw new Exception(
- "Future should have completed exceptionally with "
- + exceptionClass.getCanonicalName()
- + ".");
- }
-
- private Configuration getConfiguration() {
- final Configuration configuration = new Configuration();
- configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
- return configuration;
- }
-}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
new file mode 100644
index 00000000000..958850c9d41
--- /dev/null
+++
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationJobUtilsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ApplicationOptionsInternal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ApplicationJobUtils}. */
+class ApplicationJobUtilsTest {
+
+    private static final String TEST_HA_CLUSTER_ID = "cluster";
+    private static final String TEST_APPLICATION_ID = "ca0eb040022fbccd4cf05d1e274ae25e";
+    private static final String TEST_JOB_ID = "e79b6d171acd4baa6f421e3631168810";
+
+    private Configuration configuration;
+
+    @BeforeEach
+    void setUp() {
+        configuration = new Configuration();
+    }
+
+    /**
+     * Checks the fixed application and job IDs left by {@link ApplicationJobUtils#maybeFixIds}
+     * for every combination of HA mode, HA cluster ID, application ID and job ID settings.
+     */
+    @ParameterizedTest
+    @MethodSource("provideParametersForMaybeFixIds")
+    void testMaybeFixIds(
+            boolean isHAEnabled,
+            boolean isHaClusterIdSet,
+            boolean isApplicationIdSet,
+            boolean isJobIdSet,
+            @Nullable String expectedApplicationId,
+            @Nullable String expectedJobId) {
+        if (isHAEnabled) {
+            configuration.set(
+                    HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        }
+        if (isHaClusterIdSet) {
+            configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, TEST_HA_CLUSTER_ID);
+        }
+        if (isApplicationIdSet) {
+            configuration.set(ApplicationOptionsInternal.FIXED_APPLICATION_ID, TEST_APPLICATION_ID);
+        }
+        if (isJobIdSet) {
+            configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, TEST_JOB_ID);
+        }
+
+        ApplicationJobUtils.maybeFixIds(configuration);
+
+        assertThat(configuration.get(ApplicationOptionsInternal.FIXED_APPLICATION_ID))
+                .isEqualTo(expectedApplicationId);
+        assertThat(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID))
+                .isEqualTo(expectedJobId);
+    }
+
+    private static Stream<Arguments> provideParametersForMaybeFixIds() {
+        // All combinations of the four inputs: (isHAEnabled, isHaClusterIdSet, isApplicationIdSet,
+        // isJobIdSet).
+        final String defaultDerivedId =
+                getAbstractIdFromString(HighAvailabilityOptions.HA_CLUSTER_ID.defaultValue());
+        final String customDerivedId = getAbstractIdFromString(TEST_HA_CLUSTER_ID);
+        return Stream.of(
+                Arguments.of(false, false, false, false, null, null),
+                Arguments.of(false, false, false, true, null, TEST_JOB_ID),
+                Arguments.of(false, false, true, false, TEST_APPLICATION_ID, null),
+                Arguments.of(false, false, true, true, TEST_APPLICATION_ID, TEST_JOB_ID),
+                Arguments.of(false, true, false, false, null, null),
+                Arguments.of(false, true, false, true, null, TEST_JOB_ID),
+                Arguments.of(false, true, true, false, TEST_APPLICATION_ID, null),
+                Arguments.of(false, true, true, true, TEST_APPLICATION_ID, TEST_JOB_ID),
+                Arguments.of(true, false, false, false, defaultDerivedId, defaultDerivedId),
+                Arguments.of(true, false, false, true, defaultDerivedId, TEST_JOB_ID),
+                Arguments.of(true, false, true, false, TEST_APPLICATION_ID, TEST_APPLICATION_ID),
+                Arguments.of(true, false, true, true, TEST_APPLICATION_ID, TEST_JOB_ID),
+                Arguments.of(true, true, false, false, customDerivedId, customDerivedId),
+                Arguments.of(true, true, false, true, customDerivedId, TEST_JOB_ID),
+                Arguments.of(true, true, true, false, TEST_APPLICATION_ID, TEST_APPLICATION_ID),
+                Arguments.of(true, true, true, true, TEST_APPLICATION_ID, TEST_JOB_ID));
+    }
+
+    private static String getAbstractIdFromString(String str) {
+        return new AbstractID(str.hashCode(), 0).toHexString();
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HADisabled_NoFixedJobId() {
+        assertThat(configuration.get(HighAvailabilityOptions.HA_MODE))
+                .isEqualTo(HighAvailabilityMode.NONE.name());
+        assertThat(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)).isNull();
+
+        assertThat(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)).isTrue();
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HAEnabled_NoFixedJobId() {
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, TEST_HA_CLUSTER_ID);
+        assertThat(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)).isNull();
+
+        assertThat(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)).isFalse();
+    }
+
+    @Test
+    void testAllowExecuteMultipleJobs_HAEnabled_FixedJobIdSet() {
+        final JobID testJobID = new JobID(0, 2);
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, TEST_HA_CLUSTER_ID);
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
+
+        assertThat(ApplicationJobUtils.allowExecuteMultipleJobs(configuration)).isFalse();
+    }
+}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
similarity index 98%
rename from
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
rename to
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
index b5fb78bf94d..03570d12095 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java
@@ -69,8 +69,8 @@ import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
-/** Integration tests related to {@link ApplicationDispatcherBootstrap}. */
-class ApplicationDispatcherBootstrapITCase {
+/** Integration tests related to {@link PackagedProgramApplication}. */
+class PackagedProgramApplicationITCase {
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
@@ -238,8 +238,7 @@ class ApplicationDispatcherBootstrapITCase {
final ArchivedExecutionGraph graph =
cluster.getArchivedExecutionGraph(jobId).get();
assertThat(graph.getJobID()).isEqualTo(jobId);
- assertThat(graph.getJobName())
- .isEqualTo(ApplicationDispatcherBootstrap.FAILED_JOB_NAME);
+
assertThat(graph.getJobName()).isEqualTo(PackagedProgramApplication.FAILED_JOB_NAME);
assertThat(graph.getFailureInfo())
.isNotNull()
.extracting(ErrorInfo::getException)
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
b/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
new file mode 100644
index 00000000000..7191a21e683
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ApplicationOptionsInternal.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Application options that are not meant to be used by the user. */
+@Internal
+public class ApplicationOptionsInternal {
+
+    public static final ConfigOption<String> FIXED_APPLICATION_ID =
+            key("$internal.application.id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "**DO NOT USE** The static ApplicationId to be used for the application. "
+                                    + "For fault-tolerance, this value needs to stay the same across runs.");
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
new file mode 100644
index 00000000000..f5b0ecc9dd4
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateApplicationSubmissionException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception signalling that an application with the given {@link ApplicationID} has already been
+ * submitted.
+ */
+public class DuplicateApplicationSubmissionException extends FlinkException {
+
+    private static final long serialVersionUID = 2818087325120827524L;
+
+    /** ID of the application that was submitted again. */
+    private final ApplicationID applicationId;
+
+    /**
+     * Creates a new exception for an application that has already been submitted.
+     *
+     * @param applicationId ID of the already submitted application
+     */
+    public DuplicateApplicationSubmissionException(ApplicationID applicationId) {
+        super("Application has already been submitted.");
+        this.applicationId = applicationId;
+    }
+
+    /** Returns the ID of the application that has already been submitted. */
+    public ApplicationID getApplicationId() {
+        return applicationId;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
new file mode 100644
index 00000000000..f781dedff1e
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ApplicationBootstrap.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.application.AbstractApplication;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DispatcherBootstrap} which wraps an {@link AbstractApplication} for execution upon
+ * dispatcher initialization.
+ */
+public class ApplicationBootstrap implements DispatcherBootstrap {
+
+    /** The wrapped application; never null. */
+    private final AbstractApplication application;
+
+    /** Creates a bootstrap for the given application, rejecting {@code null} eagerly. */
+    public ApplicationBootstrap(AbstractApplication application) {
+        this.application = checkNotNull(application);
+    }
+
+    /** Cancels the wrapped application. */
+    @Override
+    public void stop() throws Exception {
+        application.cancel();
+    }
+
+    /** Returns the wrapped application. */
+    public AbstractApplication getApplication() {
+        return application;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 63e10ec5cb7..596167a0d8e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,10 +37,12 @@ import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.client.DuplicateApplicationSubmissionException;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -133,6 +136,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* Base class for the Dispatcher component. The Dispatcher component is
responsible for receiving
@@ -212,6 +216,10 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
*/
private final Set<JobID> pendingJobResourceRequirementsUpdates = new
HashSet<>();
+ private final Map<ApplicationID, AbstractApplication> applications = new
HashMap<>();
+
+ private final Map<ApplicationID, Set<JobID>> recoveredApplicationJobIds =
new HashMap<>();
+
/** Enum to distinguish between initial job submission and re-submission
for recovery. */
protected enum ExecutionType {
SUBMISSION,
@@ -366,6 +374,10 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
getSelfGateway(DispatcherGateway.class),
this.getRpcService().getScheduledExecutor(),
this::onFatalError);
+
+ if (dispatcherBootstrap instanceof ApplicationBootstrap) {
+ submitApplication(((ApplicationBootstrap)
dispatcherBootstrap).getApplication()).get();
+ }
}
private void startDispatcherServices() throws Exception {
@@ -410,6 +422,13 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private void runRecoveredJob(final ExecutionPlan recoveredJob) {
checkNotNull(recoveredJob);
+ if (recoveredJob.getApplicationId().isPresent()) {
+ recoveredApplicationJobIds
+ .computeIfAbsent(
+ recoveredJob.getApplicationId().get(), ignored ->
new HashSet<>())
+ .add(recoveredJob.getJobID());
+ }
+
initJobClientExpiredTime(recoveredJob);
try (MdcCloseable ignored =
@@ -497,6 +516,10 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
() -> {
dispatcherBootstrap.stop();
+ for (AbstractApplication application :
applications.values()) {
+ application.dispose();
+ }
+
stopDispatcherServices();
log.info("Stopped dispatcher {}.", getAddress());
@@ -579,6 +602,34 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
return archiveExecutionGraphToHistoryServer(executionGraphInfo);
}
+    /** Registers and executes {@code application}; must be called from the main thread. */
+    private CompletableFuture<Acknowledge> submitApplication(AbstractApplication application) {
+        final ApplicationID applicationId = application.getApplicationId();
+        log.info(
+                "Received application submission '{}' ({}).", application.getName(), applicationId);
+        // Reject duplicates; jobs recovered for this application are attached before execution.
+        if (applications.containsKey(applicationId)) {
+            log.warn("Application with id {} already exists.", applicationId);
+            return FutureUtils.completedExceptionally(
+                    new DuplicateApplicationSubmissionException(applicationId));
+        }
+        applications.put(applicationId, application);
+        final Set<JobID> recoveredJobs = recoveredApplicationJobIds.remove(applicationId);
+        if (recoveredJobs != null) {
+            recoveredJobs.forEach(application::addJob);
+        }
+        return application.execute(
+                getSelfGateway(DispatcherGateway.class),
+                getRpcService().getScheduledExecutor(),
+                getMainThreadExecutor(),
+                this::onFatalError);
+    }
+
+    @VisibleForTesting
+    Map<ApplicationID, AbstractApplication> getApplications() {
+        return applications;
+    }
+
/**
* Checks whether the given job has already been executed.
*
@@ -595,7 +646,28 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
applyParallelismOverrides((JobGraph) executionPlan);
}
- log.info("Submitting job '{}' ({}).", executionPlan.getName(),
executionPlan.getJobID());
+ final JobID jobId = executionPlan.getJobID();
+ final String jobName = executionPlan.getName();
+
+ if (executionPlan.getApplicationId().isPresent()) {
+ ApplicationID applicationId =
executionPlan.getApplicationId().get();
+ log.info(
+ "Submitting job '{}' ({}) with associated application
({}).",
+ jobName,
+ jobId,
+ applicationId);
+ checkState(
+ applications.containsKey(applicationId),
+ "Application %s not found.",
+ applicationId);
+ applications.get(applicationId).addJob(jobId);
+ } else {
+ // TODO update the message after SingleJobApplication is
implemented
+ // This can occur in two cases:
+ // 1. CLI/REST submissions of jobs without an application
+ // 2. Tests for submitJob that submit jobs without an application
+ log.info("Submitting job '{}' ({}) without associated
application.", jobName, jobId);
+ }
// track as an outstanding job
submittedAndWaitingTerminationJobIDs.add(executionPlan.getJobID());
@@ -649,7 +721,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
private JobManagerRunner createJobMasterRunner(ExecutionPlan
executionPlan) throws Exception {
-
Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
+
checkState(!jobManagerRunnerRegistry.isRegistered(executionPlan.getJobID()));
return jobManagerRunnerFactory.createJobManagerRunner(
executionPlan,
configuration,
@@ -664,7 +736,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
private JobManagerRunner createJobCleanupRunner(JobResult dirtyJobResult)
throws Exception {
-
Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(dirtyJobResult.getJobId()));
+
checkState(!jobManagerRunnerRegistry.isRegistered(dirtyJobResult.getJobId()));
return cleanupRunnerFactory.create(
dirtyJobResult,
highAvailabilityServices.getCheckpointRecoveryFactory(),
@@ -684,7 +756,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
.getResultFuture()
.handleAsync(
(jobManagerRunnerResult, throwable) -> {
- Preconditions.checkState(
+ checkState(
jobManagerRunnerRegistry.isRegistered(jobId)
&&
jobManagerRunnerRegistry.get(jobId)
==
jobManagerRunner,
@@ -1277,7 +1349,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
@VisibleForTesting
void registerJobManagerRunnerTerminationFuture(
JobID jobId, CompletableFuture<Void>
jobManagerRunnerTerminationFuture) {
-
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
+ checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
jobManagerRunnerTerminationFutures.put(jobId,
jobManagerRunnerTerminationFuture);
// clean up the pending termination future
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 551168ea4ef..3506526d1df 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobgraph;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
@@ -49,6 +50,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -83,6 +85,9 @@ public class JobGraph implements ExecutionPlan {
/** ID of this job. May be set if specific job id is desired (e.g. session
management) */
private JobID jobID;
+ /** ID of the application this job belongs to. */
+ @Nullable private ApplicationID applicationId;
+
/** Name of this job. */
private final String jobName;
@@ -134,20 +139,21 @@ public class JobGraph implements ExecutionPlan {
* @param jobName The name of the job.
*/
public JobGraph(String jobName) {
- this(null, jobName);
+ this(null, null, jobName); // no job ID (so a random one is generated) and no associated application
}
/**
* Constructs a new job graph with the given job ID (or a random ID, if
{@code null} is passed),
- * the given name and the given execution configuration (see {@link
ExecutionConfig}). The
- * ExecutionConfig will be serialized and can't be modified afterwards.
+ * the given application ID, the given name and the given execution
configuration (see {@link
+ * ExecutionConfig}). The ExecutionConfig will be serialized and can't be
modified afterwards.
*
* @param jobId The id of the job. A random ID is generated, if {@code
null} is passed.
* @param jobName The name of the job.
*/
- public JobGraph(@Nullable JobID jobId, String jobName) {
+ public JobGraph(@Nullable JobID jobId, @Nullable ApplicationID
applicationId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+ this.applicationId = applicationId;
try {
setExecutionConfig(new ExecutionConfig());
@@ -167,7 +173,7 @@ public class JobGraph implements ExecutionPlan {
* @param vertices The vertices to add to the graph.
*/
public JobGraph(@Nullable JobID jobId, String jobName, JobVertex...
vertices) {
- this(jobId, jobName);
+ this(jobId, null, jobName);
for (JobVertex vertex : vertices) {
addVertex(vertex);
@@ -191,6 +197,21 @@ public class JobGraph implements ExecutionPlan {
this.jobID = jobID;
}
+ /**
+ * Returns the ID of the application this job belongs to.
+ *
+ * @return the ID of the application, or an empty {@link Optional} if this job graph has no
+ * associated application (the field is {@code null})
+ */
+ @Override
+ public Optional<ApplicationID> getApplicationId() {
+ return Optional.ofNullable(applicationId);
+ }
+
+ /**
+ * Sets the ID of the application this job belongs to.
+ *
+ * @param applicationId the ID of the application; must not be {@code null} (checked via
+ * {@code checkNotNull})
+ */
+ public void setApplicationId(ApplicationID applicationId) {
+ this.applicationId = checkNotNull(applicationId);
+ }
+
/**
* Returns the name assigned to the job graph.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
index 7e91e123ca1..05d430ff8de 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
@@ -188,7 +188,11 @@ public class AdaptiveGraphManager
userClassloader,
this);
- this.jobGraph = createAndInitializeJobGraph(streamGraph,
streamGraph.getJobID());
+ this.jobGraph =
+ createAndInitializeJobGraph(
+ streamGraph,
+ streamGraph.getJobID(),
+ streamGraph.getApplicationId().orElse(null));
this.defaultSlotSharingGroup = new SlotSharingGroup();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
index 8b4216fae18..48e34f049a5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/ExecutionPlan.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
@@ -35,6 +36,7 @@ import java.io.Serializable;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* An interface representing a general execution plan, which can be
implemented by different types
@@ -199,4 +201,11 @@ public interface ExecutionPlan extends Serializable {
* @return The serialized execution configuration object
*/
SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
+
+ /**
+ * Gets the unique identifier of the application this execution plan (job) belongs to.
+ *
+ * @return the application id, or {@link Optional#empty()} if this plan is not associated with
+ * an application
+ */
+ Optional<ApplicationID> getApplicationId();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a64d534ba93..349bf0a16cf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
@@ -134,6 +135,9 @@ public class StreamGraph implements Pipeline, ExecutionPlan
{
private JobID jobId;
+ /** ID of the application this job belongs to. */
+ @Nullable private ApplicationID applicationId;
+
private final Configuration jobConfiguration;
private transient ExecutionConfig executionConfig;
private final CheckpointConfig checkpointConfig;
@@ -1160,16 +1164,28 @@ public class StreamGraph implements Pipeline,
ExecutionPlan {
/**
 * Gets the assembled {@link JobGraph} using this graph's {@link JobID} (a random one is generated
 * if it is {@code null}) and {@link ApplicationID}, with the current thread's context class loader.
 */
@VisibleForTesting
public JobGraph getJobGraph() {
- return getJobGraph(Thread.currentThread().getContextClassLoader(),
jobId);
+ return getJobGraph(Thread.currentThread().getContextClassLoader(),
jobId, applicationId);
}
public JobGraph getJobGraph(ClassLoader userClassLoader) { // uses this graph's job ID and application ID
- return getJobGraph(userClassLoader, jobId);
+ return getJobGraph(userClassLoader, jobId, applicationId);
}
/**
 * Gets the assembled {@link JobGraph} with a specified {@link JobID} and this graph's {@link
 * ApplicationID}.
 */
- public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID
jobID) {
- return StreamingJobGraphGenerator.createJobGraph(userClassLoader,
this, jobID);
+ public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID
jobId) {
+ return getJobGraph(userClassLoader, jobId, applicationId);
+ }
+
+ /**
+ * Gets the assembled {@link JobGraph} with a specified {@link JobID} and a specified {@link
+ * ApplicationID}.
+ *
+ * @param userClassLoader the class loader passed to {@link StreamingJobGraphGenerator}
+ * @param jobId the job ID, or {@code null} to have a random one generated
+ * @param applicationId the application ID, or {@code null} if the job is not associated with
+ * an application
+ * @return the generated job graph
+ */
+ public JobGraph getJobGraph(
+ ClassLoader userClassLoader,
+ @Nullable JobID jobId,
+ @Nullable ApplicationID applicationId) {
+ return StreamingJobGraphGenerator.createJobGraph(
+ userClassLoader, this, jobId, applicationId);
}
public String getStreamingPlanAsJSON() {
@@ -1266,6 +1282,15 @@ public class StreamGraph implements Pipeline,
ExecutionPlan {
return jobId;
}
+ public void setApplicationId(ApplicationID applicationId) { // associates this stream graph with an application
+ this.applicationId = checkNotNull(applicationId); // null is rejected by checkNotNull
+ }
+
+ @Override
+ public Optional<ApplicationID> getApplicationId() {
+ return Optional.ofNullable(applicationId); // empty when no application ID has been set
+ }
+
/**
* Sets the classpath required to run the job on a task manager.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ae948dc3852..14ee3fdffc1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -134,12 +135,16 @@ public class StreamingJobGraphGenerator {
Thread.currentThread().getContextClassLoader(),
streamGraph,
streamGraph.getJobID(),
+ streamGraph.getApplicationId().orElse(null),
Runnable::run)
.createJobGraph();
}
public static JobGraph createJobGraph(
- ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable
JobID jobID) {
+ ClassLoader userClassLoader,
+ StreamGraph streamGraph,
+ @Nullable JobID jobId,
+ @Nullable ApplicationID applicationId) {
// TODO Currently, we construct a new thread pool for the compilation
of each job. In the
// future, we may refactor the job submission framework and make it
reusable across jobs.
final ExecutorService serializationExecutor =
@@ -152,7 +157,11 @@ public class StreamingJobGraphGenerator {
new
ExecutorThreadFactory("flink-operator-serialization-io"));
try {
return new StreamingJobGraphGenerator(
- userClassLoader, streamGraph, jobID,
serializationExecutor)
+ userClassLoader,
+ streamGraph,
+ jobId,
+ applicationId,
+ serializationExecutor)
.createJobGraph();
} finally {
serializationExecutor.shutdown();
@@ -177,14 +186,15 @@ public class StreamingJobGraphGenerator {
private StreamingJobGraphGenerator(
ClassLoader userClassloader,
StreamGraph streamGraph,
- @Nullable JobID jobID,
+ @Nullable JobID jobId,
+ @Nullable ApplicationID applicationId,
Executor serializationExecutor) {
this.userClassloader = userClassloader;
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new
StreamGraphUserHashHasher());
this.serializationExecutor =
Preconditions.checkNotNull(serializationExecutor);
- jobGraph = createAndInitializeJobGraph(streamGraph, jobID);
+ jobGraph = createAndInitializeJobGraph(streamGraph, jobId,
applicationId);
// Generate deterministic hashes for the nodes in order to identify
them across
// submission iff they didn't change.
@@ -886,8 +896,8 @@ public class StreamingJobGraphGenerator {
}
public static JobGraph createAndInitializeJobGraph(
- StreamGraph streamGraph, @Nullable JobID jobId) {
- JobGraph jobGraph = new JobGraph(jobId, streamGraph.getJobName());
+ StreamGraph streamGraph, @Nullable JobID jobId, @Nullable
ApplicationID applicationId) {
+ JobGraph jobGraph = new JobGraph(jobId, applicationId,
streamGraph.getJobName());
jobGraph.setJobType(streamGraph.getJobType());
jobGraph.setDynamic(streamGraph.isDynamic());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ab25cf02dd0..4e53397a46a 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -28,6 +29,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -101,6 +103,7 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -137,6 +140,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -935,6 +939,43 @@ public class DispatcherTest extends AbstractDispatcherTest
{
.isZero();
}
+ @Test
+ public void testApplicationBootstrap() throws Exception { // application registered via ApplicationBootstrap receives jobs tagged with its ID
+ final OneShotLatch bootstrapLatch = new OneShotLatch();
+ final ApplicationID applicationId = new ApplicationID();
+ final AbstractApplication application =
+ new TestingApplication(
+ applicationId,
+ (ignored -> {
+ bootstrapLatch.trigger();
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ }));
+ dispatcher =
+ createTestingDispatcherBuilder()
+ .setDispatcherBootstrapFactory(
+ (ignoredDispatcherGateway,
+ ignoredScheduledExecutor,
+ ignoredFatalErrorHandler) ->
+ new ApplicationBootstrap(application))
+ .build(rpcService);
+ // the bootstrap factory hands the testing application to the dispatcher via ApplicationBootstrap
+ dispatcher.start();
+
+ // ensure that the application execution is triggered (execute() releases the latch)
+ bootstrapLatch.await();
+ // the dispatcher should now track exactly this application
+ assertThat(dispatcher.getApplications().size()).isEqualTo(1); // NOTE(review): if this is AssertJ, hasSize(1) gives a clearer failure message
+
assertThat(dispatcher.getApplications().keySet()).contains(applicationId);
+ // a job graph tagged with the application's ID should be attached to that application on submission
+ jobGraph.setApplicationId(applicationId);
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ assertThat(application.getJobs().size()).isEqualTo(1); // NOTE(review): same hasSize(1) suggestion as above
+ assertThat(application.getJobs()).contains(jobGraph.getJobID());
+ }
+
@Test
public void testPersistedJobGraphWhenDispatcherIsShutDown() throws
Exception {
final TestingExecutionPlanStore submittedExecutionPlanStore =
@@ -2104,4 +2145,57 @@ public class DispatcherTest extends
AbstractDispatcherTest {
return runner;
}
}
+
+ private static class TestingApplication extends AbstractApplication { // AbstractApplication stub whose execute() delegates to a test-supplied function
+ private final Function<ExecuteParams, CompletableFuture<Acknowledge>>
executeFunction; // receives the arguments of each execute() call
+
+ public TestingApplication(
+ ApplicationID applicationId,
+ Function<ExecuteParams, CompletableFuture<Acknowledge>>
executeFunction) {
+ super(applicationId);
+ this.executeFunction = executeFunction;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> execute(
+ DispatcherGateway dispatcherGateway,
+ ScheduledExecutor scheduledExecutor,
+ Executor mainThreadExecutor,
+ FatalErrorHandler errorHandler) {
+ // bundle the arguments so the test function can inspect them, and return its future as-is
+ ExecuteParams params =
+ new ExecuteParams(
+ dispatcherGateway, scheduledExecutor,
mainThreadExecutor, errorHandler);
+ return executeFunction.apply(params);
+ }
+
+ @Override
+ public void cancel() {} // no-op: this stub has nothing to cancel
+
+ @Override
+ public void dispose() {} // no-op: this stub holds no resources to release
+
+ @Override
+ public String getName() {
+ return "TestingApplication";
+ }
+ // plain holder for the arguments passed to execute()
+ public static class ExecuteParams {
+ public final DispatcherGateway dispatcherGateway;
+ public final ScheduledExecutor scheduledExecutor;
+ public final Executor mainThreadExecutor;
+ public final FatalErrorHandler errorHandler;
+
+ public ExecuteParams(
+ DispatcherGateway dispatcherGateway,
+ ScheduledExecutor scheduledExecutor,
+ Executor mainThreadExecutor,
+ FatalErrorHandler errorHandler) {
+ this.dispatcherGateway = dispatcherGateway;
+ this.scheduledExecutor = scheduledExecutor;
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.errorHandler = errorHandler;
+ }
+ }
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
index 0103c6ec81f..4fe639c6c0d 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
@@ -84,7 +84,8 @@ public class ScriptExecutorITCase extends
AbstractSqlGatewayStatementITCaseBase
miniCluster.getConfiguration(),
ScriptExecutor.class.getClassLoader(),
false,
- false);
+ false,
+ null);
executor =
new TestScriptExecutor(