This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9d2c1c0d3aa7e72e95f1c75bcf7c77c1fceac22 Author: Thesharing <cypres...@outlook.com> AuthorDate: Wed Mar 30 11:18:03 2022 +0800 [FLINK-24491][runtime] Make the job termination wait until the archiving of ExecutionGraphInfo finishes --- .../flink/runtime/dispatcher/Dispatcher.java | 70 +++++++++++--------- .../flink/runtime/dispatcher/MiniDispatcher.java | 43 +++++++------ .../dispatcher/DispatcherResourceCleanupTest.java | 75 +++++++++++++++++++++- 3 files changed, 140 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index c09ba52ca9f..ff73ddaaaa5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -428,10 +428,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher return handleJobManagerRunnerResult( jobManagerRunnerResult, executionType); } else { - return jobManagerRunnerFailed(jobId, throwable); + return CompletableFuture.completedFuture( + jobManagerRunnerFailed(jobId, throwable)); } }, - getMainThreadExecutor()); + getMainThreadExecutor()) + .thenCompose(Function.identity()); final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture @@ -444,13 +446,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture); } - private CleanupJobState handleJobManagerRunnerResult( + private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult( JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) { if (jobManagerRunnerResult.isInitializationFailure()) { if (executionType == ExecutionType.RECOVERY) { - return jobManagerRunnerFailed( - jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), - jobManagerRunnerResult.getInitializationFailure()); + return CompletableFuture.completedFuture( + jobManagerRunnerFailed( + jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), + jobManagerRunnerResult.getInitializationFailure())); } else { return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); } @@ -840,7 +843,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher fatalErrorHandler.onFatalError(throwable); } - protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) { + protected CompletableFuture<CleanupJobState> jobReachedTerminalState( + ExecutionGraphInfo executionGraphInfo) { final ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph(); final JobStatus terminalJobStatus = archivedExecutionGraph.getState(); @@ -870,14 +874,21 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher terminalJobStatus); } - archiveExecutionGraph(executionGraphInfo); + writeToExecutionGraphInfoStore(executionGraphInfo); + + if (!terminalJobStatus.isGloballyTerminalState()) { + return CompletableFuture.completedFuture(CleanupJobState.LOCAL); + } + + // do not create an archive for suspended jobs, as this would eventually lead to + // multiple archive attempts which we currently do not support + CompletableFuture<Acknowledge> archiveToHistoryServerFuture = + archiveExecutionGraphToHistoryServer(executionGraphInfo); - return terminalJobStatus.isGloballyTerminalState() - ? CleanupJobState.GLOBAL - : CleanupJobState.LOCAL; + return archiveToHistoryServerFuture.thenApply(ignored -> CleanupJobState.GLOBAL); } - private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) { + private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) { try { executionGraphInfoStore.put(executionGraphInfo); } catch (IOException e) { @@ -887,24 +898,25 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher executionGraphInfo.getArchivedExecutionGraph().getJobID(), e); } + } - // do not create an archive for suspended jobs, as this would eventually lead to multiple - // archive attempts which we currently do not support - if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) { - final CompletableFuture<Acknowledge> executionGraphFuture = - historyServerArchivist.archiveExecutionGraph(executionGraphInfo); - - executionGraphFuture.whenComplete( - (Acknowledge ignored, Throwable throwable) -> { - if (throwable != null) { - log.info( - "Could not archive completed job {}({}) to the history server.", - executionGraphInfo.getArchivedExecutionGraph().getJobName(), - executionGraphInfo.getArchivedExecutionGraph().getJobID(), - throwable); - } - }); - } + private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer( + ExecutionGraphInfo executionGraphInfo) { + + return historyServerArchivist + .archiveExecutionGraph(executionGraphInfo) + .handleAsync( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + log.info( + "Could not archive completed job {}({}) to the history server.", + executionGraphInfo.getArchivedExecutionGraph().getJobName(), + executionGraphInfo.getArchivedExecutionGraph().getJobID(), + throwable); + } + return Acknowledge.get(); + }, + getMainThreadExecutor()); } private void jobMasterFailed(JobID jobId, Throwable cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 99bb7167757..85c8d9b2755 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -125,26 +125,33 @@ public class MiniDispatcher extends Dispatcher { } @Override - protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) { + protected CompletableFuture<CleanupJobState> jobReachedTerminalState( + ExecutionGraphInfo executionGraphInfo) { final ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph(); - final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo); - - JobStatus jobStatus = - Objects.requireNonNull( - archivedExecutionGraph.getState(), "JobStatus should not be null here."); - if (jobStatus.isGloballyTerminalState() - && (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) { - // shut down if job is cancelled or we don't have to wait for the execution result - // retrieval - log.info( - "Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", - jobStatus, - jobCancelled, - executionMode); - shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus)); - } + final CompletableFuture<CleanupJobState> cleanupHAState = + super.jobReachedTerminalState(executionGraphInfo); + + return cleanupHAState.thenApply( + cleanupJobState -> { + JobStatus jobStatus = + Objects.requireNonNull( + archivedExecutionGraph.getState(), + "JobStatus should not be null here."); + if (jobStatus.isGloballyTerminalState() + && (jobCancelled + || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) { + // shut down if job is cancelled or we don't have to wait for the execution + // result retrieval + log.info( + "Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", + jobStatus, + jobCancelled, + executionMode); + shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus)); + } - return cleanupHAState; + return cleanupJobState; + }); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index bef8ec2df4e..81d8d135185 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -84,9 +84,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -132,6 +135,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { private CompletableFuture<JobID> cleanupJobFuture; private CompletableFuture<JobID> cleanupJobHADataFuture; private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE; @BeforeClass public static void setupClass() { @@ -213,7 +217,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { heartbeatServices, archivedExecutionGraphStore, testingFatalErrorHandlerResource.getFatalErrorHandler(), - VoidHistoryServerArchivist.INSTANCE, + historyServerArchivist, null, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), jobGraphWriter, @@ -637,6 +641,75 @@ public class DispatcherResourceCleanupTest extends TestLogger { assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId)); } + @Test + public void testArchivingFinishedJobToHistoryServer() throws Exception { + + final OneShotLatch archivingLatch = new OneShotLatch(); + final CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<>(); + + historyServerArchivist = + executionGraphInfo -> { + archivingLatch.trigger(); + return archiveFuture; + }; + final TestingJobManagerRunnerFactory jobManagerRunnerFactory = + startDispatcherAndSubmitJob(0); + + finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); + + archivingLatch.await(); + + // The cleanup is not triggered before archiving is done + assertThatNoCleanupWasTriggered(); + + archiveFuture.complete(Acknowledge.get()); + + assertGlobalCleanupTriggered(jobId); + } + + @Test + public void testNotArchivingSuspendedJobToHistoryServer() throws Exception { + + final AtomicBoolean isArchived = new AtomicBoolean(false); + + historyServerArchivist = + executionGraphInfo -> { + isArchived.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }; + final TestingJobManagerRunnerFactory jobManagerRunnerFactory = + startDispatcherAndSubmitJob(0); + + suspendJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); + + assertLocalCleanupTriggered(jobId); + dispatcher.getJobTerminationFuture(jobId, Time.hours(1)).join(); + + assertFalse( + "Archiving should not be triggered for a non-globally terminal job.", + isArchived.get()); + } + + private void assertThatNoCleanupWasTriggered() { + assertFalse(cleanupJobFuture.isDone()); + assertFalse(deleteAllHABlobsFuture.isDone()); + assertFalse(cleanupJobHADataFuture.isDone()); + } + + private void assertLocalCleanupTriggered(JobID jobId) + throws ExecutionException, InterruptedException { + assertEquals(cleanupJobFuture.get(), jobId); + assertFalse(deleteAllHABlobsFuture.isDone()); + assertFalse(cleanupJobHADataFuture.isDone()); + } + + private void assertGlobalCleanupTriggered(JobID jobId) + throws ExecutionException, InterruptedException { + assertEquals(cleanupJobFuture.get(), jobId); + assertEquals(deleteAllHABlobsFuture.get(), jobId); + assertEquals(cleanupJobHADataFuture.get(), jobId); + } + private static final class TestingBlobServer extends BlobServer { private final CompletableFuture<JobID> cleanupJobFuture;