This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fa49e8458390e8bdb0c179b4c8d8e0e9b55e8596 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 14:26:15 2021 +0100 [FLINK-20846] Move CheckpointIDCounter creation out of ExecutionGraphBuilder --- .../checkpoint/DeactivatedCheckpointIDCounter.java | 54 ++++++++++++++++++++++ .../executiongraph/ExecutionGraphBuilder.java | 9 ++-- .../flink/runtime/scheduler/SchedulerBase.java | 19 ++++++++ .../CheckpointSettingsSerializableTest.java | 1 + .../ExecutionGraphDeploymentTest.java | 2 + .../ExecutionGraphRescalingTest.java | 5 ++ .../ExecutionVertexLocalityTest.java | 2 + .../TestingExecutionGraphBuilder.java | 10 ++++ 8 files changed, 98 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java new file mode 100644 index 0000000..ca86159 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobStatus; + +/** + * This class represents a {@link CheckpointIDCounter} if checkpointing is deactivated. + * Consequently, no component should use methods of this class other than {@link #start()} and + * {@link #shutdown}. + */ +public enum DeactivatedCheckpointIDCounter implements CheckpointIDCounter { + INSTANCE; + + @Override + public void start() throws Exception {} + + @Override + public void shutdown(JobStatus jobStatus) throws Exception {} + + @Override + public long getAndIncrement() throws Exception { + throw new UnsupportedOperationException( + "The DeactivatedCheckpointIDCounter cannot serve checkpoint ids."); + } + + @Override + public long get() { + throw new UnsupportedOperationException( + "The DeactivatedCheckpointIDCounter cannot serve checkpoint ids."); + } + + @Override + public void setCount(long newId) throws Exception { + throw new UnsupportedOperationException( + "The DeactivatedCheckpointIDCounter cannot serve checkpoint ids."); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 5dc499c..b213a31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -90,6 +90,7 @@ public class ExecutionGraphBuilder { SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, + CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, MetricGroup metrics, BlobWriter blobWriter, @@ -109,6 +110,7 @@ public class ExecutionGraphBuilder { slotProvider, classLoader, recoveryFactory, + checkpointIdCounter, rpcTimeout, metrics, blobWriter, @@ -130,6 +132,7 @@ public class ExecutionGraphBuilder { SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, + CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, MetricGroup metrics, BlobWriter blobWriter, @@ -261,13 +264,11 @@ public class ExecutionGraphBuilder { idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph); final CompletedCheckpointStore completedCheckpoints; - final CheckpointIDCounter checkpointIdCounter; try { completedCheckpoints = createCompletedCheckpointStore( jobManagerConfig, classLoader, recoveryFactory, log, jobId); - checkpointIdCounter = createCheckpointIdCounter(recoveryFactory, jobId); } catch (Exception e) { throw new JobExecutionException( jobId, "Failed to initialize high-availability checkpoint handler", e); @@ -365,11 +366,11 @@ public class ExecutionGraphBuilder { return executionGraph; } - private static boolean isCheckpointingEnabled(JobGraph jobGraph) { + public static boolean isCheckpointingEnabled(JobGraph jobGraph) { return jobGraph.getCheckpointingSettings() != null; } - private static CheckpointIDCounter createCheckpointIdCounter( + public static CheckpointIDCounter createCheckpointIdCounter( CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception { return recoveryFactory.createCheckpointIDCounter(jobId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9c8ea7f..963f4b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -33,9 +33,11 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -276,6 +278,22 @@ public abstract class SchedulerBase implements SchedulerNG { } }; + final JobID jobId = jobGraph.getJobID(); + final CheckpointIDCounter checkpointIdCounter; + + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + checkpointIdCounter = + ExecutionGraphBuilder.createCheckpointIdCounter( + checkpointRecoveryFactory, jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, "Failed to initialize high-availability checkpoint handler", e); + } + } else { + checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE; + } + return ExecutionGraphBuilder.buildGraph( null, jobGraph, @@ -285,6 +303,7 @@ public abstract class SchedulerBase implements SchedulerNG { slotProvider, userCodeLoader, checkpointRecoveryFactory, + checkpointIdCounter, rpcTimeout, currentJobManagerJobMetricGroup, blobWriter, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index a23b26e..7367299 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -119,6 +119,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger { mock(SlotProvider.class), classLoader, new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), timeout, new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 7b9f62c..d83305c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -751,6 +752,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { new ProgrammedSlotProvider(1), getClass().getClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), timeout, new UnregisteredMetricsGroup(), blobWriter, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java index 5f2c9ae..af704db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -76,6 +77,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { new TestingSlotProvider(ignore -> new CompletableFuture<>()), Thread.currentThread().getContextClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), AkkaUtils.getDefaultTimeout(), new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), @@ -108,6 +110,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { new TestingSlotProvider(ignore -> new CompletableFuture<>()), Thread.currentThread().getContextClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), AkkaUtils.getDefaultTimeout(), new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), @@ -140,6 +143,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { new TestingSlotProvider(ignore -> new CompletableFuture<>()), Thread.currentThread().getContextClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), AkkaUtils.getDefaultTimeout(), new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), @@ -186,6 +190,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { new TestingSlotProvider(ignore -> new CompletableFuture<>()), Thread.currentThread().getContextClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), AkkaUtils.getDefaultTimeout(), new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index e6d438a..2b1700f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -237,6 +238,7 @@ public class ExecutionVertexLocalityTest extends TestLogger { mock(SlotProvider.class), getClass().getClassLoader(), new StandaloneCheckpointRecoveryFactory(), + new StandaloneCheckpointIDCounter(), timeout, new UnregisteredMetricsGroup(), VoidBlobWriter.getInstance(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java index 2a31b4e..b07cad2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java @@ -26,7 +26,9 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -72,6 +74,7 @@ public class TestingExecutionGraphBuilder { private MetricGroup metricGroup = new UnregisteredMetricsGroup(); private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); + private CheckpointIDCounter checkpointIdCounter = new StandaloneCheckpointIDCounter(); private ExecutionDeploymentListener executionDeploymentListener = NoOpExecutionDeploymentListener.get(); private ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {}; @@ -145,6 +148,12 @@ public class TestingExecutionGraphBuilder { return this; } + public TestingExecutionGraphBuilder setCheckpointIdCounter( + CheckpointIDCounter checkpointIdCounter) { + this.checkpointIdCounter = checkpointIdCounter; + return this; + } + public TestingExecutionGraphBuilder setExecutionDeploymentListener( ExecutionDeploymentListener executionDeploymentListener) { this.executionDeploymentListener = executionDeploymentListener; @@ -167,6 +176,7 @@ public class TestingExecutionGraphBuilder { slotProvider, userClassLoader, checkpointRecoveryFactory, + checkpointIdCounter, rpcTimeout, metricGroup, blobWriter,
