This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4cb44c0269918e5929fe448bfc8435765810de45 Author: Yi Zhang <[email protected]> AuthorDate: Wed Sep 24 14:54:46 2025 +0800 [FLINK-38758][runtime] Introduce PackagedProgramApplication --- .../generated/deployment_configuration.html | 6 + .../application/PackagedProgramApplication.java | 638 ++++++++++ .../flink/client/program/PackagedProgram.java | 16 + .../client/program/PackagedProgramDescriptor.java | 101 ++ .../PackagedProgramApplicationTest.java | 1344 ++++++++++++++++++++ .../program/PackagedProgramDescriptorTest.java | 71 ++ .../apache/flink/api/common/ApplicationState.java | 27 + .../flink/configuration/DeploymentOptions.java | 9 + 8 files changed, 2212 insertions(+) diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html index bfae1eb1866..e6790e81506 100644 --- a/docs/layouts/shortcodes/generated/deployment_configuration.html +++ b/docs/layouts/shortcodes/generated/deployment_configuration.html @@ -62,5 +62,11 @@ <td>String</td> <td>The deployment target for the execution. This can take one of the following values when calling <code class="highlighter-rouge">bin/flink run</code>:<ul><li>remote</li><li>local</li><li>yarn-application</li><li>yarn-session</li><li>kubernetes-application</li><li>kubernetes-session</li></ul></td> </tr> + <tr> + <td><h5>execution.terminate-application-on-any-job-terminated-exceptionally</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>When it is set to true, the application will complete exceptionally if any job fails or is canceled. When it is set to false, the application will finish after all jobs reach terminal states.</td> + </tr> </tbody> </table> diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java new file mode 100644 index 00000000000..5cc2f2cc047 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.cli.ClientOptions; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.core.execution.PipelineExecutorServiceLoader; +import org.apache.flink.runtime.application.AbstractApplication; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of {@link AbstractApplication} designed for executing the user's {@code + * main()}. 
+ */ +public class PackagedProgramApplication extends AbstractApplication { + + @VisibleForTesting static final String FAILED_JOB_NAME = "(application driver)"; + + private static final Logger LOG = LoggerFactory.getLogger(PackagedProgramApplication.class); + + private final PackagedProgramDescriptor programDescriptor; + + private final Collection<JobID> recoveredJobIds; + + private final Configuration configuration; + + private final boolean handleFatalError; + + private final boolean enforceSingleJobExecution; + + private final boolean submitFailedJobOnApplicationError; + + private final boolean shutDownOnFinish; + + private transient PackagedProgram program; + + private transient CompletableFuture<Void> applicationCompletionFuture; + + private transient ScheduledFuture<?> applicationExecutionTask; + + private transient CompletableFuture<Acknowledge> finishApplicationFuture; + + private transient boolean isDisposing = false; + + public PackagedProgramApplication( + final ApplicationID applicationId, + final PackagedProgram program, + final Collection<JobID> recoveredJobIds, + final Configuration configuration, + final boolean handleFatalError, + final boolean enforceSingleJobExecution, + final boolean submitFailedJobOnApplicationError, + final boolean shutDownOnFinish) { + super(applicationId); + this.program = checkNotNull(program); + this.recoveredJobIds = checkNotNull(recoveredJobIds); + this.configuration = checkNotNull(configuration); + this.handleFatalError = handleFatalError; + this.enforceSingleJobExecution = enforceSingleJobExecution; + this.submitFailedJobOnApplicationError = submitFailedJobOnApplicationError; + this.shutDownOnFinish = shutDownOnFinish; + this.programDescriptor = program.getDescriptor(); + } + + @Override + public CompletableFuture<Acknowledge> execute( + DispatcherGateway dispatcherGateway, + ScheduledExecutor scheduledExecutor, + Executor mainThreadExecutor, + FatalErrorHandler errorHandler) { + transitionToRunning(); + + final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>(); + final Set<JobID> tolerateMissingResult = Collections.synchronizedSet(new HashSet<>()); + + // we need to hand in a future as return value because we need to get those JobIs out + // from the scheduled task that executes the user program + applicationExecutionTask = + scheduledExecutor.schedule( + () -> + runApplicationEntryPoint( + applicationExecutionFuture, + tolerateMissingResult, + dispatcherGateway, + scheduledExecutor), + 0L, + TimeUnit.MILLISECONDS); + + boolean decoupleApplicationStatusFromJobStatus = + !configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION); + + if (decoupleApplicationStatusFromJobStatus) { + // when the application status is decoupled from the job status, we don't need to wait + // for the job results + applicationCompletionFuture = applicationExecutionFuture.thenApply(ignored -> null); + finishApplicationFuture = + applicationCompletionFuture + .handleAsync( + (ignored, t) -> { + if (t == null) { + LOG.info( + "Application completed SUCCESSFULLY (decoupled from job results)"); + return finishAsSucceeded( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler); + } + + return onApplicationCanceledOrFailed( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler, + t); + }, + mainThreadExecutor) + .thenCompose(Function.identity()); + } else { + applicationCompletionFuture = + applicationExecutionFuture.thenCompose( + jobIds -> + waitForJobResults( + 
dispatcherGateway, + jobIds, + tolerateMissingResult, + scheduledExecutor)); + + finishApplicationFuture = + applicationCompletionFuture + .handleAsync( + (ignored, t) -> { + if (t == null) { + LOG.info("Application completed SUCCESSFULLY"); + transitionToFinished(); + return maybeShutdownCluster( + dispatcherGateway, ApplicationStatus.SUCCEEDED); + } + + final Optional<JobStatus> maybeJobStatus = + extractJobStatus(t); + if (maybeJobStatus.isPresent()) { + // the exception is caused by job execution results + ApplicationState applicationState = + ApplicationState.fromJobStatus( + maybeJobStatus.get()); + LOG.info("Application {}: ", applicationState, t); + if (applicationState == ApplicationState.CANCELED) { + transitionToCanceling(); + return finishAsCanceled( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler); + + } else { + transitionToFailing(); + return finishAsFailed( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler); + } + } + + return onApplicationCanceledOrFailed( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler, + t); + }, + mainThreadExecutor) + .thenCompose(Function.identity()); + } + + // In Application Mode, the handleFatalError flag is set to true, and uncaught exceptions + // are handled by the errorHandler to trigger failover. + // In Session Mode, the handleFatalError flag may be set to false, leaving exceptions + // unhandled. This behavior may change in the future. + FutureUtils.handleUncaughtException( + finishApplicationFuture, + (t, e) -> { + if (handleFatalError) { + errorHandler.onFatalError(e); + } + }); + + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override + public void cancel() { + ApplicationState currentState = getApplicationStatus(); + if (currentState == ApplicationState.CREATED) { + // nothing to cancel + transitionToCanceling(); + transitionToCanceled(); + } else if (currentState == ApplicationState.RUNNING) { + transitionToCanceling(); + cancelFutures(); + } + } + + @Override + public void dispose() { + isDisposing = true; + cancelFutures(); + } + + private void cancelFutures() { + if (applicationExecutionTask != null) { + applicationExecutionTask.cancel(true); + } + + if (applicationCompletionFuture != null) { + // applicationCompletionFuture.handleAsync will not block here + applicationCompletionFuture.cancel(true); + } + } + + @Override + public String getName() { + return programDescriptor.getMainClassName(); + } + + @VisibleForTesting + ScheduledFuture<?> getApplicationExecutionFuture() { + return applicationExecutionTask; + } + + @VisibleForTesting + CompletableFuture<Void> getApplicationCompletionFuture() { + return applicationCompletionFuture; + } + + @VisibleForTesting + CompletableFuture<Acknowledge> getFinishApplicationFuture() { + return finishApplicationFuture; + } + + private CompletableFuture<Acknowledge> onApplicationCanceledOrFailed( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler, + final Throwable t) { + if (t instanceof CancellationException) { + // the applicationCompletionFuture is canceled by cancel() or dispose() + if (isDisposing) { + LOG.warn("Application execution is canceled because the cluster is shutting down."); + // we don't need to do anything here during cleanup + return CompletableFuture.completedFuture(Acknowledge.get()); + } + LOG.info("Application execution is canceled manually."); + 
return finishAsCanceled( + dispatcherGateway, scheduledExecutor, mainThreadExecutor, errorHandler); + } + + LOG.warn("Application failed unexpectedly: ", t); + transitionToFailing(); + return finishAsFailed( + dispatcherGateway, scheduledExecutor, mainThreadExecutor, errorHandler); + } + + private CompletableFuture<Acknowledge> finishAsCanceled( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler) { + return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, scheduledExecutor) + .handleAsync( + (v, t) -> { + if (t != null) { + return onCancelAllJobsAndWaitForTerminalStatesException( + errorHandler, t); + } + transitionToCanceled(); + return maybeShutdownCluster( + dispatcherGateway, ApplicationStatus.CANCELED); + }, + mainThreadExecutor) + .thenCompose(Function.identity()); + } + + private CompletableFuture<Acknowledge> finishAsFailed( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler) { + return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, scheduledExecutor) + .handleAsync( + (v, t) -> { + if (t != null) { + return onCancelAllJobsAndWaitForTerminalStatesException( + errorHandler, t); + } + transitionToFailed(); + return maybeShutdownCluster( + dispatcherGateway, ApplicationStatus.FAILED); + }, + mainThreadExecutor) + .thenCompose(Function.identity()); + } + + private CompletableFuture<Acknowledge> finishAsSucceeded( + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final Executor mainThreadExecutor, + final FatalErrorHandler errorHandler) { + return cancelAllJobsAndWaitForTerminalStates(dispatcherGateway, scheduledExecutor) + .handleAsync( + (v, t) -> { + if (t != null) { + return onCancelAllJobsAndWaitForTerminalStatesException( + errorHandler, t); + } + transitionToFinished(); + return maybeShutdownCluster( + dispatcherGateway, ApplicationStatus.SUCCEEDED); + }, + mainThreadExecutor) + .thenCompose(Function.identity()); + } + + private CompletableFuture<Acknowledge> onCancelAllJobsAndWaitForTerminalStatesException( + final FatalErrorHandler errorHandler, final Throwable t) { + LOG.warn("Failed to cancel and wait for all jobs.", t); + errorHandler.onFatalError(t); + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + /** + * This method cancels the jobs to clean up related resources when the application ends. If any + * job cancellation fails, the returned future will complete exceptionally. 
+ */ + private CompletableFuture<Void> cancelAllJobsAndWaitForTerminalStates( + final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) { + final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); + + LOG.info( + "Start to cancel remaining jobs for application {} ({}).", + getName(), + getApplicationId()); + // get all jobs that needs to be canceled + CompletableFuture<Set<JobID>> jobsToCancelFuture = + FutureUtils.combineAll( + getJobs().stream() + .map( + jobId -> + getNonGlobalTerminalJob( + dispatcherGateway, jobId, timeout)) + .collect(Collectors.toList())) + .thenApply( + jobIds -> + jobIds.stream() + .filter(Objects::nonNull) + .collect(Collectors.toSet())); + + // get all jobs that is canceling + CompletableFuture<Set<JobID>> jobsCancelingFuture = + jobsToCancelFuture.thenCompose( + jobsToCancel -> { + if (!jobsToCancel.isEmpty()) { + LOG.info( + "Canceling jobs for application '{}' ({}): {}", + getName(), + getApplicationId(), + jobsToCancel); + } + + List<CompletableFuture<JobID>> cancelFutures = + jobsToCancel.stream() + .map( + jobId -> + cancelJob( + dispatcherGateway, + jobId, + timeout)) + .collect(Collectors.toList()); + + return FutureUtils.combineAll(cancelFutures) + .thenApply( + jobIds -> + jobIds.stream() + .filter(Objects::nonNull) + .collect(Collectors.toSet())); + }); + + // wait for all jobs to be canceled in the scheduledExecutor + return jobsCancelingFuture.thenComposeAsync( + jobsCanceling -> { + List<CompletableFuture<?>> jobResultFutures = + jobsCanceling.stream() + .map( + jobId -> + getJobResult( + dispatcherGateway, + jobId, + scheduledExecutor, + false) + .thenApply(ignored -> null)) + .collect(Collectors.toList()); + return FutureUtils.waitForAll(jobResultFutures); + }, + scheduledExecutor); + } + + private CompletableFuture<JobID> getNonGlobalTerminalJob( + DispatcherGateway dispatcherGateway, JobID jobId, Duration timeout) { + return dispatcherGateway + .requestJobStatus(jobId, timeout) + .thenApply( + jobStatus -> { + if (!jobStatus.isGloballyTerminalState()) { + return jobId; + } else { + return null; + } + }); + } + + private CompletableFuture<JobID> cancelJob( + DispatcherGateway dispatcherGateway, JobID jobId, Duration timeout) { + return dispatcherGateway.cancelJob(jobId, timeout).thenApply(ignored -> jobId); + } + + private CompletableFuture<Acknowledge> maybeShutdownCluster( + DispatcherGateway dispatcherGateway, ApplicationStatus applicationStatus) { + return shutDownOnFinish + ? dispatcherGateway.shutDownCluster(applicationStatus) + : CompletableFuture.completedFuture(Acknowledge.get()); + } + + private Optional<JobStatus> extractJobStatus(Throwable t) { + final Optional<UnsuccessfulExecutionException> maybeException = + ExceptionUtils.findThrowable(t, UnsuccessfulExecutionException.class); + return maybeException.flatMap(UnsuccessfulExecutionException::getStatus); + } + + /** + * Runs the user program entrypoint and completes the given {@code jobIdsFuture} with the {@link + * JobID JobIDs} of the submitted jobs. + * + * <p>This should be executed in a separate thread (or task). 
+ */ + private void runApplicationEntryPoint( + final CompletableFuture<List<JobID>> jobIdsFuture, + final Set<JobID> tolerateMissingResult, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor) { + if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) { + jobIdsFuture.completeExceptionally( + new ApplicationExecutionException( + String.format( + "Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.", + DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR + .key()))); + return; + } + final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds); + try { + if (program == null) { + LOG.info("Reconstructing program from descriptor {}", programDescriptor); + program = programDescriptor.toPackageProgram(); + } + + final PipelineExecutorServiceLoader executorServiceLoader = + new EmbeddedExecutorServiceLoader( + applicationJobIds, dispatcherGateway, scheduledExecutor); + + ClientUtils.executeProgram( + executorServiceLoader, + configuration, + program, + enforceSingleJobExecution, + true /* suppress sysout */); + + if (applicationJobIds.isEmpty()) { + jobIdsFuture.completeExceptionally( + new ApplicationExecutionException( + "The application contains no execute() calls.")); + } else { + jobIdsFuture.complete(applicationJobIds); + } + } catch (Throwable t) { + // If we're running in a single job execution mode, it's safe to consider re-submission + // of an already finished a success. + final Optional<DuplicateJobSubmissionException> maybeDuplicate = + ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class); + if (enforceSingleJobExecution + && maybeDuplicate.isPresent() + && maybeDuplicate.get().isGloballyTerminated()) { + final JobID jobId = maybeDuplicate.get().getJobID(); + tolerateMissingResult.add(jobId); + jobIdsFuture.complete(Collections.singletonList(jobId)); + } else if (submitFailedJobOnApplicationError && applicationJobIds.isEmpty()) { + final JobID failedJobId = + JobID.fromHexString( + configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); + dispatcherGateway + .submitFailedJob(failedJobId, FAILED_JOB_NAME, t) + .thenAccept( + ignored -> + jobIdsFuture.complete( + Collections.singletonList(failedJobId))); + } else { + jobIdsFuture.completeExceptionally( + new ApplicationExecutionException("Could not execute application.", t)); + } + } + } + + private CompletableFuture<Void> waitForJobResults( + final DispatcherGateway dispatcherGateway, + final Collection<JobID> applicationJobIds, + final Set<JobID> tolerateMissingResult, + final ScheduledExecutor executor) { + final List<CompletableFuture<?>> jobResultFutures = + applicationJobIds.stream() + .map( + jobId -> + unwrapJobResultException( + getJobResult( + dispatcherGateway, + jobId, + executor, + tolerateMissingResult.contains(jobId)))) + .collect(Collectors.toList()); + + return configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION) + ? 
FutureUtils.waitForAll(jobResultFutures) + : FutureUtils.completeAll(jobResultFutures); + } + + private CompletableFuture<JobResult> getJobResult( + final DispatcherGateway dispatcherGateway, + final JobID jobId, + final ScheduledExecutor scheduledExecutor, + final boolean tolerateMissingResult) { + final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); + final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD); + final CompletableFuture<JobResult> jobResultFuture = + JobStatusPollingUtils.getJobResult( + dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod); + if (tolerateMissingResult) { + // Return "unknown" job result if dispatcher no longer knows the actual result. + return FutureUtils.handleException( + jobResultFuture, + FlinkJobNotFoundException.class, + exception -> + new JobResult.Builder() + .jobId(jobId) + .jobStatus(null) + .netRuntime(Long.MAX_VALUE) + .build()); + } + return jobResultFuture; + } + + /** + * If the given {@link JobResult} indicates success, this passes through the {@link JobResult}. + * Otherwise, this returns a future that is finished exceptionally (potentially with an + * exception from the {@link JobResult}). + */ + private CompletableFuture<JobResult> unwrapJobResultException( + final CompletableFuture<JobResult> jobResult) { + return jobResult.thenApply( + result -> { + if (result.isSuccess()) { + return result; + } + + throw new CompletionException( + UnsuccessfulExecutionException.fromJobResult( + result, program.getUserCodeClassLoader())); + }); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index af1be294d07..0479cb7a544 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -98,6 +98,9 @@ public class PackagedProgram implements AutoCloseable { /** Flag indicating whether the job is a Python job. */ private final boolean isPython; + /** Serializable descriptor to reconstruct the PackagedProgram. */ + private final PackagedProgramDescriptor descriptor; + /** * Creates an instance that wraps the plan defined in the jar file using the given arguments. * For generating the plan the class defined in the className parameter is used. @@ -163,6 +166,19 @@ public class PackagedProgram implements AutoCloseable { throw new ProgramInvocationException( "The given program class does not have a main(String[]) method."); } + + this.descriptor = + new PackagedProgramDescriptor( + jarFile, + classpaths, + configuration, + savepointRestoreSettings, + args, + getMainClassName()); + } + + public PackagedProgramDescriptor getDescriptor() { + return descriptor; } public SavepointRestoreSettings getSavepointSettings() { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java new file mode 100644 index 00000000000..d36a16a8c69 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramDescriptor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.client.deployment.application.PackagedProgramApplication; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.Serializable; +import java.net.URL; +import java.util.Arrays; +import java.util.List; + +/** + * Descriptor for {@link PackagedProgram}. + * + * <p>This class provides a serializable representation of {@link PackagedProgram} that can be used + * to reconstruct the {@link PackagedProgram} after serialization and deserialization. It is mainly + * used by {@link PackagedProgramApplication}. + */ +public class PackagedProgramDescriptor implements Serializable { + + @Nullable private final File jarFile; + + private final List<URL> userClassPaths; + + private final Configuration configuration; + + private final SavepointRestoreSettings savepointRestoreSettings; + + private final String[] programArgs; + + private final String mainClassName; + + public PackagedProgramDescriptor( + @Nullable File jarFile, + List<URL> userClassPaths, + Configuration configuration, + SavepointRestoreSettings savepointRestoreSettings, + String[] programArgs, + String mainClassName) { + this.jarFile = jarFile; + this.userClassPaths = userClassPaths; + this.configuration = configuration; + this.savepointRestoreSettings = savepointRestoreSettings; + this.programArgs = programArgs; + this.mainClassName = mainClassName; + } + + public String getMainClassName() { + return mainClassName; + } + + public PackagedProgram toPackageProgram() throws ProgramInvocationException { + return PackagedProgram.newBuilder() + .setJarFile(jarFile) + .setEntryPointClassName(mainClassName) + .setConfiguration(configuration) + .setUserClassPaths(userClassPaths) + .setArguments(programArgs) + .setSavepointRestoreSettings(savepointRestoreSettings) + .build(); + } + + @Override + public String toString() { + return "PackagedProgramDescriptor{" + + "jarFile=" + + jarFile + + ", userClassPaths=" + + userClassPaths + + ", configuration=" + + configuration + + ", savepointRestoreSettings=" + + savepointRestoreSettings + + ", programArgs=" + + Arrays.toString(programArgs) + + ", mainClassName=" + + mainClassName + + '}'; + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java new file mode 100644 index 00000000000..12affab4747 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java @@ -0,0 +1,1344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.testjar.FailingJob; +import org.apache.flink.client.testjar.MultiExecuteJob; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.runtime.client.DuplicateJobSubmissionException; +import org.apache.flink.runtime.client.JobCancellationException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static 
org.assertj.core.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Tests for the {@link PackagedProgramApplication}. */ +public class PackagedProgramApplicationTest { + + private static final int TIMEOUT_SECONDS = 10; + + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); + private final ScheduledExecutor scheduledExecutor = + new ScheduledExecutorServiceAdapter(executor); + private Executor mainThreadExecutor = Executors.newSingleThreadScheduledExecutor(); + + @AfterEach + void cleanup() { + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor); + } + + @Test + void testOnlyOneJobIsAllowedWhenEnforceSingleJobExecution() throws Throwable { + final PackagedProgramApplication application = + createAndExecuteApplication( + 2, getConfiguration(), finishedJobGatewayBuilder().build(), true); + + assertException(application.getApplicationCompletionFuture(), FlinkRuntimeException.class); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + } + + @Test + void testStaticJobId() throws Throwable { + final JobID testJobID = new JobID(0, 2); + + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set( + PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + + final CompletableFuture<JobID> submittedJobId = new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + finishedJobGatewayBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobId.complete(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(1, configurationUnderTest, dispatcherBuilder.build()); + + application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + + assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(new JobID(0L, 2L)); + } + + @Test + void testApplicationCleanupWhenOneJobFails() throws Throwable { + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + final ConcurrentLinkedDeque<JobID> canceledJobIds = new ConcurrentLinkedDeque<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture(JobStatus.FAILED); + } + if (canceledJobIds.contains(jobId)) { + return CompletableFuture.completedFuture( + JobStatus.CANCELED); + } + return CompletableFuture.completedFuture(JobStatus.RUNNING); + }) + .setRequestJobResultFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture( + createFailedJobResult(jobId)); + } + // the other jobs should be canceled + return CompletableFuture.completedFuture( + createCanceledJobResult(jobId)); + }) + .setCancelJobFunction( + jobId -> { + canceledJobIds.add(jobId); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final 
PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + getProgram(2), + Collections.emptyList(), + getConfiguration(), + true, + false, + false, + true); + + // change mainThreadExecutor to be manually triggered + // so that the application finish process isn't completed before adding the jobs + mainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + final ManuallyTriggeredScheduledExecutor manuallyTriggeredMainThreadExecutor = + (ManuallyTriggeredScheduledExecutor) mainThreadExecutor; + + // wait for the application execution to be accepted + application + .execute( + dispatcherBuilder.build(), + scheduledExecutor, + mainThreadExecutor, + exception -> {}) + .join(); + + // Wait for the task that is used to finish the application. + while (manuallyTriggeredMainThreadExecutor.numQueuedRunnables() < 1) { + Thread.sleep(100); + } + submittedJobIds.forEach(application::addJob); + + // Triggers the process to finish the application after adding the jobs. This + // ensures that the application knows that these jobs need to be canceled. + manuallyTriggeredMainThreadExecutor.trigger(); + final UnsuccessfulExecutionException exception = + assertException( + application.getApplicationCompletionFuture(), + UnsuccessfulExecutionException.class); + assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED); + + // Wait for the task that is used to transition to the final state. + while (manuallyTriggeredMainThreadExecutor.numQueuedRunnables() < 1) { + Thread.sleep(100); + } + manuallyTriggeredMainThreadExecutor.trigger(); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(canceledJobIds).containsOnly(submittedJobIds.peekLast()); + assertApplicationFailed(application); + } + + @Test + void testErrorHandlerIsCalledWhenApplicationCleanupThrowsAnException() throws Throwable { + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture(JobStatus.FAILED); + } + // never finish the other jobs + return CompletableFuture.completedFuture(JobStatus.RUNNING); + }) + .setRequestJobResultFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture( + createFailedJobResult(jobId)); + } + // never finish the other jobs + return new CompletableFuture<>(); + }) + .setCancelJobFunction( + jobId -> + FutureUtils.completedExceptionally( + new FlinkRuntimeException("Test exception."))); + + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + getProgram(2), + Collections.emptyList(), + getConfiguration(), + true, + false, + false, + true); + + // change mainThreadExecutor to be manually triggered + // so that the application finish process isn't completed before adding the jobs + mainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor(); + final ManuallyTriggeredScheduledExecutor manuallyTriggeredMainThreadExecutor = + (ManuallyTriggeredScheduledExecutor) mainThreadExecutor; + + // wait for the application execution to be accepted + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + application + .execute( + dispatcherBuilder.build(), + scheduledExecutor, + mainThreadExecutor, + errorHandlerFuture::completeExceptionally) + .join(); + + // Wait for the task that is used to finish the application. + while (manuallyTriggeredMainThreadExecutor.numQueuedRunnables() < 1) { + Thread.sleep(100); + } + submittedJobIds.forEach(application::addJob); + + // Triggers the process to finish the application after adding the jobs. This + // ensures that the application knows that these jobs need to be canceled. + manuallyTriggeredMainThreadExecutor.trigger(); + final UnsuccessfulExecutionException exception = + assertException( + application.getApplicationCompletionFuture(), + UnsuccessfulExecutionException.class); + assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED); + + // Wait for the task that is used to transition to the final state. + while (manuallyTriggeredMainThreadExecutor.numQueuedRunnables() < 1) { + Thread.sleep(100); + } + manuallyTriggeredMainThreadExecutor.trigger(); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailing(application); + + // we call the error handler + assertException(errorHandlerFuture, FlinkRuntimeException.class); + + // we didn't shut down the cluster + assertThat(shutdownCalled.get()).isFalse(); + } + + @Test + void testApplicationFailsAsSoonAsOneJobFails() throws Throwable { + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture(JobStatus.FAILED); + } + // never finish the other jobs + return CompletableFuture.completedFuture(JobStatus.RUNNING); + }) + .setRequestJobResultFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture( + createFailedJobResult(jobId)); + } + // never finish the other jobs + return new CompletableFuture<>(); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(2, dispatcherBuilder.build()); + final UnsuccessfulExecutionException exception = + assertException( + application.getApplicationCompletionFuture(), + UnsuccessfulExecutionException.class); + assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + } + + @Test + void testApplicationFinishesAfterAllJobsTerminate() throws Throwable { + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set( + DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION, false); + + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + + final CompletableFuture<ApplicationStatus> 
clusterShutdownStatus = + new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture(JobStatus.FAILED); + } + // finish the other jobs + return CompletableFuture.completedFuture(JobStatus.FINISHED); + }) + .setRequestJobResultFunction( + jobId -> { + // we only fail one of the jobs, the first one + if (jobId.equals(submittedJobIds.peek())) { + return CompletableFuture.completedFuture( + createFailedJobResult(jobId)); + } + // finish the other jobs + return CompletableFuture.completedFuture( + createJobResult(jobId, JobStatus.FINISHED)); + }) + .setClusterShutdownFunction( + status -> { + clusterShutdownStatus.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(2, configurationUnderTest, dispatcherBuilder.build()); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + + assertThat(clusterShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(ApplicationStatus.SUCCEEDED); + } + + @Test + void testApplicationSucceedsWhenAllJobsSucceed() throws Exception { + final PackagedProgramApplication application = + createAndExecuteApplication(3, finishedJobGatewayBuilder().build()); + + // this would block indefinitely if the applications don't finish + application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + } + + @Test + void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception { + final CompletableFuture<ApplicationStatus> clusterShutdownStatus = + new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + canceledJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + clusterShutdownStatus.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + // wait until the application "thinks" it's done, also makes sure that we don't + // fail the future exceptionally with a JobCancelledException + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationCanceled(application); + + assertThat(clusterShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(ApplicationStatus.CANCELED); + } + + @Test + void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception { + final TestingDispatcherGateway.Builder dispatcherBuilder = finishedJobGatewayBuilder(); + + PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + ScheduledFuture<?> applicationExecutionFuture = application.getApplicationExecutionFuture(); + + // wait until the application "thinks" it's done + completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); 
+ + // make sure the task finishes + applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + @Test + void testApplicationCancellationWhenNotRunning() throws Exception { + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + getProgram(1), + Collections.emptyList(), + getConfiguration(), + true, + false, + false, + true); + + application.cancel(); + + assertApplicationCanceled(application); + } + + @Test + void testApplicationIsCanceledWhenCancellingApplication() throws Exception { + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + final ConcurrentLinkedDeque<JobID> canceledJobIds = new ConcurrentLinkedDeque<>(); + + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + final TestingDispatcherGateway.Builder dispatcherBuilder = + runningJobGatewayBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction( + jobId -> { + if (canceledJobIds.contains(jobId)) { + return CompletableFuture.completedFuture( + JobStatus.CANCELED); + } + return CompletableFuture.completedFuture(JobStatus.RUNNING); + }) + .setRequestJobResultFunction( + jobId -> { + // all jobs should be canceled + return CompletableFuture.completedFuture( + createCanceledJobResult(jobId)); + }) + .setCancelJobFunction( + jobId -> { + canceledJobIds.add(jobId); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor = + new ManuallyTriggeredScheduledExecutor(); + // we're "listening" on this to be completed to verify that the error handler is called. + // In production, this will shut down the cluster with an exception. + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final PackagedProgramApplication application = + createAndExecuteApplication( + 3, + dispatcherBuilder.build(), + manuallyTriggeredExecutor, + errorHandlerFuture::completeExceptionally); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + ScheduledFuture<?> applicationExecutionFuture = application.getApplicationExecutionFuture(); + + CompletableFuture.runAsync( + () -> { + try { + application.cancel(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + mainThreadExecutor) + .join(); + + // Triggers the scheduled process after calling cancel. This + // ensures that the application task isn't completed before the cancel method is called + // which would prevent the cancel call from cancelling the task's future. + manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask(); + + // we didn't call the error handler + assertThat(errorHandlerFuture.isDone()).isFalse(); + + // Wait to trigger the task that is used to finish the application. . 
+ while (manuallyTriggeredExecutor.numQueuedRunnables() < 1) { + Thread.sleep(100); + } + manuallyTriggeredExecutor.trigger(); + + // completion future gets completed normally + completionFuture.get(); + + // verify that we shut down the cluster + assertThat(shutdownCalled.get()).isTrue(); + + // verify that the application task is being cancelled + assertThat(applicationExecutionFuture.isCancelled()).isTrue(); + assertThat(applicationExecutionFuture.isDone()).isTrue(); + assertThat(canceledJobIds).containsExactlyInAnyOrderElementsOf(submittedJobIds); + assertApplicationCanceled(application); + } + + @Test + void testApplicationDispose() throws Exception { + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + final TestingDispatcherGateway.Builder dispatcherBuilder = + runningJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor = + new ManuallyTriggeredScheduledExecutor(); + // we're "listening" on this to be completed to verify that the error handler is called. + // In production, this will shut down the cluster with an exception. + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final PackagedProgramApplication application = + createAndExecuteApplication( + 3, + dispatcherBuilder.build(), + manuallyTriggeredExecutor, + errorHandlerFuture::completeExceptionally); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + ScheduledFuture<?> applicationExecutionFuture = application.getApplicationExecutionFuture(); + + application.dispose(); + + // Triggers the scheduled process after calling cancel. This + // ensures that the application task isn't completed before the cancel method is called + // which would prevent the cancel call from cancelling the task's future. + manuallyTriggeredExecutor.triggerNonPeriodicScheduledTask(); + + // we didn't call the error handler + assertThat(errorHandlerFuture.isDone()).isFalse(); + + // completion future gets completed normally + completionFuture.get(); + + // verify that we didn't shut down the cluster + assertThat(shutdownCalled.get()).isFalse(); + + // verify that the application task is being cancelled + assertThat(applicationExecutionFuture.isCancelled()).isTrue(); + assertThat(applicationExecutionFuture.isDone()).isTrue(); + } + + @Test + void testErrorHandlerIsNotCalledWhenSubmissionThrowsAnException() throws Exception { + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + final TestingDispatcherGateway.Builder dispatcherBuilder = + runningJobGatewayBuilder() + .setSubmitFunction( + jobGraph -> { + throw new FlinkRuntimeException("Nope!"); + }) + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + // we're "listening" on this to be completed to verify that the error handler is called. + // In production, this will shut down the cluster with an exception. 
+ final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final PackagedProgramApplication application = + createAndExecuteApplication( + 2, + dispatcherBuilder.build(), + scheduledExecutor, + errorHandlerFuture::completeExceptionally); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + // we return a future that is completed normally + completionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + + // we do not call the error handler + assertThat(errorHandlerFuture.isDone()).isFalse(); + + // verify that we shut down the cluster + assertThat(shutdownCalled.get()).isTrue(); + } + + @Test + void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws Exception { + testErrorHandlerIsCalled( + () -> + FutureUtils.completedExceptionally( + new FlinkRuntimeException("Test exception."))); + } + + @Test + void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws Exception { + testErrorHandlerIsCalled( + () -> { + throw new FlinkRuntimeException("Test exception."); + }); + } + + private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> shutdownFunction) + throws Exception { + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) + .setRequestJobStatusFunction( + jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) + .setRequestJobResultFunction( + jobId -> + CompletableFuture.completedFuture( + createJobResult(jobId, JobStatus.FINISHED))) + .setClusterShutdownFunction(status -> shutdownFunction.get()); + + // we're "listening" on this to be completed to verify that the error handler is called. + // In production, this will shut down the cluster with an exception. 
+ final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); + final PackagedProgramApplication application = + createAndExecuteApplication( + 3, + dispatcherGateway, + scheduledExecutor, + errorHandlerFuture::completeExceptionally); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + // we call the error handler + assertException(errorHandlerFuture, FlinkRuntimeException.class); + + // we return a future that is completed exceptionally + assertException(completionFuture, FlinkRuntimeException.class); + + // shut down exception should not affect application result + assertApplicationFinished(application); + } + + @Test + void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws Exception { + final CompletableFuture<ApplicationStatus> clusterShutdown = new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherGatewayBuilder = + canceledJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + clusterShutdown.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final Configuration configuration = getConfiguration(); + configuration.set(DeploymentOptions.ATTACHED, true); + + final PackagedProgramApplication application = + createAndExecuteApplication(2, configuration, dispatcherGatewayBuilder.build()); + assertException( + application.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationCanceled(application); + + assertThat(clusterShutdown.get()).isEqualTo(ApplicationStatus.CANCELED); + } + + @Test + void testClusterShutdownWhenApplicationSucceeds() throws Exception { + // we're "listening" on this to be completed to verify that the cluster + // is being shut down from the PackagedProgramApplication + final CompletableFuture<ApplicationStatus> externalShutdownFuture = + new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + finishedJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + externalShutdownFuture.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + // wait until the application "thinks" it's done + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + + // verify that the dispatcher is actually being shut down + assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(ApplicationStatus.SUCCEEDED); + } + + @Test + void testClusterShutdownWhenApplicationFails() throws Exception { + // we're "listening" on this to be completed to verify that the cluster + // is being shut down from the PackagedProgramApplication + final CompletableFuture<ApplicationStatus> externalShutdownFuture = + new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + failedJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + externalShutdownFuture.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + // wait until the application "thinks" it's done + 
application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + + // verify that the dispatcher is actually being shut down + assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(ApplicationStatus.FAILED); + } + + @Test + void testClusterShutdownWhenApplicationGetsCancelled() throws Exception { + // we're "listening" on this to be completed to verify that the cluster + // is being shut down from the PackagedProgramApplication + final CompletableFuture<ApplicationStatus> externalShutdownFuture = + new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + canceledJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + externalShutdownFuture.complete(status); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + // wait until the application "thinks" it's done + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationCanceled(application); + + // verify that the dispatcher is actually being shut down + assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .isEqualTo(ApplicationStatus.CANCELED); + } + + @Test + void testErrorHandlerIsNotCalledWhenApplicationStatusIsUnknown() throws Exception { + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + final TestingDispatcherGateway.Builder dispatcherBuilder = + canceledJobGatewayBuilder() + .setRequestJobResultFunction( + jobID -> + CompletableFuture.completedFuture( + createUnknownJobResult(jobID))) + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final PackagedProgramApplication application = + createAndExecuteApplication( + 3, + dispatcherGateway, + scheduledExecutor, + errorHandlerFuture::completeExceptionally); + + // check that application completes exceptionally + assertException( + application.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + + // we do not call the error handler + assertThat(errorHandlerFuture.isDone()).isFalse(); + + // verify that we shut down the cluster + assertThat(shutdownCalled.get()).isTrue(); + } + + @Test + void testErrorHandlerIsNotCalled() throws Exception { + final AtomicBoolean shutdownCalled = new AtomicBoolean(false); + // Job submission error + final TestingDispatcherGateway.Builder dispatcherBuilder = + finishedJobGatewayBuilder() + .setClusterShutdownFunction( + status -> { + shutdownCalled.set(true); + throw new FlinkRuntimeException("Nope!"); + }); + + final AtomicBoolean errorHandlerCalled = new AtomicBoolean(false); + final PackagedProgramApplication application = + createAndExecuteApplication( + 2, + getConfiguration(), + dispatcherBuilder.build(), + scheduledExecutor, + exception -> errorHandlerCalled.set(true), + false, + false, + true); + + final CompletableFuture<Acknowledge> completionFuture = + application.getFinishApplicationFuture(); + + // we return a future that is completed exceptionally + assertException(completionFuture, 
FlinkRuntimeException.class); + // shut down exception should not affect application result + assertApplicationFinished(application); + + // we do not call the error handler + assertThat(errorHandlerCalled.get()).isFalse(); + + // verify that we shut down the cluster + assertThat(shutdownCalled.get()).isTrue(); + } + + @Test + void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable { + final JobID testJobID = new JobID(0, 2); + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set( + PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + final TestingDispatcherGateway.Builder dispatcherBuilder = + finishedJobGatewayBuilder() + .setSubmitFunction( + jobGraph -> + FutureUtils.completedExceptionally( + DuplicateJobSubmissionException + .ofGloballyTerminated(testJobID))); + + final PackagedProgramApplication application = + createAndExecuteApplication( + 1, configurationUnderTest, dispatcherBuilder.build(), true); + + application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + } + + /** + * In this scenario, job result is no longer present in the {@link + * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has terminated and job + * manager failed over), but we know that job has already terminated from {@link + * org.apache.flink.runtime.highavailability.JobResultStore}. + */ + @Test + void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable { + final JobID testJobID = new JobID(0, 2); + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set( + PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> + FutureUtils.completedExceptionally( + DuplicateJobSubmissionException + .ofGloballyTerminated(testJobID))) + .setRequestJobStatusFunction( + jobId -> + FutureUtils.completedExceptionally( + new FlinkJobNotFoundException(jobId))) + .setRequestJobResultFunction( + jobId -> + FutureUtils.completedExceptionally( + new FlinkJobNotFoundException(jobId))); + + final PackagedProgramApplication application = + createAndExecuteApplication( + 1, configurationUnderTest, dispatcherBuilder.build(), true); + + application.getApplicationCompletionFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFinished(application); + } + + @Test + void testDuplicateJobSubmissionWithRunningJobId() throws Throwable { + final JobID testJobID = new JobID(0, 2); + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set( + PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> + FutureUtils.completedExceptionally( + DuplicateJobSubmissionException.of(testJobID))); + + final PackagedProgramApplication application = + createAndExecuteApplication(1, configurationUnderTest, dispatcherBuilder.build()); + final CompletableFuture<Void> applicationFuture = + application.getApplicationCompletionFuture(); + final ExecutionException executionException = + assertThrows( + ExecutionException.class, 
+ () -> applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + final Optional<DuplicateJobSubmissionException> maybeDuplicate = + ExceptionUtils.findThrowable( + executionException, DuplicateJobSubmissionException.class); + assertThat(maybeDuplicate).isPresent(); + assertThat(maybeDuplicate.get().isGloballyTerminated()).isFalse(); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + } + + @ParameterizedTest + @EnumSource( + value = JobStatus.class, + names = {"FINISHED", "CANCELED", "FAILED"}) + void testShutdownDisabled(JobStatus jobStatus) throws Exception { + final TestingDispatcherGateway dispatcherGateway = + dispatcherGatewayBuilder(jobStatus) + .setClusterShutdownFunction( + status -> { + fail("Cluster shutdown should not be called"); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + + final PackagedProgramApplication application = + createAndExecuteApplication( + 1, + getConfiguration(), + dispatcherGateway, + scheduledExecutor, + exception -> {}, + true, + false, + false); + + // Wait until application is finished to make sure cluster shutdown isn't called + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationStatus(application, ApplicationState.fromJobStatus(jobStatus)); + } + + @Test + void testSubmitFailedJobOnApplicationErrorWhenEnforceSingleJobExecution() throws Exception { + final Configuration configuration = getConfiguration(); + final JobID jobId = new JobID(); + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + testSubmitFailedJobOnApplicationError( + configuration, + (id, t) -> { + assertThat(id).isEqualTo(jobId); + assertThat(t) + .isInstanceOf(ProgramInvocationException.class) + .hasRootCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage(FailingJob.EXCEPTION_MESSAGE); + }); + } + + private void testSubmitFailedJobOnApplicationError( + Configuration configuration, BiConsumer<JobID, Throwable> failedJobAssertion) + throws Exception { + final CompletableFuture<Void> submitted = new CompletableFuture<>(); + final TestingDispatcherGateway dispatcherGateway = + TestingDispatcherGateway.newBuilder() + .setSubmitFailedFunction( + (jobId, jobName, t) -> { + try { + failedJobAssertion.accept(jobId, t); + submitted.complete(null); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (Throwable assertion) { + submitted.completeExceptionally(assertion); + return FutureUtils.completedExceptionally(assertion); + } + }) + .setRequestJobStatusFunction( + jobId -> submitted.thenApply(ignored -> JobStatus.FAILED)) + .setRequestJobResultFunction( + jobId -> + submitted.thenApply( + ignored -> + createJobResult(jobId, JobStatus.FAILED))) + .build(); + + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + FailingJob.getProgram(), + Collections.emptyList(), + configuration, + true, + true /* enforceSingleJobExecution */, + true /* submitFailedJobOnApplicationError */, + true); + + executeApplication(application, dispatcherGateway, scheduledExecutor, exception -> {}); + + application.getFinishApplicationFuture().get(); + assertApplicationFailed(application); + } + + @Test + void testSubmitFailedJobOnApplicationErrorWhenNotEnforceSingleJobExecution() throws Exception { + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + FailingJob.getProgram(), + 
Collections.emptyList(), + getConfiguration(), + true, + false /* enforceSingleJobExecution */, + true /* submitFailedJobOnApplicationError */, + true); + + executeApplication( + application, + TestingDispatcherGateway.newBuilder().build(), + scheduledExecutor, + exception -> {}); + + assertThatFuture(application.getApplicationCompletionFuture()) + .eventuallyFailsWith(ExecutionException.class) + .extracting(Throwable::getCause) + .satisfies( + e -> + assertThat(e) + .hasMessageContaining( + DeploymentOptions + .SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR + .key())); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + } + + private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() { + return dispatcherGatewayBuilder(JobStatus.FINISHED); + } + + private TestingDispatcherGateway.Builder failedJobGatewayBuilder() { + return dispatcherGatewayBuilder(JobStatus.FAILED); + } + + private TestingDispatcherGateway.Builder canceledJobGatewayBuilder() { + return dispatcherGatewayBuilder(JobStatus.CANCELED); + } + + private TestingDispatcherGateway.Builder runningJobGatewayBuilder() { + return dispatcherGatewayBuilder(JobStatus.RUNNING); + } + + private TestingDispatcherGateway.Builder dispatcherGatewayBuilder(JobStatus jobStatus) { + TestingDispatcherGateway.Builder builder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) + .setRequestJobStatusFunction( + jobId -> CompletableFuture.completedFuture(jobStatus)); + if (jobStatus != JobStatus.RUNNING) { + builder.setRequestJobResultFunction( + jobID -> CompletableFuture.completedFuture(createJobResult(jobID, jobStatus))); + } + return builder; + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, final DispatcherGateway dispatcherGateway) throws FlinkException { + return createAndExecuteApplication( + numJobs, getConfiguration(), dispatcherGateway, scheduledExecutor, exception -> {}); + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, + final Configuration configuration, + final DispatcherGateway dispatcherGateway) + throws FlinkException { + return createAndExecuteApplication(numJobs, configuration, dispatcherGateway, false); + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, + final Configuration configuration, + final DispatcherGateway dispatcherGateway, + final boolean enforceSingleJobExecution) + throws FlinkException { + return createAndExecuteApplication( + numJobs, + configuration, + dispatcherGateway, + scheduledExecutor, + exception -> {}, + true, + enforceSingleJobExecution, + true); + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler) + throws FlinkException { + return createAndExecuteApplication( + numJobs, getConfiguration(), dispatcherGateway, scheduledExecutor, errorHandler); + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, + final Configuration configuration, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler) + throws FlinkException { + return createAndExecuteApplication( + numJobs, + configuration, + dispatcherGateway, + scheduledExecutor, + errorHandler, + true, + 
false, + true); + } + + private PackagedProgramApplication createAndExecuteApplication( + final int numJobs, + final Configuration configuration, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler, + boolean handleFatalError, + boolean enforceSingleJobExecution, + boolean shutDownOnFinish) + throws FlinkException { + + final PackagedProgram program = getProgram(numJobs); + + final PackagedProgramApplication application = + new PackagedProgramApplication( + new ApplicationID(), + program, + Collections.emptyList(), + configuration, + handleFatalError, + enforceSingleJobExecution, + false, + shutDownOnFinish); + assertApplicationCreated(application); + + executeApplication(application, dispatcherGateway, scheduledExecutor, errorHandler); + return application; + } + + private void executeApplication( + final PackagedProgramApplication application, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler) { + + CompletableFuture.runAsync( + () -> + application.execute( + dispatcherGateway, + scheduledExecutor, + mainThreadExecutor, + errorHandler), + mainThreadExecutor) + .join(); + } + + private PackagedProgram getProgram(int numJobs) throws FlinkException { + return MultiExecuteJob.getProgram(numJobs, true); + } + + private static JobResult createFailedJobResult(final JobID jobId) { + return createJobResult(jobId, JobStatus.FAILED); + } + + private static JobResult createCanceledJobResult(final JobID jobId) { + return createJobResult(jobId, JobStatus.CANCELED); + } + + private static JobResult createUnknownJobResult(final JobID jobId) { + return createJobResult(jobId, null); + } + + private static JobResult createJobResult( + final JobID jobID, @Nullable final JobStatus jobStatus) { + JobResult.Builder builder = + new JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus); + if (jobStatus == JobStatus.CANCELED) { + builder.serializedThrowable( + new SerializedThrowable(new JobCancellationException(jobID, "Hello", null))); + } else if (jobStatus == JobStatus.FAILED || jobStatus == null) { + builder.serializedThrowable( + new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla"))); + } + return builder.build(); + } + + private static <T, E extends Throwable> E assertException( + CompletableFuture<T> future, Class<E> exceptionClass) throws Exception { + + try { + future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Throwable e) { + Optional<E> maybeException = ExceptionUtils.findThrowable(e, exceptionClass); + if (!maybeException.isPresent()) { + throw e; + } + return maybeException.get(); + } + throw new Exception( + "Future should have completed exceptionally with " + + exceptionClass.getCanonicalName() + + "."); + } + + private Configuration getConfiguration() { + final Configuration configuration = new Configuration(); + configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); + return configuration; + } + + private void assertApplicationStatus( + PackagedProgramApplication application, ApplicationState expectedStatus) { + assertThat(application.getApplicationStatus()).isEqualTo(expectedStatus); + } + + private void assertApplicationCreated(PackagedProgramApplication application) { + assertApplicationStatus(application, ApplicationState.CREATED); + } + + private void assertApplicationFailing(PackagedProgramApplication application) { + assertApplicationStatus(application, ApplicationState.FAILING); + 
} + + private void assertApplicationCanceled(PackagedProgramApplication application) { + assertApplicationStatus(application, ApplicationState.CANCELED); + } + + private void assertApplicationFailed(PackagedProgramApplication application) { + assertApplicationStatus(application, ApplicationState.FAILED); + } + + private void assertApplicationFinished(PackagedProgramApplication application) { + assertApplicationStatus(application, ApplicationState.FINISHED); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java new file mode 100644 index 00000000000..2f2d684ec65 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramDescriptorTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.client.cli.CliFrontendTestUtils; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.net.URL; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PackagedProgramDescriptor}. */ +public class PackagedProgramDescriptorTest { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testPackagedProgramDescriptor(boolean jarFileIsNull) throws Exception { + File testJar = new File(CliFrontendTestUtils.getTestJarPath()); + File jarFile = jarFileIsNull ? 
                        null : testJar;
+        SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
+        List<URL> userClassPaths = Collections.singletonList(testJar.toURI().toURL());
+        String[] programArgs = {"--input", "test", "--output", "result"};
+        String mainClassName = TEST_JAR_MAIN_CLASS;
+        PackagedProgram program =
+                PackagedProgram.newBuilder()
+                        .setJarFile(jarFile)
+                        .setEntryPointClassName(mainClassName)
+                        .setUserClassPaths(userClassPaths)
+                        .setArguments(programArgs)
+                        .setSavepointRestoreSettings(savepointSettings)
+                        .build();
+        PackagedProgramDescriptor descriptor = program.getDescriptor();
+
+        assertThat(descriptor.getMainClassName()).isEqualTo(mainClassName);
+
+        PackagedProgram reconstructedProgram = descriptor.toPackageProgram();
+
+        // Verify the reconstructed program has the expected properties
+        assertThat(reconstructedProgram.getUserCodeClassLoader()).isNotNull();
+        assertThat(reconstructedProgram.getClasspaths())
+                .containsExactly(userClassPaths.toArray(URL[]::new));
+        assertThat(reconstructedProgram.getSavepointSettings()).isEqualTo(savepointSettings);
+        assertThat(reconstructedProgram.getArguments()).containsExactly(programArgs);
+        assertThat(reconstructedProgram.getMainClassName()).isEqualTo(mainClassName);
+
+        reconstructedProgram.close();
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
index b5b7f5db04d..12e1ebfa144 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationState.java
@@ -21,6 +21,9 @@ package org.apache.flink.api.common;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.DeploymentOptions;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /** Possible states of an application. */
 @PublicEvolving
 public enum ApplicationState {
@@ -60,4 +63,28 @@ public enum ApplicationState {
     public boolean isTerminalState() {
         return terminalState;
     }
+
+    private static final Map<JobStatus, ApplicationState> JOB_STATUS_APPLICATION_STATE_MAP =
+            new HashMap<>();
+
+    static {
+        // only globally terminal JobStatus can have a corresponding ApplicationState
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.FAILED, ApplicationState.FAILED);
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.CANCELED, ApplicationState.CANCELED);
+        JOB_STATUS_APPLICATION_STATE_MAP.put(JobStatus.FINISHED, ApplicationState.FINISHED);
+    }
+
+    /**
+     * Derives the ApplicationState that corresponds to the given JobStatus. This method only
+     * accepts globally terminal JobStatus. If the job status is not globally terminal, this method
+     * throws an IllegalArgumentException.
+ */ + public static ApplicationState fromJobStatus(JobStatus jobStatus) { + if (!JOB_STATUS_APPLICATION_STATE_MAP.containsKey(jobStatus)) { + throw new IllegalArgumentException( + "JobStatus " + jobStatus + " does not have a corresponding ApplicationState."); + } + + return JOB_STATUS_APPLICATION_STATE_MAP.get(jobStatus); + } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java index bd422a629ad..2e127c5cf2e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java @@ -155,4 +155,13 @@ public class DeploymentOptions { + "regardless of this setting.", TextElement.text(PROGRAM_CONFIG_WILDCARDS.key())) .build()); + + @Experimental + public static final ConfigOption<Boolean> TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION = + ConfigOptions.key("execution.terminate-application-on-any-job-terminated-exceptionally") + .booleanType() + .defaultValue(true) + .withDescription( + "When it is set to true, the application will complete exceptionally if any job fails or is canceled." + + " When it is set to false, the application will finish after all jobs reach terminal states."); }
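
For reviewers, a minimal usage sketch of how the new ApplicationState.fromJobStatus helper is expected to behave, based on the mapping and javadoc added above; the wrapper class and printouts are illustrative only and not part of the commit:

    import org.apache.flink.api.common.ApplicationState;
    import org.apache.flink.api.common.JobStatus;

    public class FromJobStatusSketch {
        public static void main(String[] args) {
            // Globally terminal job statuses map one-to-one onto application states.
            System.out.println(ApplicationState.fromJobStatus(JobStatus.FINISHED)); // FINISHED
            System.out.println(ApplicationState.fromJobStatus(JobStatus.FAILED));   // FAILED
            System.out.println(ApplicationState.fromJobStatus(JobStatus.CANCELED)); // CANCELED

            // Anything that is not globally terminal (e.g. RUNNING) is rejected.
            try {
                ApplicationState.fromJobStatus(JobStatus.RUNNING);
            } catch (IllegalArgumentException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }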

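Likewise, a minimal sketch of toggling the new deployment option on a client-side Configuration; the option constant and key come from the DeploymentOptions change above, while the wrapper class is illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;

    public class TerminateOnJobFailureSketch {
        public static void main(String[] args) {
            Configuration configuration = new Configuration();

            // Default is true: the application completes exceptionally as soon as any job
            // fails or is canceled. Setting it to false lets the application finish once
            // all jobs have reached terminal states.
            configuration.set(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION, false);

            // Prints "false"; equivalent to setting
            // execution.terminate-application-on-any-job-terminated-exceptionally: false
            System.out.println(
                    configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION));
        }
    }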