This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1538eb23930fedc5e7e0a1334a15e7ec86c79a06
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 15:35:41 2021 +0100

    [FLINK-20846] Move checkpoint service shutdown out of CheckpointCoordinator
    
    By moving the shutdown of checkpoint services out of the
    CheckpointCoordinator, it is now possible to reuse these services
    across different CheckpointCoordinators.
    
    This closes #14553.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   8 --
 .../runtime/executiongraph/ExecutionGraph.java     |   5 +-
 .../executiongraph/ExecutionGraphBuilder.java      |   5 +-
 .../flink/runtime/scheduler/SchedulerBase.java     | 118 ++++++++++++++++-----
 .../checkpoint/CheckpointCoordinatorTest.java      |   6 +-
 .../TestingExecutionGraphBuilder.java              |   2 +
 6 files changed, 107 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0da6f00..558f16a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -423,14 +423,6 @@ public class CheckpointCoordinator {
                                 
CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
                 // clear queued requests and in-flight checkpoints
                 abortPendingAndQueuedCheckpoints(reason);
-
-                completedCheckpointStore.shutdown(
-                        jobStatus,
-                        checkpointsCleaner,
-                        () -> {
-                            // don't schedule anything on shutdown
-                        });
-                checkpointIdCounter.shutdown(jobStatus);
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b93800..13a4c03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -416,7 +416,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
             CheckpointIDCounter checkpointIDCounter,
             CompletedCheckpointStore checkpointStore,
             StateBackend checkpointStateBackend,
-            CheckpointStatsTracker statsTracker) {
+            CheckpointStatsTracker statsTracker,
+            CheckpointsCleaner checkpointsCleaner) {
 
         checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
         checkState(checkpointCoordinator == null, "checkpointing already 
enabled");
@@ -470,7 +471,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
                         checkpointStore,
                         checkpointStateBackend,
                         ioExecutor,
-                        new CheckpointsCleaner(),
+                        checkpointsCleaner,
                         new 
ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
                         failureManager);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index d63b045..86c26e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -80,6 +81,7 @@ public class ExecutionGraphBuilder {
             SlotProvider slotProvider,
             ClassLoader classLoader,
             CompletedCheckpointStore completedCheckpointStore,
+            CheckpointsCleaner checkpointsCleaner,
             CheckpointIDCounter checkpointIdCounter,
             Time rpcTimeout,
             MetricGroup metrics,
@@ -289,7 +291,8 @@ public class ExecutionGraphBuilder {
                     checkpointIdCounter,
                     completedCheckpointStore,
                     rootBackend,
-                    checkpointStatsTracker);
+                    checkpointStatsTracker,
+                    checkpointsCleaner);
         }
 
         // create all the metrics for the Execution Graph
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 15ac47a..cf93cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import 
org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
@@ -157,6 +158,12 @@ public abstract class SchedulerBase implements SchedulerNG {
 
     private final CheckpointRecoveryFactory checkpointRecoveryFactory;
 
+    private final CompletedCheckpointStore completedCheckpointStore;
+
+    private final CheckpointsCleaner checkpointsCleaner;
+
+    private final CheckpointIDCounter checkpointIdCounter;
+
     private final Time rpcTimeout;
 
     private final BlobWriter blobWriter;
@@ -211,14 +218,23 @@ public abstract class SchedulerBase implements SchedulerNG {
         this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
         this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
 
+        this.checkpointsCleaner = new CheckpointsCleaner();
+        this.completedCheckpointStore = createCompletedCheckpointStore();
+        this.checkpointIdCounter = createCheckpointIdCounter();
+
         this.executionGraph =
                 createAndRestoreExecutionGraph(
                         jobManagerJobMetricGroup,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
                         checkNotNull(shuffleMaster),
                         checkNotNull(partitionTracker),
                         checkNotNull(executionDeploymentTracker),
                         initializationTimestamp);
 
+        
registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph);
+
         this.schedulingTopology = executionGraph.getSchedulingTopology();
 
         stateLocationRetriever =
@@ -230,8 +246,78 @@ public abstract class SchedulerBase implements SchedulerNG {
         this.coordinatorMap = createCoordinatorMap();
     }
 
+    private void registerShutDownCheckpointServicesOnExecutionGraphTermination(
+            ExecutionGraph executionGraph) {
+        FutureUtils.assertNoException(
+                
executionGraph.getTerminationFuture().thenAccept(this::shutDownCheckpointServices));
+    }
+
+    private void shutDownCheckpointServices(JobStatus jobStatus) {
+        Exception exception = null;
+
+        try {
+            completedCheckpointStore.shutdown(
+                    jobStatus,
+                    checkpointsCleaner,
+                    () -> {
+                        // don't schedule anything on shutdown
+                    });
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        try {
+            checkpointIdCounter.shutdown(jobStatus);
+        } catch (Exception e) {
+            exception = ExceptionUtils.firstOrSuppressed(e, exception);
+        }
+
+        if (exception != null) {
+            log.error("Error while shutting down checkpoint services.", 
exception);
+        }
+    }
+
+    private CompletedCheckpointStore createCompletedCheckpointStore() throws 
JobExecutionException {
+        final JobID jobId = jobGraph.getJobID();
+        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
+            try {
+                return ExecutionGraphBuilder.createCompletedCheckpointStore(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        checkpointRecoveryFactory,
+                        log,
+                        jobId);
+            } catch (Exception e) {
+                throw new JobExecutionException(
+                        jobId,
+                        "Failed to initialize high-availability completed 
checkpoint store",
+                        e);
+            }
+        } else {
+            return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
+        }
+    }
+
+    private CheckpointIDCounter createCheckpointIdCounter() throws 
JobExecutionException {
+        final JobID jobId = jobGraph.getJobID();
+        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
+            try {
+                return ExecutionGraphBuilder.createCheckpointIdCounter(
+                        checkpointRecoveryFactory, jobId);
+            } catch (Exception e) {
+                throw new JobExecutionException(
+                        jobId, "Failed to initialize high-availability 
checkpoint id counter", e);
+            }
+        } else {
+            return DeactivatedCheckpointIDCounter.INSTANCE;
+        }
+    }
+
     private ExecutionGraph createAndRestoreExecutionGraph(
             JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+            CompletedCheckpointStore completedCheckpointStore,
+            CheckpointsCleaner checkpointsCleaner,
+            CheckpointIDCounter checkpointIdCounter,
             ShuffleMaster<?> shuffleMaster,
             JobMasterPartitionTracker partitionTracker,
             ExecutionDeploymentTracker executionDeploymentTracker,
@@ -241,6 +327,9 @@ public abstract class SchedulerBase implements SchedulerNG {
         ExecutionGraph newExecutionGraph =
                 createExecutionGraph(
                         currentJobManagerJobMetricGroup,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
                         shuffleMaster,
                         partitionTracker,
                         executionDeploymentTracker,
@@ -265,6 +354,9 @@ public abstract class SchedulerBase implements SchedulerNG {
 
     private ExecutionGraph createExecutionGraph(
             JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+            CompletedCheckpointStore completedCheckpointStore,
+            CheckpointsCleaner checkpointsCleaner,
+            CheckpointIDCounter checkpointIdCounter,
             ShuffleMaster<?> shuffleMaster,
             final JobMasterPartitionTracker partitionTracker,
             ExecutionDeploymentTracker executionDeploymentTracker,
@@ -280,31 +372,6 @@ public abstract class SchedulerBase implements SchedulerNG {
                     }
                 };
 
-        final JobID jobId = jobGraph.getJobID();
-        final CheckpointIDCounter checkpointIdCounter;
-        final CompletedCheckpointStore completedCheckpointStore;
-
-        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
-            try {
-                checkpointIdCounter =
-                        ExecutionGraphBuilder.createCheckpointIdCounter(
-                                checkpointRecoveryFactory, jobId);
-                completedCheckpointStore =
-                        ExecutionGraphBuilder.createCompletedCheckpointStore(
-                                jobMasterConfiguration,
-                                userCodeLoader,
-                                checkpointRecoveryFactory,
-                                log,
-                                jobId);
-            } catch (Exception e) {
-                throw new JobExecutionException(
-                        jobId, "Failed to initialize high-availability 
checkpoint handler", e);
-            }
-        } else {
-            checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE;
-            completedCheckpointStore = 
DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
-        }
-
         return ExecutionGraphBuilder.buildGraph(
                 jobGraph,
                 jobMasterConfiguration,
@@ -313,6 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                 slotProvider,
                 userCodeLoader,
                 completedCheckpointStore,
+                checkpointsCleaner,
                 checkpointIdCounter,
                 rpcTimeout,
                 currentJobManagerJobMetricGroup,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index dd26126..c7d66fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -983,6 +983,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
             ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
             // set up the coordinator and validate the initial state
+            final StandaloneCompletedCheckpointStore completedCheckpointStore =
+                    new StandaloneCompletedCheckpointStore(10);
             CheckpointCoordinator checkpointCoordinator =
                     new CheckpointCoordinatorBuilder()
                             .setJobId(jobId)
@@ -995,7 +997,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             .setTasksToWaitFor(
                                     new ExecutionVertex[] {ackVertex1, 
ackVertex2, ackVertex3})
                             .setTasksToCommitTo(new ExecutionVertex[] 
{commitVertex})
-                            .setCompletedCheckpointStore(new 
StandaloneCompletedCheckpointStore(10))
+                            
.setCompletedCheckpointStore(completedCheckpointStore)
                             .setTimer(manuallyTriggeredScheduledExecutor)
                             .build();
 
@@ -1168,6 +1170,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
             verify(subtaskState13, times(1)).discardState();
 
             checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            completedCheckpointStore.shutdown(
+                    JobStatus.FINISHED, new CheckpointsCleaner(), () -> {});
 
             // validate that the states in the second checkpoint have been 
discarded
             verify(subtaskState21, times(1)).discardState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
index 0add2f6..59360ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
@@ -175,6 +176,7 @@ public class TestingExecutionGraphBuilder {
                 slotProvider,
                 userClassLoader,
                 completedCheckpointStore,
+                new CheckpointsCleaner(),
                 checkpointIdCounter,
                 rpcTimeout,
                 metricGroup,

Reply via email to