dmvk commented on a change in pull request #18189: URL: https://github.com/apache/flink/pull/18189#discussion_r785912691
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -870,6 +871,7 @@ private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { private void markJobAsClean(JobID jobId) { try { jobResultStore.markResultAsClean(jobId); + log.debug("Cleanup for job {} finished. Job was marked as clean.", jobId); Review comment: nit ```suggestion log.debug("Cleanup for the job '{}' has finished. Job has been marked as clean.", jobId); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -943,15 +945,26 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr archiveExecutionGraph(executionGraphInfo); if (terminalJobStatus.isGloballyTerminalState()) { + final JobID jobId = executionGraphInfo.getJobId(); try { - jobResultStore.createDirtyResult( - JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph())); + if (jobResultStore.hasCleanJobResultEntry(jobId)) { + log.warn( + "Job {} is already marked as clean but clean up was triggered again.", + jobId); + } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) { + jobResultStore.createDirtyResult( + new JobResultEntry( + JobResult.createFrom( + executionGraphInfo.getArchivedExecutionGraph()))); + log.debug("Job {} marked as dirty. Cleanup will be triggered.", jobId); + } } catch (IOException e) { - log.error( - "Could not un-register from high-availability services job {}." - + "Other JobManager's may attempt to recover it and re-execute it.", - executionGraphInfo.getJobId(), - e); + fatalErrorHandler.onFatalError( + new FlinkException( + String.format( + "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.", Review comment: Should we rephrase it a bit? I think if I get this exception as a user, I won't know what's going on. Something along the lines of: ``` Failed to create a JobResultStore entry for the job '%s'. 
This is fatal as Flink can not guarantee a proper cleanup lifecycle. ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java ########## @@ -20,65 +20,112 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.highavailability.JobResultEntry; import org.apache.flink.runtime.highavailability.JobResultStore; +import org.apache.flink.runtime.highavailability.WithJobResult; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.Preconditions; -import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import java.util.stream.Collectors; -/** - * An implementation of the {@link JobResultStore} which only persists the data to an in-memory map. - */ +/** A thread-safe in-memory implementation of the {@link JobResultStore}. 
*/ public class EmbeddedJobResultStore implements JobResultStore { - private final Map<JobID, JobResultEntry> inMemoryMap = new ConcurrentHashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedJobResultStore.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); Review comment: 👍 ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java ########## @@ -58,12 +62,26 @@ public DispatcherLeaderProcessFactory createFactory( throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e); } + final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore(); + final Collection<JobResult> recoveredDirtyJobResults; + try { + recoveredDirtyJobResults = jobResultStore.getDirtyResults(); Review comment: Should we check that there is at most one result? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -868,6 +867,14 @@ private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { blobServer.cleanupJob(jobId, jobGraphRemoved); } + private void markJobAsClean(JobID jobId) { + try { + jobResultStore.markResultAsClean(jobId); + } catch (IOException e) { + log.warn("Could not properly mark job {} result as clean.", jobId, e); Review comment: Maybe we should just re-throw a runtime exception here for now? So we don't forget to fix it. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java ########## @@ -130,28 +146,56 @@ public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture = new CompletableFuture<>(); dispatcherServiceFactory = - TestingDispatcherServiceFactory.newBuilder() - .setCreateFunction( - (fencingToken, - recoveredJobGraphs, - jobResults, - jobGraphStore, - jobResultStore) -> { - recoveredJobGraphsFuture.complete(recoveredJobGraphs); - return TestingDispatcherGatewayService.newBuilder().build(); - }) - .build(); + createFactoryBasedOnJobGraphs( + recoveredJobGraphs -> { + recoveredJobGraphsFuture.complete(recoveredJobGraphs); + return TestingDispatcherGatewayService.newBuilder().build(); + }); try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) { dispatcherLeaderProcess.start(); - assertThat( - dispatcherLeaderProcess.getState(), - is(SessionDispatcherLeaderProcess.State.RUNNING)); + assertThat(recoveredJobGraphsFuture) + .succeedsWithin(100, TimeUnit.MILLISECONDS) Review comment: OT: We should rethink using `succeedsWithin` with AssertJ; I've run into this multiple times already. We should introduce something along the lines of `FlinkAssertions.assertThatSucceeds(future)....` with infinite timeout. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java ########## @@ -34,14 +37,21 @@ private final JobGraph jobGraph; + private final Collection<JobResult> recoveredDirtyJobResults; + private final JobResultStore jobResultStore; + JobDispatcherLeaderProcess( UUID leaderSessionId, DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory, JobGraph jobGraph, + Collection<JobResult> recoveredDirtyJobResults, Review comment: Would not having a collection here make sense? 
We already do the same thing with the jobGraph (recoveredJobs collection in other implementations) ```suggestion @Nullable JobResult recoveredDirtyJobResult, ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -943,15 +945,26 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr archiveExecutionGraph(executionGraphInfo); if (terminalJobStatus.isGloballyTerminalState()) { + final JobID jobId = executionGraphInfo.getJobId(); try { - jobResultStore.createDirtyResult( - JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph())); + if (jobResultStore.hasCleanJobResultEntry(jobId)) { + log.warn( Review comment: Should this be logged as a warning (at all)? What does this warning mean from user perspective? Does the user need to take any action to address that? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java ########## @@ -22,53 +22,88 @@ import org.apache.flink.runtime.jobmaster.JobResult; import java.io.IOException; -import java.util.Collection; import java.util.NoSuchElementException; +import java.util.Set; /** - * A persistent storage mechanism for the results of successfully and unsuccessfully completed jobs. + * A storage for the results of globally terminated jobs. These results can have the following + * states: + * + * <ul> + * <li>{@code dirty} - indicating that the corresponding job is not properly cleaned up, yet. + * <li>{@code clean} - indicating that the cleanup of the corresponding job is performed and no + * further actions need to be applied. 
+ * </ul> */ public interface JobResultStore { Review comment: ```suggestion @Internal public interface JobResultStore { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultEntry.java ########## @@ -22,41 +22,18 @@ import org.apache.flink.util.Preconditions; /** - * An entry in a {@link JobResultStore} that couples a completed {@link JobResult} to a state that - * represents whether the resources of that JobResult have been finalized ({@link - * JobResultState#CLEAN}) or have yet to be finalized ({@link JobResultState#DIRTY}). + * {@code JobResultEntry} is the entity managed by the {@link JobResultStore}. It collects + * information about a globally terminated job (e.g. {@link JobResult}). */ -public class JobResultEntry { +public class JobResultEntry implements WithJobResult { Review comment: Why do we need an extra `WithJobResult` interface? ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java ########## @@ -118,7 +120,12 @@ public ApplicationDispatcherGatewayServiceFactory( return DefaultDispatcherGatewayService.from(dispatcher); } - private List<JobID> getRecoveredJobIds(final Collection<JobGraph> recoveredJobs) { - return recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toList()); + private Set<JobID> getRecoveredJobIds( + final Collection<JobGraph> recoveredJobs, + final Collection<JobResult> recoveredDirtyJobs) { + return Stream.concat( + recoveredJobs.stream().map(JobGraph::getJobID), + recoveredDirtyJobs.stream().map(JobResult::getJobId)) + .collect(Collectors.toSet()); Review comment: This part I'm not sure about. Could you document why we need to pass dirty jobs into ApplicationDispatcherBootstrap? Why is it not enough to just pass them into the dispatcher. (I'm not saying it's incorrect, just can't reason about it off the top of my head) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org