This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa49e8458390e8bdb0c179b4c8d8e0e9b55e8596
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 14:26:15 2021 +0100

    [FLINK-20846] Move CheckpointIDCounter creation out of ExecutionGraphBuilder
---
 .../checkpoint/DeactivatedCheckpointIDCounter.java | 54 ++++++++++++++++++++++
 .../executiongraph/ExecutionGraphBuilder.java      |  9 ++--
 .../flink/runtime/scheduler/SchedulerBase.java     | 19 ++++++++
 .../CheckpointSettingsSerializableTest.java        |  1 +
 .../ExecutionGraphDeploymentTest.java              |  2 +
 .../ExecutionGraphRescalingTest.java               |  5 ++
 .../ExecutionVertexLocalityTest.java               |  2 +
 .../TestingExecutionGraphBuilder.java              | 10 ++++
 8 files changed, 98 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
new file mode 100644
index 0000000..ca86159
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+/**
+ * This class represents a {@link CheckpointIDCounter} if checkpointing is 
deactivated.
+ * Consequently, no component should use methods of this class other than 
{@link #start()} and
+ * {@link #shutdown}.
+ */
+public enum DeactivatedCheckpointIDCounter implements CheckpointIDCounter {
+    INSTANCE;
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void shutdown(JobStatus jobStatus) throws Exception {}
+
+    @Override
+    public long getAndIncrement() throws Exception {
+        throw new UnsupportedOperationException(
+                "The DeactivatedCheckpointIDCounter cannot serve checkpoint 
ids.");
+    }
+
+    @Override
+    public long get() {
+        throw new UnsupportedOperationException(
+                "The DeactivatedCheckpointIDCounter cannot serve checkpoint 
ids.");
+    }
+
+    @Override
+    public void setCount(long newId) throws Exception {
+        throw new UnsupportedOperationException(
+                "The DeactivatedCheckpointIDCounter cannot serve checkpoint 
ids.");
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 5dc499c..b213a31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -90,6 +90,7 @@ public class ExecutionGraphBuilder {
             SlotProvider slotProvider,
             ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
+            CheckpointIDCounter checkpointIdCounter,
             Time rpcTimeout,
             MetricGroup metrics,
             BlobWriter blobWriter,
@@ -109,6 +110,7 @@ public class ExecutionGraphBuilder {
                 slotProvider,
                 classLoader,
                 recoveryFactory,
+                checkpointIdCounter,
                 rpcTimeout,
                 metrics,
                 blobWriter,
@@ -130,6 +132,7 @@ public class ExecutionGraphBuilder {
             SlotProvider slotProvider,
             ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
+            CheckpointIDCounter checkpointIdCounter,
             Time rpcTimeout,
             MetricGroup metrics,
             BlobWriter blobWriter,
@@ -261,13 +264,11 @@ public class ExecutionGraphBuilder {
                     idToVertex(snapshotSettings.getVerticesToConfirm(), 
executionGraph);
 
             final CompletedCheckpointStore completedCheckpoints;
-            final CheckpointIDCounter checkpointIdCounter;
 
             try {
                 completedCheckpoints =
                         createCompletedCheckpointStore(
                                 jobManagerConfig, classLoader, 
recoveryFactory, log, jobId);
-                checkpointIdCounter = 
createCheckpointIdCounter(recoveryFactory, jobId);
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId, "Failed to initialize high-availability 
checkpoint handler", e);
@@ -365,11 +366,11 @@ public class ExecutionGraphBuilder {
         return executionGraph;
     }
 
-    private static boolean isCheckpointingEnabled(JobGraph jobGraph) {
+    public static boolean isCheckpointingEnabled(JobGraph jobGraph) {
         return jobGraph.getCheckpointingSettings() != null;
     }
 
-    private static CheckpointIDCounter createCheckpointIdCounter(
+    public static CheckpointIDCounter createCheckpointIdCounter(
             CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws 
Exception {
         return recoveryFactory.createCheckpointIDCounter(jobId);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 9c8ea7f..963f4b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -33,9 +33,11 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -276,6 +278,22 @@ public abstract class SchedulerBase implements SchedulerNG 
{
                     }
                 };
 
+        final JobID jobId = jobGraph.getJobID();
+        final CheckpointIDCounter checkpointIdCounter;
+
+        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
+            try {
+                checkpointIdCounter =
+                        ExecutionGraphBuilder.createCheckpointIdCounter(
+                                checkpointRecoveryFactory, jobId);
+            } catch (Exception e) {
+                throw new JobExecutionException(
+                        jobId, "Failed to initialize high-availability 
checkpoint handler", e);
+            }
+        } else {
+            checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE;
+        }
+
         return ExecutionGraphBuilder.buildGraph(
                 null,
                 jobGraph,
@@ -285,6 +303,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                 slotProvider,
                 userCodeLoader,
                 checkpointRecoveryFactory,
+                checkpointIdCounter,
                 rpcTimeout,
                 currentJobManagerJobMetricGroup,
                 blobWriter,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index a23b26e..7367299 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -119,6 +119,7 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
                         mock(SlotProvider.class),
                         classLoader,
                         new StandaloneCheckpointRecoveryFactory(),
+                        new StandaloneCheckpointIDCounter(),
                         timeout,
                         new UnregisteredMetricsGroup(),
                         VoidBlobWriter.getInstance(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7b9f62c..d83305c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -751,6 +752,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                 new ProgrammedSlotProvider(1),
                 getClass().getClassLoader(),
                 new StandaloneCheckpointRecoveryFactory(),
+                new StandaloneCheckpointIDCounter(),
                 timeout,
                 new UnregisteredMetricsGroup(),
                 blobWriter,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index 5f2c9ae..af704db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -76,6 +77,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
                         new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
                         Thread.currentThread().getContextClassLoader(),
                         new StandaloneCheckpointRecoveryFactory(),
+                        new StandaloneCheckpointIDCounter(),
                         AkkaUtils.getDefaultTimeout(),
                         new UnregisteredMetricsGroup(),
                         VoidBlobWriter.getInstance(),
@@ -108,6 +110,7 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                         new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
                         Thread.currentThread().getContextClassLoader(),
                         new StandaloneCheckpointRecoveryFactory(),
+                        new StandaloneCheckpointIDCounter(),
                         AkkaUtils.getDefaultTimeout(),
                         new UnregisteredMetricsGroup(),
                         VoidBlobWriter.getInstance(),
@@ -140,6 +143,7 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                         new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
                         Thread.currentThread().getContextClassLoader(),
                         new StandaloneCheckpointRecoveryFactory(),
+                        new StandaloneCheckpointIDCounter(),
                         AkkaUtils.getDefaultTimeout(),
                         new UnregisteredMetricsGroup(),
                         VoidBlobWriter.getInstance(),
@@ -186,6 +190,7 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                     new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
                     Thread.currentThread().getContextClassLoader(),
                     new StandaloneCheckpointRecoveryFactory(),
+                    new StandaloneCheckpointIDCounter(),
                     AkkaUtils.getDefaultTimeout(),
                     new UnregisteredMetricsGroup(),
                     VoidBlobWriter.getInstance(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index e6d438a..2b1700f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -237,6 +238,7 @@ public class ExecutionVertexLocalityTest extends TestLogger 
{
                 mock(SlotProvider.class),
                 getClass().getClassLoader(),
                 new StandaloneCheckpointRecoveryFactory(),
+                new StandaloneCheckpointIDCounter(),
                 timeout,
                 new UnregisteredMetricsGroup(),
                 VoidBlobWriter.getInstance(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
index 2a31b4e..b07cad2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
@@ -26,7 +26,9 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
@@ -72,6 +74,7 @@ public class TestingExecutionGraphBuilder {
     private MetricGroup metricGroup = new UnregisteredMetricsGroup();
     private CheckpointRecoveryFactory checkpointRecoveryFactory =
             new StandaloneCheckpointRecoveryFactory();
+    private CheckpointIDCounter checkpointIdCounter = new 
StandaloneCheckpointIDCounter();
     private ExecutionDeploymentListener executionDeploymentListener =
             NoOpExecutionDeploymentListener.get();
     private ExecutionStateUpdateListener executionStateUpdateListener = 
(execution, newState) -> {};
@@ -145,6 +148,12 @@ public class TestingExecutionGraphBuilder {
         return this;
     }
 
+    public TestingExecutionGraphBuilder setCheckpointIdCounter(
+            CheckpointIDCounter checkpointIdCounter) {
+        this.checkpointIdCounter = checkpointIdCounter;
+        return this;
+    }
+
     public TestingExecutionGraphBuilder setExecutionDeploymentListener(
             ExecutionDeploymentListener executionDeploymentListener) {
         this.executionDeploymentListener = executionDeploymentListener;
@@ -167,6 +176,7 @@ public class TestingExecutionGraphBuilder {
                 slotProvider,
                 userClassLoader,
                 checkpointRecoveryFactory,
+                checkpointIdCounter,
                 rpcTimeout,
                 metricGroup,
                 blobWriter,

Reply via email to