This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9d2c1c0d3aa7e72e95f1c75bcf7c77c1fceac22
Author: Thesharing <cypres...@outlook.com>
AuthorDate: Wed Mar 30 11:18:03 2022 +0800

    [FLINK-24491][runtime] Make the job termination wait until the archiving of ExecutionGraphInfo finishes
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 70 +++++++++++---------
 .../flink/runtime/dispatcher/MiniDispatcher.java   | 43 +++++++------
 .../dispatcher/DispatcherResourceCleanupTest.java  | 75 +++++++++++++++++++++-
 3 files changed, 140 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c09ba52ca9f..ff73ddaaaa5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -428,10 +428,12 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                         return handleJobManagerRunnerResult(
                                                 jobManagerRunnerResult, 
executionType);
                                     } else {
-                                        return jobManagerRunnerFailed(jobId, 
throwable);
+                                        return 
CompletableFuture.completedFuture(
+                                                jobManagerRunnerFailed(jobId, 
throwable));
                                     }
                                 },
-                                getMainThreadExecutor());
+                                getMainThreadExecutor())
+                        .thenCompose(Function.identity());
 
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture
@@ -444,13 +446,14 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
     }
 
-    private CleanupJobState handleJobManagerRunnerResult(
+    private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
             JobManagerRunnerResult jobManagerRunnerResult, ExecutionType 
executionType) {
         if (jobManagerRunnerResult.isInitializationFailure()) {
             if (executionType == ExecutionType.RECOVERY) {
-                return jobManagerRunnerFailed(
-                        
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
-                        jobManagerRunnerResult.getInitializationFailure());
+                return CompletableFuture.completedFuture(
+                        jobManagerRunnerFailed(
+                                
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
+                                
jobManagerRunnerResult.getInitializationFailure()));
             } else {
                 return 
jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
             }
@@ -840,7 +843,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         fatalErrorHandler.onFatalError(throwable);
     }
 
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo 
executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
         final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
@@ -870,14 +874,21 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        writeToExecutionGraphInfoStore(executionGraphInfo);
+
+        if (!terminalJobStatus.isGloballyTerminalState()) {
+            return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
+        }
+
+        // do not create an archive for suspended jobs, as this would 
eventually lead to
+        // multiple archive attempts which we currently do not support
+        CompletableFuture<Acknowledge> archiveToHistoryServerFuture =
+                archiveExecutionGraphToHistoryServer(executionGraphInfo);
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+        return archiveToHistoryServerFuture.thenApply(ignored -> 
CleanupJobState.GLOBAL);
     }
 
