zhuzhurk commented on code in PR #27741:
URL: https://github.com/apache/flink/pull/27741#discussion_r2959488533
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -428,24 +484,53 @@ public void onStart() throws Exception {
this::onFatalError);
if (dispatcherBootstrap instanceof ApplicationBootstrap) {
- // defer starting recovered jobs, as they might be skipped based
on user logic
- internalSubmitApplication(((ApplicationBootstrap)
dispatcherBootstrap).getApplication())
- .get();
+ // Application Mode
+ checkState(suspendedApplications.isEmpty());
+ checkState(recoveredDirtyApplicationResults.size() <= 1);
+
+ AbstractApplication application =
+ ((ApplicationBootstrap)
dispatcherBootstrap).getApplication();
+ if (!recoveredDirtyApplicationResults.isEmpty()) {
+ // the application is already terminated
+ ApplicationResult applicationResult =
+ recoveredDirtyApplicationResults.iterator().next();
+ checkState(application.getApplicationId() ==
applicationResult.getApplicationId());
Review Comment:
`==` should be replaced with `equals` when comparing the `ApplicationID`s — reference equality is not guaranteed here.
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+
+/** {@code ApplicationStoreEntry} for {@code PackagedProgramApplication}. */
+public class PackagedProgramApplicationEntry implements ApplicationStoreEntry {
+
+ private final Configuration configuration;
+
+ private final PermanentBlobKey userJarBlobKey;
+
+ private final String entryClass;
+
+ private final String[] programArgs;
+
+ private final ApplicationID applicationId;
+
+ private final String applicationName;
+
+ private final boolean handleFatalError;
+
+ private final boolean enforceSingleJobExecution;
+
+ private final boolean submitFailedJobOnApplicationError;
+
+ private final boolean shutDownOnFinish;
+
+ public PackagedProgramApplicationEntry(
+ Configuration configuration,
+ PermanentBlobKey userJarBlobKey,
+ String entryClass,
+ String[] programArgs,
+ ApplicationID applicationId,
+ String applicationName,
+ boolean handleFatalError,
+ boolean enforceSingleJobExecution,
+ boolean submitFailedJobOnApplicationError,
+ boolean shutDownOnFinish) {
+ this.configuration = configuration;
+ this.userJarBlobKey = userJarBlobKey;
+ this.entryClass = entryClass;
+ this.programArgs = programArgs;
+ this.applicationId = applicationId;
+ this.applicationName = applicationName;
+ this.handleFatalError = handleFatalError;
+ this.enforceSingleJobExecution = enforceSingleJobExecution;
+ this.submitFailedJobOnApplicationError =
submitFailedJobOnApplicationError;
+ this.shutDownOnFinish = shutDownOnFinish;
+ }
+
+ @Override
+ public AbstractApplication getApplication(
+ PermanentBlobService blobService,
+ Collection<JobInfo> recoveredJobInfos,
+ Collection<JobInfo> recoveredTerminalJobInfos) {
+ File jarFile;
+ try {
+ jarFile = blobService.getFile(applicationId, userJarBlobKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get user jar file from
blob", e);
+ }
+
+ if (!jarFile.exists()) {
+ throw new RuntimeException(String.format("Jar file %s does not
exist", jarFile));
+ }
+
+ PackagedProgram program;
+ try {
+ program =
+ PackagedProgram.newBuilder()
+ .setJarFile(jarFile)
+ .setEntryPointClassName(entryClass)
+ .setConfiguration(configuration)
+ .setUserClassPaths(getClasspaths())
+ .setArguments(programArgs)
+ .build();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to create PackagedProgram for application
%s", applicationId));
Review Comment:
The root cause `e` should be included as the cause of the thrown `RuntimeException`; otherwise the original failure information is lost.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -887,6 +1062,83 @@ private void
writeToArchivedApplicationStore(ArchivedApplication archivedApplica
}
}
+ private CompletableFuture<?>
registerGloballyTerminatedApplicationInApplicationResultStore(
+ ArchivedApplication application) {
+ final ApplicationID applicationId = application.getApplicationId();
+
+ return applicationResultStore
+ .hasCleanApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasCleanResult -> {
+ if (hasCleanResult) {
+ log.warn(
+ "Application {} is already marked as
clean but clean up was triggered again.",
+ applicationId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+
.hasDirtyApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasDirtyResult -> {
+ if (hasDirtyResult) {
+ return
FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+
.createDirtyResultAsync(
+ new
ApplicationResultEntry(
+
ApplicationResult
+
.createFrom(
+
application)));
+ });
+ })
+ .handleAsync(
+ (ignored, error) -> {
+ if (error != null) {
+ fatalErrorHandler.onFatalError(
+ new FlinkException(
+ String.format(
+ "The application %s
couldn't be marked as pre-cleanup finished in ApplicationResultStore.",
+ applicationId),
+ error));
Review Comment:
The handler should return after `onFatalError` is invoked, instead of continuing as if the registration had succeeded.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -887,6 +1062,83 @@ private void
writeToArchivedApplicationStore(ArchivedApplication archivedApplica
}
}
+ private CompletableFuture<?>
registerGloballyTerminatedApplicationInApplicationResultStore(
+ ArchivedApplication application) {
+ final ApplicationID applicationId = application.getApplicationId();
+
+ return applicationResultStore
+ .hasCleanApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasCleanResult -> {
+ if (hasCleanResult) {
+ log.warn(
+ "Application {} is already marked as
clean but clean up was triggered again.",
+ applicationId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+
.hasDirtyApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasDirtyResult -> {
+ if (hasDirtyResult) {
+ return
FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+
.createDirtyResultAsync(
+ new
ApplicationResultEntry(
+
ApplicationResult
+
.createFrom(
+
application)));
+ });
+ })
+ .handleAsync(
+ (ignored, error) -> {
+ if (error != null) {
+ fatalErrorHandler.onFatalError(
+ new FlinkException(
+ String.format(
+ "The application %s
couldn't be marked as pre-cleanup finished in ApplicationResultStore.",
+ applicationId),
+ error));
+ }
+
applicationCreateDirtyResultFutures.get(applicationId).complete(null);
+ return null;
+ },
+ getMainThreadExecutor());
+ }
+
+ private CompletableFuture<Void> removeApplication(
+ ApplicationID applicationId, Collection<JobID> jobs) {
+ return applicationResourceCleaner
+ .cleanupAsync(applicationId)
+ .thenCombine(
+ FutureUtils.waitForAll(
+ jobs.stream()
+ .map(jobMarkResultCleanFutures::get)
+ .collect(Collectors.toList())),
+ (unused1, unused2) ->
+
applicationResultStore.markResultAsCleanAsync(applicationId))
Review Comment:
If `markResultAsCleanAsync` does not throw an exception directly, but instead
returns a future that completes exceptionally, that exceptional future will be
ignored, because `thenCombine` does not flatten the nested future (a `thenCompose` would be needed).
The same problem also applies to the `markResultAsCleanAsync` of jobs.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java:
##########
@@ -98,46 +123,93 @@ private void startServices() {
getClass().getSimpleName()),
e);
}
+
+ try {
+ applicationStore.start();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format(
+ "Could not start %s when trying to start the %s.",
+ applicationStore.getClass().getSimpleName(),
+ getClass().getSimpleName()),
+ e);
+ }
}
private void createDispatcherIfRunning(
Collection<ExecutionPlan> executionPlans,
- Collection<JobResult> recoveredDirtyJobResults) {
+ Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults) {
runIfStateIs(
- State.RUNNING, () -> createDispatcher(executionPlans,
recoveredDirtyJobResults));
+ State.RUNNING,
+ () ->
+ createDispatcher(
+ executionPlans,
+ recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults));
}
private void createDispatcher(
Collection<ExecutionPlan> executionPlans,
- Collection<JobResult> recoveredDirtyJobResults) {
+ Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults) {
final DispatcherGatewayService dispatcherService =
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
executionPlans,
recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
executionPlanStore,
- jobResultStore);
+ jobResultStore,
+ applicationStore,
+ applicationResultStore);
completeDispatcherSetup(dispatcherService);
}
- private CompletableFuture<Void>
-
createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults() {
- // TODO support application recovery which may require fetching user
jar from blob server
-
- final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
+ private CompletableFuture<Void>
createDispatcherBasedOnRecoveredApplicationsAndJobs() {
+ final CompletableFuture<Collection<JobResult>> dirtyJobResultsFuture =
CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);
- return dirtyJobsFuture
- .thenApplyAsync(
- dirtyJobs ->
- this.recoverJobsIfRunning(
- dirtyJobs.stream()
+ final CompletableFuture<Collection<ExecutionPlan>> recoveredJobsFuture
=
+ dirtyJobResultsFuture.thenApplyAsync(
+ dirtyJobResults ->
+ recoverJobsIfRunning(
+ dirtyJobResults.stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet())),
- ioExecutor)
- .thenAcceptBoth(dirtyJobsFuture,
this::createDispatcherIfRunning)
+ ioExecutor);
+
+ final CompletableFuture<Collection<ApplicationResult>>
dirtyApplicationResultsFuture =
+ CompletableFuture.supplyAsync(
+ this::getDirtyApplicationResultsIfRunning, ioExecutor);
+
+ final CompletableFuture<Collection<AbstractApplication>>
recoveredApplicationsFuture =
+ dirtyApplicationResultsFuture.thenCombineAsync(
+ recoveredJobsFuture,
+ (dirtyApplicationResults, recoveredJobs) -> {
+ return recoverApplicationsIfRunning(
+ dirtyApplicationResults.stream()
+
.map(ApplicationResult::getApplicationId)
+ .collect(Collectors.toSet()),
+ recoveredJobsFuture.join(),
Review Comment:
`recoveredJobs` is already available as the lambda parameter, so the blocking `recoveredJobsFuture.join()` call is unnecessary.
In general, it's better to use `thenAcceptBoth` / `allOf` than to call `join`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1051,7 +1302,9 @@ private void runJob(
final JobID jobId = jobManagerRunner.getJobID();
- partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
+ jobCreateDirtyResultFutures.put(jobId, new CompletableFuture<>());
+ jobMarkResultCleanFutures.put(jobId, new CompletableFuture<>());
+ jobIdsToApplicationIds.put(jobId, applicationId);
Review Comment:
The items added to these three maps are never removed. This will lead to
a memory leak in session mode.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * {@code DefaultApplicationResourceCleaner} is the default implementation of
{@link
+ * ApplicationResourceCleaner}. It will try to clean up any resource that was
added. Failure will
+ * result in an individual retry of the cleanup. The overall cleanup result
succeeds after all
+ * subtasks succeeded.
+ */
+public class DefaultApplicationResourceCleaner<T> implements
ApplicationResourceCleaner {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultApplicationResourceCleaner.class);
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final Collection<CleanupWithLabel<T>> regularCleanup;
+
+ private final RetryStrategy retryStrategy;
+
+ public static Builder<GloballyCleanableApplicationResource>
forGloballyCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ RetryStrategy retryStrategy) {
+ return forCleanableResources(
+ mainThreadExecutor,
+ cleanupExecutor,
+ GloballyCleanableApplicationResource::globalCleanupAsync,
+ retryStrategy);
+ }
+
+ @VisibleForTesting
+ static <T> Builder<T> forCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFunction,
+ RetryStrategy retryStrategy) {
+ return new Builder<>(mainThreadExecutor, cleanupExecutor,
cleanupFunction, retryStrategy);
+ }
+
+ @VisibleForTesting
+ @FunctionalInterface
+ interface CleanupFn<T> {
+ CompletableFuture<Void> cleanupAsync(
+ T resource, ApplicationID applicationId, Executor
cleanupExecutor);
+ }
+
+ /**
+ * {@code Builder} for creating {@code DefaultApplicationResourceCleaner}
instances.
+ *
+ * @param <T> The functional interface that's being translated into the
internally used {@link
+ * CleanupFn}.
+ */
+ public static class Builder<T> {
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final RetryStrategy retryStrategy;
+
+ private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new
ArrayList<>();
+ private final Collection<CleanupWithLabel<T>> regularCleanup = new
ArrayList<>();
+
+ private Builder(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn,
+ RetryStrategy retryStrategy) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ this.retryStrategy = retryStrategy;
+ }
+
+ /**
+ * Prioritized cleanups run before their regular counterparts. This
method enables the
+ * caller to model dependencies between cleanup tasks. The order in
which cleanable
+ * resources are added matters, i.e. if two cleanable resources are
added as prioritized
+ * cleanup tasks, the resource being added first will block the
cleanup of the second
+ * resource. All prioritized cleanup resources will run and finish
before any resource that
+ * is added using {@link #withRegularCleanup(String, Object)} is
started.
+ *
+ * @param label The label being used when logging errors in the given
cleanup.
+ * @param prioritizedCleanup The cleanup callback that is going to be
prioritized.
+ */
+ public Builder<T> withPrioritizedCleanup(String label, T
prioritizedCleanup) {
+ this.prioritizedCleanup.add(new
CleanupWithLabel<>(prioritizedCleanup, label));
+ return this;
+ }
+
+ /**
+ * Regular cleanups are resources for which the cleanup is triggered
after all prioritized
+ * cleanups succeeded. All added regular cleanups will run
concurrently to each other.
+ *
+ * @param label The label being used when logging errors in the given
cleanup.
+ * @param regularCleanup The cleanup callback that is going to run
after all prioritized
+ * cleanups are finished.
+ * @see #withPrioritizedCleanup(String, Object)
+ */
+ public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+ this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup,
label));
+ return this;
+ }
+
+ public DefaultApplicationResourceCleaner build() {
+ return new DefaultApplicationResourceCleaner<>(
+ mainThreadExecutor,
+ cleanupExecutor,
+ cleanupFn,
+ prioritizedCleanup,
+ regularCleanup,
+ retryStrategy);
+ }
+ }
+
+ private DefaultApplicationResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn,
+ Collection<CleanupWithLabel<T>> prioritizedCleanup,
+ Collection<CleanupWithLabel<T>> regularCleanup,
+ RetryStrategy retryStrategy) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ this.regularCleanup = regularCleanup;
+ this.retryStrategy = retryStrategy;
Review Comment:
The `prioritizedCleanup` collection is not assigned to a field, so the prioritized cleanup tasks would be lost.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]