zhuzhurk commented on code in PR #27741:
URL: https://github.com/apache/flink/pull/27741#discussion_r2959488533


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -428,24 +484,53 @@ public void onStart() throws Exception {
                         this::onFatalError);
 
         if (dispatcherBootstrap instanceof ApplicationBootstrap) {
-            // defer starting recovered jobs, as they might be skipped based 
on user logic
-            internalSubmitApplication(((ApplicationBootstrap) 
dispatcherBootstrap).getApplication())
-                    .get();
+            // Application Mode
+            checkState(suspendedApplications.isEmpty());
+            checkState(recoveredDirtyApplicationResults.size() <= 1);
+
+            AbstractApplication application =
+                    ((ApplicationBootstrap) 
dispatcherBootstrap).getApplication();
+            if (!recoveredDirtyApplicationResults.isEmpty()) {
+                // the application is already terminated
+                ApplicationResult applicationResult =
+                        recoveredDirtyApplicationResults.iterator().next();
+                checkState(application.getApplicationId() == 
applicationResult.getApplicationId());

Review Comment:
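   `==` -> `equals`: the application IDs are objects, so they should be compared by value rather than by reference.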



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+
+/** {@code ApplicationStoreEntry} for {@code PackagedProgramApplication}. */
+public class PackagedProgramApplicationEntry implements ApplicationStoreEntry {
+
+    private final Configuration configuration;
+
+    private final PermanentBlobKey userJarBlobKey;
+
+    private final String entryClass;
+
+    private final String[] programArgs;
+
+    private final ApplicationID applicationId;
+
+    private final String applicationName;
+
+    private final boolean handleFatalError;
+
+    private final boolean enforceSingleJobExecution;
+
+    private final boolean submitFailedJobOnApplicationError;
+
+    private final boolean shutDownOnFinish;
+
+    public PackagedProgramApplicationEntry(
+            Configuration configuration,
+            PermanentBlobKey userJarBlobKey,
+            String entryClass,
+            String[] programArgs,
+            ApplicationID applicationId,
+            String applicationName,
+            boolean handleFatalError,
+            boolean enforceSingleJobExecution,
+            boolean submitFailedJobOnApplicationError,
+            boolean shutDownOnFinish) {
+        this.configuration = configuration;
+        this.userJarBlobKey = userJarBlobKey;
+        this.entryClass = entryClass;
+        this.programArgs = programArgs;
+        this.applicationId = applicationId;
+        this.applicationName = applicationName;
+        this.handleFatalError = handleFatalError;
+        this.enforceSingleJobExecution = enforceSingleJobExecution;
+        this.submitFailedJobOnApplicationError = 
submitFailedJobOnApplicationError;
+        this.shutDownOnFinish = shutDownOnFinish;
+    }
+
+    @Override
+    public AbstractApplication getApplication(
+            PermanentBlobService blobService,
+            Collection<JobInfo> recoveredJobInfos,
+            Collection<JobInfo> recoveredTerminalJobInfos) {
+        File jarFile;
+        try {
+            jarFile = blobService.getFile(applicationId, userJarBlobKey);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get user jar file from 
blob", e);
+        }
+
+        if (!jarFile.exists()) {
+            throw new RuntimeException(String.format("Jar file %s does not 
exist", jarFile));
+        }
+
+        PackagedProgram program;
+        try {
+            program =
+                    PackagedProgram.newBuilder()
+                            .setJarFile(jarFile)
+                            .setEntryPointClassName(entryClass)
+                            .setConfiguration(configuration)
+                            .setUserClassPaths(getClasspaths())
+                            .setArguments(programArgs)
+                            .build();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to create PackagedProgram for application 
%s", applicationId));

Review Comment:
   The root cause (`e`) should be passed as the cause of the `RuntimeException`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -887,6 +1062,83 @@ private void 
writeToArchivedApplicationStore(ArchivedApplication archivedApplica
         }
     }
 
+    private CompletableFuture<?> 
registerGloballyTerminatedApplicationInApplicationResultStore(
+            ArchivedApplication application) {
+        final ApplicationID applicationId = application.getApplicationId();
+
+        return applicationResultStore
+                .hasCleanApplicationResultEntryAsync(applicationId)
+                .thenCompose(
+                        hasCleanResult -> {
+                            if (hasCleanResult) {
+                                log.warn(
+                                        "Application {} is already marked as 
clean but clean up was triggered again.",
+                                        applicationId);
+                                return FutureUtils.completedVoidFuture();
+                            }
+
+                            return applicationResultStore
+                                    
.hasDirtyApplicationResultEntryAsync(applicationId)
+                                    .thenCompose(
+                                            hasDirtyResult -> {
+                                                if (hasDirtyResult) {
+                                                    return 
FutureUtils.completedVoidFuture();
+                                                }
+
+                                                return applicationResultStore
+                                                        
.createDirtyResultAsync(
+                                                                new 
ApplicationResultEntry(
+                                                                        
ApplicationResult
+                                                                               
 .createFrom(
+                                                                               
         application)));
+                                            });
+                        })
+                .handleAsync(
+                        (ignored, error) -> {
+                            if (error != null) {
+                                fatalErrorHandler.onFatalError(
+                                        new FlinkException(
+                                                String.format(
+                                                        "The application %s 
couldn't be marked as pre-cleanup finished in ApplicationResultStore.",
+                                                        applicationId),
+                                                error));

Review Comment:
   It should return after calling `onFatalError` instead of continuing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -887,6 +1062,83 @@ private void 
writeToArchivedApplicationStore(ArchivedApplication archivedApplica
         }
     }
 
+    private CompletableFuture<?> 
registerGloballyTerminatedApplicationInApplicationResultStore(
+            ArchivedApplication application) {
+        final ApplicationID applicationId = application.getApplicationId();
+
+        return applicationResultStore
+                .hasCleanApplicationResultEntryAsync(applicationId)
+                .thenCompose(
+                        hasCleanResult -> {
+                            if (hasCleanResult) {
+                                log.warn(
+                                        "Application {} is already marked as 
clean but clean up was triggered again.",
+                                        applicationId);
+                                return FutureUtils.completedVoidFuture();
+                            }
+
+                            return applicationResultStore
+                                    
.hasDirtyApplicationResultEntryAsync(applicationId)
+                                    .thenCompose(
+                                            hasDirtyResult -> {
+                                                if (hasDirtyResult) {
+                                                    return 
FutureUtils.completedVoidFuture();
+                                                }
+
+                                                return applicationResultStore
+                                                        
.createDirtyResultAsync(
+                                                                new 
ApplicationResultEntry(
+                                                                        
ApplicationResult
+                                                                               
 .createFrom(
+                                                                               
         application)));
+                                            });
+                        })
+                .handleAsync(
+                        (ignored, error) -> {
+                            if (error != null) {
+                                fatalErrorHandler.onFatalError(
+                                        new FlinkException(
+                                                String.format(
+                                                        "The application %s 
couldn't be marked as pre-cleanup finished in ApplicationResultStore.",
+                                                        applicationId),
+                                                error));
+                            }
+                            
applicationCreateDirtyResultFutures.get(applicationId).complete(null);
+                            return null;
+                        },
+                        getMainThreadExecutor());
+    }
+
+    private CompletableFuture<Void> removeApplication(
+            ApplicationID applicationId, Collection<JobID> jobs) {
+        return applicationResourceCleaner
+                .cleanupAsync(applicationId)
+                .thenCombine(
+                        FutureUtils.waitForAll(
+                                jobs.stream()
+                                        .map(jobMarkResultCleanFutures::get)
+                                        .collect(Collectors.toList())),
+                        (unused1, unused2) ->
+                                
applicationResultStore.markResultAsCleanAsync(applicationId))

Review Comment:
   If `markResultAsCleanAsync` does not throw an exception directly, but instead 
returns a future that is completed exceptionally, that exceptional future will 
be ignored.
   The same problem also applies to the `markResultAsCleanAsync` of jobs.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java:
##########
@@ -98,46 +123,93 @@ private void startServices() {
                             getClass().getSimpleName()),
                     e);
         }
+
+        try {
+            applicationStore.start();
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not start %s when trying to start the %s.",
+                            applicationStore.getClass().getSimpleName(),
+                            getClass().getSimpleName()),
+                    e);
+        }
     }
 
     private void createDispatcherIfRunning(
             Collection<ExecutionPlan> executionPlans,
-            Collection<JobResult> recoveredDirtyJobResults) {
+            Collection<JobResult> recoveredDirtyJobResults,
+            Collection<AbstractApplication> recoveredApplications,
+            Collection<ApplicationResult> recoveredDirtyApplicationResults) {
         runIfStateIs(
-                State.RUNNING, () -> createDispatcher(executionPlans, 
recoveredDirtyJobResults));
+                State.RUNNING,
+                () ->
+                        createDispatcher(
+                                executionPlans,
+                                recoveredDirtyJobResults,
+                                recoveredApplications,
+                                recoveredDirtyApplicationResults));
     }
 
     private void createDispatcher(
             Collection<ExecutionPlan> executionPlans,
-            Collection<JobResult> recoveredDirtyJobResults) {
+            Collection<JobResult> recoveredDirtyJobResults,
+            Collection<AbstractApplication> recoveredApplications,
+            Collection<ApplicationResult> recoveredDirtyApplicationResults) {
 
         final DispatcherGatewayService dispatcherService =
                 dispatcherGatewayServiceFactory.create(
                         DispatcherId.fromUuid(getLeaderSessionId()),
                         executionPlans,
                         recoveredDirtyJobResults,
+                        recoveredApplications,
+                        recoveredDirtyApplicationResults,
                         executionPlanStore,
-                        jobResultStore);
+                        jobResultStore,
+                        applicationStore,
+                        applicationResultStore);
 
         completeDispatcherSetup(dispatcherService);
     }
 
-    private CompletableFuture<Void>
-            
createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults() {
-        // TODO support application recovery which may require fetching user 
jar from blob server
-
-        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
+    private CompletableFuture<Void> 
createDispatcherBasedOnRecoveredApplicationsAndJobs() {
+        final CompletableFuture<Collection<JobResult>> dirtyJobResultsFuture =
                 
CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);
 
-        return dirtyJobsFuture
-                .thenApplyAsync(
-                        dirtyJobs ->
-                                this.recoverJobsIfRunning(
-                                        dirtyJobs.stream()
+        final CompletableFuture<Collection<ExecutionPlan>> recoveredJobsFuture 
=
+                dirtyJobResultsFuture.thenApplyAsync(
+                        dirtyJobResults ->
+                                recoverJobsIfRunning(
+                                        dirtyJobResults.stream()
                                                 .map(JobResult::getJobId)
                                                 .collect(Collectors.toSet())),
-                        ioExecutor)
-                .thenAcceptBoth(dirtyJobsFuture, 
this::createDispatcherIfRunning)
+                        ioExecutor);
+
+        final CompletableFuture<Collection<ApplicationResult>> 
dirtyApplicationResultsFuture =
+                CompletableFuture.supplyAsync(
+                        this::getDirtyApplicationResultsIfRunning, ioExecutor);
+
+        final CompletableFuture<Collection<AbstractApplication>> 
recoveredApplicationsFuture =
+                dirtyApplicationResultsFuture.thenCombineAsync(
+                        recoveredJobsFuture,
+                        (dirtyApplicationResults, recoveredJobs) -> {
+                            return recoverApplicationsIfRunning(
+                                    dirtyApplicationResults.stream()
+                                            
.map(ApplicationResult::getApplicationId)
+                                            .collect(Collectors.toSet()),
+                                    recoveredJobsFuture.join(),

Review Comment:
   `recoveredJobs` is already available.
   And it's better to use `thenAcceptBoth` / `allOf` instead of `join`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1051,7 +1302,9 @@ private void runJob(
 
         final JobID jobId = jobManagerRunner.getJobID();
 
-        partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
+        jobCreateDirtyResultFutures.put(jobId, new CompletableFuture<>());
+        jobMarkResultCleanFutures.put(jobId, new CompletableFuture<>());
+        jobIdsToApplicationIds.put(jobId, applicationId);

Review Comment:
   The items added to these three maps are never removed. This will lead to a 
memory leak in session mode.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * {@code DefaultApplicationResourceCleaner} is the default implementation of 
{@link
+ * ApplicationResourceCleaner}. It will try to clean up any resource that was 
added. Failure will
+ * result in an individual retry of the cleanup. The overall cleanup result 
succeeds after all
+ * subtasks succeeded.
+ */
+public class DefaultApplicationResourceCleaner<T> implements 
ApplicationResourceCleaner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultApplicationResourceCleaner.class);
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final Executor cleanupExecutor;
+    private final CleanupFn<T> cleanupFn;
+
+    private final Collection<CleanupWithLabel<T>> regularCleanup;
+
+    private final RetryStrategy retryStrategy;
+
+    public static Builder<GloballyCleanableApplicationResource> 
forGloballyCleanableResources(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor cleanupExecutor,
+            RetryStrategy retryStrategy) {
+        return forCleanableResources(
+                mainThreadExecutor,
+                cleanupExecutor,
+                GloballyCleanableApplicationResource::globalCleanupAsync,
+                retryStrategy);
+    }
+
+    @VisibleForTesting
+    static <T> Builder<T> forCleanableResources(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor cleanupExecutor,
+            CleanupFn<T> cleanupFunction,
+            RetryStrategy retryStrategy) {
+        return new Builder<>(mainThreadExecutor, cleanupExecutor, 
cleanupFunction, retryStrategy);
+    }
+
+    @VisibleForTesting
+    @FunctionalInterface
+    interface CleanupFn<T> {
+        CompletableFuture<Void> cleanupAsync(
+                T resource, ApplicationID applicationId, Executor 
cleanupExecutor);
+    }
+
+    /**
+     * {@code Builder} for creating {@code DefaultApplicationResourceCleaner} 
instances.
+     *
+     * @param <T> The functional interface that's being translated into the 
internally used {@link
+     *     CleanupFn}.
+     */
+    public static class Builder<T> {
+
+        private final ComponentMainThreadExecutor mainThreadExecutor;
+        private final Executor cleanupExecutor;
+        private final CleanupFn<T> cleanupFn;
+
+        private final RetryStrategy retryStrategy;
+
+        private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new 
ArrayList<>();
+        private final Collection<CleanupWithLabel<T>> regularCleanup = new 
ArrayList<>();
+
+        private Builder(
+                ComponentMainThreadExecutor mainThreadExecutor,
+                Executor cleanupExecutor,
+                CleanupFn<T> cleanupFn,
+                RetryStrategy retryStrategy) {
+            this.mainThreadExecutor = mainThreadExecutor;
+            this.cleanupExecutor = cleanupExecutor;
+            this.cleanupFn = cleanupFn;
+            this.retryStrategy = retryStrategy;
+        }
+
+        /**
+         * Prioritized cleanups run before their regular counterparts. This 
method enables the
+         * caller to model dependencies between cleanup tasks. The order in 
which cleanable
+         * resources are added matters, i.e. if two cleanable resources are 
added as prioritized
+         * cleanup tasks, the resource being added first will block the 
cleanup of the second
+         * resource. All prioritized cleanup resources will run and finish 
before any resource that
+         * is added using {@link #withRegularCleanup(String, Object)} is 
started.
+         *
+         * @param label The label being used when logging errors in the given 
cleanup.
+         * @param prioritizedCleanup The cleanup callback that is going to be 
prioritized.
+         */
+        public Builder<T> withPrioritizedCleanup(String label, T 
prioritizedCleanup) {
+            this.prioritizedCleanup.add(new 
CleanupWithLabel<>(prioritizedCleanup, label));
+            return this;
+        }
+
+        /**
+         * Regular cleanups are resources for which the cleanup is triggered 
after all prioritized
+         * cleanups succeeded. All added regular cleanups will run 
concurrently to each other.
+         *
+         * @param label The label being used when logging errors in the given 
cleanup.
+         * @param regularCleanup The cleanup callback that is going to run 
after all prioritized
+         *     cleanups are finished.
+         * @see #withPrioritizedCleanup(String, Object)
+         */
+        public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+            this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, 
label));
+            return this;
+        }
+
+        public DefaultApplicationResourceCleaner build() {
+            return new DefaultApplicationResourceCleaner<>(
+                    mainThreadExecutor,
+                    cleanupExecutor,
+                    cleanupFn,
+                    prioritizedCleanup,
+                    regularCleanup,
+                    retryStrategy);
+        }
+    }
+
+    private DefaultApplicationResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor cleanupExecutor,
+            CleanupFn<T> cleanupFn,
+            Collection<CleanupWithLabel<T>> prioritizedCleanup,
+            Collection<CleanupWithLabel<T>> regularCleanup,
+            RetryStrategy retryStrategy) {
+        this.mainThreadExecutor = mainThreadExecutor;
+        this.cleanupExecutor = cleanupExecutor;
+        this.cleanupFn = cleanupFn;
+        this.regularCleanup = regularCleanup;
+        this.retryStrategy = retryStrategy;

Review Comment:
   The `prioritizedCleanup` argument is not recorded in a field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to