-    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
+    private void writeToExecutionGraphInfoStore(ExecutionGraphInfo 
executionGraphInfo) {
         try {
             executionGraphInfoStore.put(executionGraphInfo);
         } catch (IOException e) {
@@ -887,24 +898,25 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
 
-        // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
-        // archive attempts which we currently do not support
-        if 
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
 {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to 
the history server.",
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
-        }
+    private CompletableFuture<Acknowledge> 
archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(
+                        (Acknowledge ignored, Throwable throwable) -> {
+                            if (throwable != null) {
+                                log.info(
+                                        "Could not archive completed job 
{}({}) to the history server.",
+                                        
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                        
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                        throwable);
+                            }
+                            return Acknowledge.get();
+                        },
+                        getMainThreadExecutor());
     }
 
     private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 99bb7167757..85c8d9b2755 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -125,26 +125,33 @@ public class MiniDispatcher extends Dispatcher {
     }
 
     @Override
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo 
executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
-        final CleanupJobState cleanupHAState = 
super.jobReachedTerminalState(executionGraphInfo);
-
-        JobStatus jobStatus =
-                Objects.requireNonNull(
-                        archivedExecutionGraph.getState(), "JobStatus should 
not be null here.");
-        if (jobStatus.isGloballyTerminalState()
-                && (jobCancelled || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED)) {
-            // shut down if job is cancelled or we don't have to wait for the 
execution result
-            // retrieval
-            log.info(
-                    "Shutting down cluster with state {}, jobCancelled: {}, 
executionMode: {}",
-                    jobStatus,
-                    jobCancelled,
-                    executionMode);
-            
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
-        }
+        final CompletableFuture<CleanupJobState> cleanupHAState =
+                super.jobReachedTerminalState(executionGraphInfo);
+
+        return cleanupHAState.thenApply(
+                cleanupJobState -> {
+                    JobStatus jobStatus =
+                            Objects.requireNonNull(
+                                    archivedExecutionGraph.getState(),
+                                    "JobStatus should not be null here.");
+                    if (jobStatus.isGloballyTerminalState()
+                            && (jobCancelled
+                                    || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED)) {
+                        // shut down if job is cancelled or we don't have to 
wait for the execution
+                        // result retrieval
+                        log.info(
+                                "Shutting down cluster with state {}, 
jobCancelled: {}, executionMode: {}",
+                                jobStatus,
+                                jobCancelled,
+                                executionMode);
+                        
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
+                    }
 
-        return cleanupHAState;
+                    return cleanupJobState;
+                });
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index bef8ec2df4e..81d8d135185 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -84,9 +84,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -132,6 +135,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
     private CompletableFuture<JobID> cleanupJobFuture;
     private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+    private HistoryServerArchivist historyServerArchivist = 
VoidHistoryServerArchivist.INSTANCE;
 
     @BeforeClass
     public static void setupClass() {
@@ -213,7 +217,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                                 heartbeatServices,
                                 archivedExecutionGraphStore,
                                 
testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                                VoidHistoryServerArchivist.INSTANCE,
+                                historyServerArchivist,
                                 null,
                                 
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                                 jobGraphWriter,
@@ -637,6 +641,75 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
     }
 
+    @Test
+    public void testArchivingFinishedJobToHistoryServer() throws Exception {
+
+        final OneShotLatch archivingLatch = new OneShotLatch();
+        final CompletableFuture<Acknowledge> archiveFuture = new 
CompletableFuture<>();
+
+        historyServerArchivist =
+                executionGraphInfo -> {
+                    archivingLatch.trigger();
+                    return archiveFuture;
+                };
+        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+                startDispatcherAndSubmitJob(0);
+
+        finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
+
+        archivingLatch.await();
+
+        // The cleanup is not triggered before archiving is done
+        assertThatNoCleanupWasTriggered();
+
+        archiveFuture.complete(Acknowledge.get());
+
+        assertGlobalCleanupTriggered(jobId);
+    }
+
+    @Test
+    public void testNotArchivingSuspendedJobToHistoryServer() throws Exception 
{
+
+        final AtomicBoolean isArchived = new AtomicBoolean(false);
+
+        historyServerArchivist =
+                executionGraphInfo -> {
+                    isArchived.set(true);
+                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                };
+        final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
+                startDispatcherAndSubmitJob(0);
+
+        suspendJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
+
+        assertLocalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.hours(1)).join();
+
+        assertFalse(
+                "Archiving should not be triggered for a non-globally terminal 
job.",
+                isArchived.get());
+    }
+
+    private void assertThatNoCleanupWasTriggered() {
+        assertFalse(cleanupJobFuture.isDone());
+        assertFalse(deleteAllHABlobsFuture.isDone());
+        assertFalse(cleanupJobHADataFuture.isDone());
+    }
+
+    private void assertLocalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException {
+        assertEquals(cleanupJobFuture.get(), jobId);
+        assertFalse(deleteAllHABlobsFuture.isDone());
+        assertFalse(cleanupJobHADataFuture.isDone());
+    }
+
+    private void assertGlobalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException {
+        assertEquals(cleanupJobFuture.get(), jobId);
+        assertEquals(deleteAllHABlobsFuture.get(), jobId);
+        assertEquals(cleanupJobHADataFuture.get(), jobId);
+    }
+
     private static final class TestingBlobServer extends BlobServer {
 
         private final CompletableFuture<JobID> cleanupJobFuture;

Reply via email to