zhuzhurk commented on code in PR #27741: URL: https://github.com/apache/flink/pull/27741#discussion_r2951272433
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java: ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.persistence.ResourceVersion; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Default implementation for {@link ApplicationStore}. 
Combined with different {@link + * StateHandleStore}, we could persist the applications to various distributed storage. + */ +public class DefaultApplicationStore<R extends ResourceVersion<R>> implements ApplicationStore { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultApplicationStore.class); + + private final Object lock = new Object(); + + /** The set of IDs of all added applications. */ + @GuardedBy("lock") + private final Set<ApplicationID> addedApplications = new HashSet<>(); + + /** Submitted applications handle store. */ + private final StateHandleStore<ApplicationStoreEntry, R> applicationStateHandleStore; + + private final ApplicationStoreUtil applicationStoreUtil; + + /** Flag indicating whether this instance is running. */ + @GuardedBy("lock") + private volatile boolean running; + + public DefaultApplicationStore( + StateHandleStore<ApplicationStoreEntry, R> stateHandleStore, + ApplicationStoreUtil applicationStoreUtil) { + this.applicationStateHandleStore = checkNotNull(stateHandleStore); + this.applicationStoreUtil = checkNotNull(applicationStoreUtil); + + this.running = false; + } + + @Override + public void start() throws Exception { + synchronized (lock) { + if (!running) { + running = true; + } + } + } + + @Override + public void stop() throws Exception { + synchronized (lock) { + if (running) { + running = false; + LOG.info("Stopping DefaultApplicationStore."); + Exception exception = null; + + try { + applicationStateHandleStore.releaseAll(); + } catch (Exception e) { + exception = e; + } + + if (exception != null) { + throw new FlinkException( + "Could not properly stop the DefaultApplicationStore.", exception); + } + } + } + } + + @Override + public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationId) + throws Exception { + checkNotNull(applicationId, "Application ID"); + + LOG.debug("Recovering application {} from {}.", applicationId, applicationStateHandleStore); + + final String name = 
applicationStoreUtil.applicationIdToName(applicationId); + + synchronized (lock) { + verifyIsRunning(); + + boolean success = false; + + RetrievableStateHandle<ApplicationStoreEntry> applicationRetrievableStateHandle; + + try { + try { + applicationRetrievableStateHandle = + applicationStateHandleStore.getAndLock(name); + } catch (StateHandleStore.NotExistException ignored) { + success = true; + return Optional.empty(); + } catch (Exception e) { + throw new FlinkException( + "Could not retrieve the submitted application state handle " + + "for " + + name + + " from the submitted application store.", + e); + } + + ApplicationStoreEntry application; + try { + application = applicationRetrievableStateHandle.retrieveState(); + } catch (ClassNotFoundException cnfe) { + throw new FlinkException( + "Could not retrieve submitted application from state handle under " + + name + + ". This indicates that you are trying to recover from state written by an " + + "older Flink version which is not compatible. Try cleaning the state handle store.", + cnfe); + } catch (IOException ioe) { + throw new FlinkException( + "Could not retrieve submitted application from state handle under " + + name + + ". This indicates that the retrieved state handle is broken. 
Try cleaning the state handle " + "store.", + ioe); + } + + addedApplications.add(applicationId); + + LOG.info("Recovered {} ({}).", application.getName(), applicationId); + + success = true; + return Optional.of(application); + } finally { + if (!success) { + applicationStateHandleStore.release(name); + } + } + } + } + + @Override + public void putApplication(ApplicationStoreEntry application) throws Exception { + checkNotNull(application, "Application"); + + final ApplicationID applicationID = application.getApplicationId(); Review Comment: applicationID -> applicationId ########## flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationEntry.java: ########## @@ -122,7 +122,8 @@ public AbstractApplication getApplication( handleFatalError, enforceSingleJobExecution, submitFailedJobOnApplicationError, - shutDownOnFinish); + shutDownOnFinish, + userJarBlobKey); Review Comment: Note that this is not a hotfix but a fixup for this PR. The change should be squashed into another commit before the PR gets merged. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java: ########## @@ -251,28 +446,40 @@ private Optional<CompletableFuture<Void>> submitAddedJobIfRunning(ExecutionPlan private CompletableFuture<Void> submitAddedJob(ExecutionPlan executionPlan) { final DispatcherGateway dispatcherGateway = getDispatcherGatewayInternal(); + final JobID jobId = executionPlan.getJobID(); + final Duration timeout = + executionPlan.getJobConfiguration().get(RpcOptions.ASK_TIMEOUT_DURATION); + // Skip job submission if its associated application exists, as the application will handle + // the job recovery + ApplicationID applicationId = executionPlan.getApplicationId().orElse(null); return dispatcherGateway - .submitJob(executionPlan, RpcUtils.INF_TIMEOUT) - .thenApply(FunctionUtils.nullFn()) - .exceptionally(this::filterOutDuplicateJobSubmissionException); - } - - private Void filterOutDuplicateJobSubmissionException(Throwable throwable) { - final Throwable strippedException = ExceptionUtils.stripCompletionException(throwable); - if (strippedException instanceof DuplicateJobSubmissionException) { - final DuplicateJobSubmissionException duplicateJobSubmissionException = - (DuplicateJobSubmissionException) strippedException; - - log.debug( - "Ignore recovered job {} because the job is currently being executed.", - duplicateJobSubmissionException.getJobID(), - duplicateJobSubmissionException); - - return null; - } else { - throw new CompletionException(throwable); - } + .requestApplication(applicationId, timeout) Review Comment: Should it avoid calling `requestApplication` if the `applicationId` is null? ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java: ########## @@ -401,6 +402,28 @@ public static ArchivedExecutionGraph createFrom( executionGraph.getApplicationId().orElse(null)); } + /** + * Create a {@link ArchivedExecutionGraph} from the given {@link JobResult}. 
+ * + * @param jobResult to create the ArchivedExecutionGraph from + * @param initializationTimestamp optionally overrides the initialization timestamp if the + * jobResult does not have a valid one + * @return ArchivedExecutionGraph created from the given jobResult + */ + public static ArchivedExecutionGraph createFrom( Review Comment: It's better to name it as `createSparseArchivedExecutionGraph` as the result is not a standard `ArchivedExecutionGraph`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
