dmvk commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r785912691



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -870,6 +871,7 @@ private void cleanUpRemainingJobData(JobID jobId, boolean 
jobGraphRemoved) {
     private void markJobAsClean(JobID jobId) {
         try {
             jobResultStore.markResultAsClean(jobId);
+            log.debug("Cleanup for job {} finished. Job was marked as clean.", 
jobId);

Review comment:
       nit
   ```suggestion
               log.debug("Cleanup for the job '{}' has finished. Job has been 
marked as clean.", jobId);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -943,15 +945,26 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
         archiveExecutionGraph(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
+            final JobID jobId = executionGraphInfo.getJobId();
             try {
-                jobResultStore.createDirtyResult(
-                        
JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
+                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                    log.warn(
+                            "Job {} is already marked as clean but clean up 
was triggered again.",
+                            jobId);
+                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+                    jobResultStore.createDirtyResult(
+                            new JobResultEntry(
+                                    JobResult.createFrom(
+                                            
executionGraphInfo.getArchivedExecutionGraph())));
+                    log.debug("Job {} marked as dirty. Cleanup will be 
triggered.", jobId);
+                }
             } catch (IOException e) {
-                log.error(
-                        "Could not un-register from high-availability services 
job {}."
-                                + "Other JobManager's may attempt to recover 
it and re-execute it.",
-                        executionGraphInfo.getJobId(),
-                        e);
+                fatalErrorHandler.onFatalError(
+                        new FlinkException(
+                                String.format(
+                                        "The job %s couldn't be marked as 
pre-cleanup finished in JobResultStore.",

Review comment:
       Should we rephrase it a bit? I think if I get this exception as an user, 
I won't known what's going on.
   
   Something along the lines of:
   
   ```
   Failed to create a JobResultStore entry for the job '%s'. This is fatal as 
Flink can not guarantee a proper cleanup lifecycle.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
##########
@@ -20,65 +20,112 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.WithJobResult;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
 
-import java.util.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-/**
- * An implementation of the {@link JobResultStore} which only persists the 
data to an in-memory map.
- */
+/** A thread-safe in-memory implementation of the {@link JobResultStore}. */
 public class EmbeddedJobResultStore implements JobResultStore {
 
-    private final Map<JobID, JobResultEntry> inMemoryMap = new 
ConcurrentHashMap<>();
+    private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedJobResultStore.class);
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

Review comment:
       👍 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -58,12 +62,26 @@ public DispatcherLeaderProcessFactory createFactory(
             throw new FlinkRuntimeException("Could not retrieve the 
JobGraph.", e);
         }
 
+        final JobResultStore jobResultStore = 
jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults;
+        try {
+            recoveredDirtyJobResults = jobResultStore.getDirtyResults();

Review comment:
       should we check that there is at most once result?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -868,6 +867,14 @@ private void cleanUpRemainingJobData(JobID jobId, boolean 
jobGraphRemoved) {
         blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
+    private void markJobAsClean(JobID jobId) {
+        try {
+            jobResultStore.markResultAsClean(jobId);
+        } catch (IOException e) {
+            log.warn("Could not properly mark job {} result as clean.", jobId, 
e);

Review comment:
       Maybe should we just re-throw a runtime exception here for now? So we 
don't forget to fix it.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
##########
@@ -130,28 +146,56 @@ public void 
start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws
         final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture 
=
                 new CompletableFuture<>();
         dispatcherServiceFactory =
-                TestingDispatcherServiceFactory.newBuilder()
-                        .setCreateFunction(
-                                (fencingToken,
-                                        recoveredJobGraphs,
-                                        jobResults,
-                                        jobGraphStore,
-                                        jobResultStore) -> {
-                                    
recoveredJobGraphsFuture.complete(recoveredJobGraphs);
-                                    return 
TestingDispatcherGatewayService.newBuilder().build();
-                                })
-                        .build();
+                createFactoryBasedOnJobGraphs(
+                        recoveredJobGraphs -> {
+                            
recoveredJobGraphsFuture.complete(recoveredJobGraphs);
+                            return 
TestingDispatcherGatewayService.newBuilder().build();
+                        });
 
         try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                 createDispatcherLeaderProcess()) {
             dispatcherLeaderProcess.start();
-            assertThat(
-                    dispatcherLeaderProcess.getState(),
-                    is(SessionDispatcherLeaderProcess.State.RUNNING));
+            assertThat(recoveredJobGraphsFuture)
+                    .succeedsWithin(100, TimeUnit.MILLISECONDS)

Review comment:
       OT: We should rethink using, the `succeedsWithin` with assertj, I've run 
into this multiple times already. We should introduce something along the lines 
of `FlinkAssertions.assertThatSucceeds(future)....` with infinite timeout.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java
##########
@@ -34,14 +37,21 @@
 
     private final JobGraph jobGraph;
 
+    private final Collection<JobResult> recoveredDirtyJobResults;
+    private final JobResultStore jobResultStore;
+
     JobDispatcherLeaderProcess(
             UUID leaderSessionId,
             DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
             JobGraph jobGraph,
+            Collection<JobResult> recoveredDirtyJobResults,

Review comment:
       Would not having a collection here make sense? We already do the same 
thing with the jobGraph (recoveredJobs collection in other implementations)
   ```suggestion
               @Nullable JobResult recoveredDirtyJobResult,
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -943,15 +945,26 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
         archiveExecutionGraph(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
+            final JobID jobId = executionGraphInfo.getJobId();
             try {
-                jobResultStore.createDirtyResult(
-                        
JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
+                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                    log.warn(

Review comment:
       Should this be logged as a warning (at all)? What does this warning mean 
from user perspective? Does the user need to take any action to address that?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
##########
@@ -22,53 +22,88 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
- * A persistent storage mechanism for the results of successfully and 
unsuccessfully completed jobs.
+ * A storage for the results of globally terminated jobs. These results can 
have the following
+ * states:
+ *
+ * <ul>
+ *   <li>{@code dirty} - indicating that the corresponding job is not properly 
cleaned up, yet.
+ *   <li>{@code clean} - indicating that the cleanup of the corresponding job 
is performed and no
+ *       further actions need to be applied.
+ * </ul>
  */
 public interface JobResultStore {

Review comment:
       ```suggestion
   @Internal
   public interface JobResultStore {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultEntry.java
##########
@@ -22,41 +22,18 @@
 import org.apache.flink.util.Preconditions;
 
 /**
- * An entry in a {@link JobResultStore} that couples a completed {@link 
JobResult} to a state that
- * represents whether the resources of that JobResult have been finalized 
({@link
- * JobResultState#CLEAN}) or have yet to be finalized ({@link 
JobResultState#DIRTY}).
+ * {@code JobResultEntry} is the entity managed by the {@link JobResultStore}. 
It collects
+ * information about a globally terminated job (e.g. {@link JobResult}).
  */
-public class JobResultEntry {
+public class JobResultEntry implements WithJobResult {

Review comment:
       Why do we need an extra `WithJobResult` interface?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
##########
@@ -118,7 +120,12 @@ public ApplicationDispatcherGatewayServiceFactory(
         return DefaultDispatcherGatewayService.from(dispatcher);
     }
 
-    private List<JobID> getRecoveredJobIds(final Collection<JobGraph> 
recoveredJobs) {
-        return 
recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toList());
+    private Set<JobID> getRecoveredJobIds(
+            final Collection<JobGraph> recoveredJobs,
+            final Collection<JobResult> recoveredDirtyJobs) {
+        return Stream.concat(
+                        recoveredJobs.stream().map(JobGraph::getJobID),
+                        recoveredDirtyJobs.stream().map(JobResult::getJobId))
+                .collect(Collectors.toSet());

Review comment:
       This part I'm not sure about. Could you document why we need to pass 
dirty jobs into ApplicationDispatcherBootstrap? Why is it not enough to just 
pass them into the dispatcher.
   
   (I'm not saying it's incorrect, just can't reason about it from top of my 
head)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to