This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f64261c [FLINK-22483][runtime] Remove CompletedCheckpointStore#recover() method and change contract of CheckpointRecoveryFactory#createCompletedCheckpointStore, so that newly constructed CheckpointStore is already recovered. f64261c is described below commit f64261c91b195ecdcd99975b51de540db89a3f48 Author: David Moravek <d...@apache.org> AuthorDate: Thu Aug 5 16:52:47 2021 +0200 [FLINK-22483][runtime] Remove CompletedCheckpointStore#recover() method and change contract of CheckpointRecoveryFactory#createCompletedCheckpointStore, so that newly constructed CheckpointStore is already recovered. It's enough to recover CompletedCheckpointStore only once, right after JobMasterRunner gains leadership. This also ensures that we'll fetch checkpoints from the external store in a "jobmaster-future-thread", without potentially blocking RPC threads. This closes #16652. --- .../KubernetesCheckpointRecoveryFactory.java | 2 +- .../flink/kubernetes/utils/KubernetesUtils.java | 3 + .../runtime/checkpoint/CheckpointCoordinator.java | 9 +- .../checkpoint/CheckpointRecoveryFactory.java | 6 +- .../checkpoint/CompletedCheckpointStore.java | 10 +- ...ctivatedCheckpointCompletedCheckpointStore.java | 5 - .../DefaultCompletedCheckpointStore.java | 163 ++++------------- .../DefaultCompletedCheckpointStoreUtils.java | 117 +++++++++++++ .../EmbeddedCompletedCheckpointStore.java | 50 ++++-- .../PerJobCheckpointRecoveryFactory.java | 55 ++++-- .../StandaloneCheckpointRecoveryFactory.java | 2 +- .../StandaloneCompletedCheckpointStore.java | 5 - .../ZooKeeperCheckpointRecoveryFactory.java | 2 +- .../EmbeddedHaServicesWithLeadershipControl.java | 29 ++- .../flink/runtime/minicluster/MiniCluster.java | 2 +- .../flink/runtime/scheduler/SchedulerUtils.java | 2 +- .../apache/flink/runtime/util/ZooKeeperUtils.java | 4 + .../CheckpointCoordinatorFailureTest.java | 5 - .../CheckpointCoordinatorRestoringTest.java | 194 
+++++++++++++-------- .../checkpoint/CheckpointCoordinatorTest.java | 33 ++-- .../CheckpointCoordinatorTestingUtils.java | 9 + .../checkpoint/CompletedCheckpointStoreTest.java | 20 +-- .../DefaultCompletedCheckpointStoreTest.java | 61 ++++--- .../DefaultCompletedCheckpointStoreUtilsTest.java | 138 +++++++++++++++ .../checkpoint/PerJobCheckpointRecoveryTest.java | 58 ++++++ .../StandaloneCompletedCheckpointStoreTest.java | 10 +- .../TestingCheckpointRecoveryFactory.java | 2 +- .../TestingCompletedCheckpointStore.java | 3 - .../ZooKeeperCompletedCheckpointStoreITCase.java | 41 ++--- ...oKeeperCompletedCheckpointStoreMockitoTest.java | 8 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 133 +++----------- .../dispatcher/DispatcherFailoverITCase.java | 18 +- .../jobmanager/DefaultJobGraphStoreTest.java | 2 +- .../runtime/jobmaster/JobExecutionITCase.java | 2 +- .../flink/runtime/jobmaster/JobMasterTest.java | 11 +- .../LeaderChangeClusterComponentsTest.java | 2 +- .../TestingMiniClusterConfiguration.java | 17 +- .../persistence/TestingStateHandleStore.java | 2 +- .../test/checkpointing/CheckpointStoreITCase.java | 144 ++++++--------- .../NotifyCheckpointAbortedITCase.java | 26 +-- .../test/checkpointing/RegionFailoverITCase.java | 26 +-- .../flink/test/checkpointing/SavepointITCase.java | 28 +-- .../ZooKeeperLeaderElectionITCase.java | 2 +- 43 files changed, 809 insertions(+), 652 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java index 10ff0f7..f8a661f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java @@ -70,7 +70,7 @@ public class 
KubernetesCheckpointRecoveryFactory implements CheckpointRecoveryFa } @Override - public CompletedCheckpointStore createCheckpointStore( + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index d077c8e..ec662d0 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore; @@ -299,6 +300,8 @@ public class KubernetesUtils { maxNumberOfCheckpointsToRetain, stateHandleStore, KubernetesCheckpointStoreUtil.INSTANCE, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + stateHandleStore, KubernetesCheckpointStoreUtil.INSTANCE), executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 958bcc9..3b8dd1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ 
-1521,16 +1521,11 @@ public class CheckpointCoordinator { } // We create a new shared state registry object, so that all pending async disposal - // requests from previous - // runs will go against the old object (were they can do no harm). - // This must happen under the checkpoint lock. + // requests from previous runs will go against the old object (were they can do no + // harm). This must happen under the checkpoint lock. sharedStateRegistry.close(); sharedStateRegistry = sharedStateRegistryFactory.create(executor); - // Recover the checkpoints, TODO this could be done only when there is a new leader, not - // on each recovery - completedCheckpointStore.recover(); - // Now, we re-register all (shared) states from the checkpoint store with the new // registry for (CompletedCheckpoint completedCheckpoint : diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java index bf6aa9f..88c2a56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -24,14 +24,16 @@ import org.apache.flink.api.common.JobID; public interface CheckpointRecoveryFactory { /** - * Creates a {@link CompletedCheckpointStore} instance for a job. + * Creates a RECOVERED {@link CompletedCheckpointStore} instance for a job. In this context, + * RECOVERED means, that if we already have completed checkpoints from previous runs, we should + * use them as the initial state. 
* * @param jobId Job ID to recover checkpoints for * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain * @param userClassLoader User code class loader of the job * @return {@link CompletedCheckpointStore} instance for the job */ - CompletedCheckpointStore createCheckpointStore( + CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 8a73f4b..412b148 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -33,14 +33,6 @@ public interface CompletedCheckpointStore { Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class); /** - * Recover available {@link CompletedCheckpoint} instances. - * - * <p>After a call to this method, {@link #getLatestCheckpoint(boolean)} returns the latest - * available checkpoint. - */ - void recover() throws Exception; - - /** * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. * * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of @@ -107,7 +99,7 @@ public interface CompletedCheckpointStore { * or kept. 
* * @param jobStatus Job state on shut down - * @param checkpointsCleaner that will cleanup copmpleted checkpoints if needed + * @param checkpointsCleaner that will cleanup completed checkpoints if needed */ void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java index f0ef3cf..daee48e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java @@ -31,11 +31,6 @@ public enum DeactivatedCheckpointCompletedCheckpointStore implements CompletedCh INSTANCE; @Override - public void recover() throws Exception { - throw unsupportedOperationException(); - } - - @Override public void addCheckpoint( CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java index 8bc3206..d505124 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java @@ -19,24 +19,20 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.ResourceVersion; import org.apache.flink.runtime.persistence.StateHandleStore; -import 
org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Comparator; +import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.concurrent.Executor; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -61,9 +57,6 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> private static final Logger LOG = LoggerFactory.getLogger(DefaultCompletedCheckpointStore.class); - private static final Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> - STRING_COMPARATOR = Comparator.comparing(o -> o.f1); - /** Completed checkpoints state handle store. */ private final StateHandleStore<CompletedCheckpoint, R> checkpointStateHandleStore; @@ -81,6 +74,9 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> private final CheckpointStoreUtil completedCheckpointStoreUtil; + /** False if store has been shutdown. */ + private final AtomicBoolean running = new AtomicBoolean(true); + /** * Creates a {@link DefaultCompletedCheckpointStore} instance. 
* @@ -95,18 +91,14 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> int maxNumberOfCheckpointsToRetain, StateHandleStore<CompletedCheckpoint, R> stateHandleStore, CheckpointStoreUtil completedCheckpointStoreUtil, + Collection<CompletedCheckpoint> completedCheckpoints, Executor executor) { - checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); - this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; - this.checkpointStateHandleStore = checkNotNull(stateHandleStore); - this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); - + this.completedCheckpoints.addAll(completedCheckpoints); this.ioExecutor = checkNotNull(executor); - this.completedCheckpointStoreUtil = checkNotNull(completedCheckpointStoreUtil); } @@ -116,54 +108,12 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> } /** - * Recover all the valid checkpoints from state handle store. All the successfully recovered - * checkpoints will be added to {@link #completedCheckpoints} sorted by checkpoint id. 
- */ - @Override - public void recover() throws Exception { - LOG.info("Recovering checkpoints from {}.", checkpointStateHandleStore); - - // Get all there is first - final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints = - checkpointStateHandleStore.getAllAndLock(); - - initialCheckpoints.sort(STRING_COMPARATOR); - - final int numberOfInitialCheckpoints = initialCheckpoints.size(); - - LOG.info( - "Found {} checkpoints in {}.", - numberOfInitialCheckpoints, - checkpointStateHandleStore); - if (haveAllDownloaded(initialCheckpoints)) { - LOG.info( - "All {} checkpoints found are already downloaded.", numberOfInitialCheckpoints); - return; - } - - final List<CompletedCheckpoint> retrievedCheckpoints = - new ArrayList<>(numberOfInitialCheckpoints); - LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); - - for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : - initialCheckpoints) { - retrievedCheckpoints.add( - checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle))); - } - - // Clear local handles in order to prevent duplicates on recovery. The local handles should - // reflect the state handle store contents. - completedCheckpoints.clear(); - completedCheckpoints.addAll(retrievedCheckpoints); - } - - /** * Synchronously writes the new checkpoints to state handle store and asynchronously removes * older ones. * * @param checkpoint Completed checkpoint to add. * @throws PossibleInconsistentStateException if adding the checkpoint failed and leaving the - * system in an possibly inconsistent state, i.e. it's uncertain whether the checkpoint + * system in a possibly inconsistent state, i.e. it's uncertain whether the checkpoint * metadata was fully written to the underlying systems or not. 
*/ @Override @@ -172,13 +122,13 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception { - + Preconditions.checkState(running.get(), "Checkpoint store has already been shutdown."); checkNotNull(checkpoint, "Checkpoint"); final String path = completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID()); - // Now add the new one. If it fails, we don't want to loose existing data. + // Now add the new one. If it fails, we don't want to lose existing data. checkpointStateHandleStore.addAndLock(path, checkpoint); completedCheckpoints.addLast(checkpoint); @@ -214,30 +164,28 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> @Override public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception { - if (jobStatus.isGloballyTerminalState()) { - LOG.info("Shutting down"); - - for (CompletedCheckpoint checkpoint : completedCheckpoints) { - try { - tryRemoveCompletedCheckpoint( - checkpoint, - checkpoint.shouldBeDiscardedOnShutdown(jobStatus), - checkpointsCleaner, - () -> {}); - } catch (Exception e) { - LOG.warn("Fail to remove checkpoint during shutdown.", e); + if (running.compareAndSet(true, false)) { + if (jobStatus.isGloballyTerminalState()) { + LOG.info("Shutting down"); + for (CompletedCheckpoint checkpoint : completedCheckpoints) { + try { + tryRemoveCompletedCheckpoint( + checkpoint, + checkpoint.shouldBeDiscardedOnShutdown(jobStatus), + checkpointsCleaner, + () -> {}); + } catch (Exception e) { + LOG.warn("Fail to remove checkpoint during shutdown.", e); + } } + completedCheckpoints.clear(); + checkpointStateHandleStore.clearEntries(); + } else { + LOG.info("Suspending"); + // Clear the local handles, but don't remove any state + completedCheckpoints.clear(); + checkpointStateHandleStore.releaseAll(); } - - completedCheckpoints.clear(); - checkpointStateHandleStore.clearEntries(); - } 
else { - LOG.info("Suspending"); - - // Clear the local handles, but don't remove any state - completedCheckpoints.clear(); - - checkpointStateHandleStore.releaseAll(); } } @@ -257,25 +205,6 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> } } - private boolean haveAllDownloaded( - List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointPointers) { - if (completedCheckpoints.size() != checkpointPointers.size()) { - return false; - } - Set<Long> localIds = - completedCheckpoints.stream() - .map(CompletedCheckpoint::getCheckpointID) - .collect(Collectors.toSet()); - for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> initialCheckpoint : - checkpointPointers) { - if (!localIds.contains( - completedCheckpointStoreUtil.nameToCheckpointID(initialCheckpoint.f1))) { - return false; - } - } - return true; - } - /** * Tries to remove the checkpoint identified by the given checkpoint id. * @@ -286,34 +215,4 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>> return checkpointStateHandleStore.releaseAndTryRemove( completedCheckpointStoreUtil.checkpointIDToName(checkpointId)); } - - private CompletedCheckpoint retrieveCompletedCheckpoint( - Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle) - throws FlinkException { - long checkpointId = completedCheckpointStoreUtil.nameToCheckpointID(stateHandle.f1); - - LOG.info("Trying to retrieve checkpoint {}.", checkpointId); - - try { - return stateHandle.f0.retrieveState(); - } catch (ClassNotFoundException cnfe) { - throw new FlinkException( - "Could not retrieve checkpoint " - + checkpointId - + " from state handle under " - + stateHandle.f1 - + ". This indicates that you are trying to recover from state written by an " - + "older Flink version which is not compatible. 
Try cleaning the state handle store.", - cnfe); - } catch (IOException ioe) { - throw new FlinkException( - "Could not retrieve checkpoint " - + checkpointId - + " from state handle under " - + stateHandle.f1 - + ". This indicates that the retrieved state handle is broken. Try cleaning the " - + "state handle store.", - ioe); - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java new file mode 100644 index 0000000..0c9d547 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.persistence.ResourceVersion; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Helper methods related to {@link DefaultCompletedCheckpointStore}. */ +public class DefaultCompletedCheckpointStoreUtils { + + private static final Logger LOG = + LoggerFactory.getLogger(DefaultCompletedCheckpointStoreUtils.class); + + private DefaultCompletedCheckpointStoreUtils() { + // No-op. + } + + /** + * Fetch all {@link CompletedCheckpoint completed checkpoints} from an {@link StateHandleStore + * external store}. This method is intended for retrieving an initial state of {@link + * DefaultCompletedCheckpointStore}. + * + * @param checkpointStateHandleStore Completed checkpoints in external store. + * @param completedCheckpointStoreUtil Utilities for completed checkpoint store. + * @param <R> Type of {@link ResourceVersion} + * @return Immutable collection of {@link CompletedCheckpoint completed checkpoints}. + * @throws Exception If we're not able to fetch checkpoints for some reason. + */ + public static <R extends ResourceVersion<R>> + Collection<CompletedCheckpoint> retrieveCompletedCheckpoints( + StateHandleStore<CompletedCheckpoint, R> checkpointStateHandleStore, + CheckpointStoreUtil completedCheckpointStoreUtil) + throws Exception { + + LOG.info("Recovering checkpoints from {}.", checkpointStateHandleStore); + + // Get all there is first. 
+ final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints = + checkpointStateHandleStore.getAllAndLock(); + + // Sort checkpoints by name. + initialCheckpoints.sort(Comparator.comparing(o -> o.f1)); + + final int numberOfInitialCheckpoints = initialCheckpoints.size(); + + LOG.info( + "Found {} checkpoints in {}.", + numberOfInitialCheckpoints, + checkpointStateHandleStore); + final List<CompletedCheckpoint> retrievedCheckpoints = + new ArrayList<>(numberOfInitialCheckpoints); + LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); + + for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : + initialCheckpoints) { + retrievedCheckpoints.add( + checkNotNull( + retrieveCompletedCheckpoint( + completedCheckpointStoreUtil, checkpointStateHandle))); + } + return Collections.unmodifiableList(retrievedCheckpoints); + } + + private static CompletedCheckpoint retrieveCompletedCheckpoint( + CheckpointStoreUtil completedCheckpointStoreUtil, + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle) + throws FlinkException { + final long checkpointId = completedCheckpointStoreUtil.nameToCheckpointID(stateHandle.f1); + LOG.info("Trying to retrieve checkpoint {}.", checkpointId); + try { + return stateHandle.f0.retrieveState(); + } catch (ClassNotFoundException exception) { + throw new FlinkException( + String.format( + "Could not retrieve checkpoint %d from state handle under %s. This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", + checkpointId, stateHandle.f1), + exception); + } catch (IOException exception) { + throw new FlinkException( + String.format( + "Could not retrieve checkpoint %d from state handle under %s. This indicates that the retrieved state handle is broken. 
Try cleaning the state handle store.", + checkpointId, stateHandle.f1), + exception); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java index d55857c..2c5a748 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java @@ -25,17 +25,22 @@ import org.apache.flink.util.Preconditions; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; -/** - * An embedded in-memory checkpoint store, which supports shutdown and suspend. You can use this to - * test HA as long as the factory always returns the same store instance. - */ +/** An embedded in-memory checkpoint store, which supports shutdown and suspend. 
*/ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStore { + private static void throwAlreadyShutdownException(JobStatus status) { + throw new IllegalStateException( + String.format("Store has been already shutdown with %s.", status)); + } + private final ArrayDeque<CompletedCheckpoint> checkpoints = new ArrayDeque<>(2); - private final Collection<CompletedCheckpoint> suspended = new ArrayDeque<>(2); + private final AtomicReference<JobStatus> shutdownStatus = new AtomicReference<>(); private final int maxRetainedCheckpoints; @@ -43,15 +48,15 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor this(1); } - EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) { - Preconditions.checkArgument(maxRetainedCheckpoints > 0); - this.maxRetainedCheckpoints = maxRetainedCheckpoints; + public EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) { + this(maxRetainedCheckpoints, Collections.emptyList()); } - @Override - public void recover() { - checkpoints.addAll(suspended); - suspended.clear(); + public EmbeddedCompletedCheckpointStore( + int maxRetainedCheckpoints, Collection<CompletedCheckpoint> initialCheckpoints) { + Preconditions.checkArgument(maxRetainedCheckpoints > 0); + this.maxRetainedCheckpoints = maxRetainedCheckpoints; + this.checkpoints.addAll(initialCheckpoints); } @Override @@ -60,8 +65,10 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception { + if (shutdownStatus.get() != null) { + throwAlreadyShutdownException(shutdownStatus.get()); + } checkpoints.addLast(checkpoint); - CheckpointSubsumeHelper.subsume( checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume); } @@ -75,13 +82,13 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor @Override public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws 
Exception { - if (jobStatus.isGloballyTerminalState()) { - checkpoints.clear(); - suspended.clear(); + if (shutdownStatus.compareAndSet(null, jobStatus)) { + if (jobStatus.isGloballyTerminalState()) { + // We are done with this store. We should leave no checkpoints for recovery. + checkpoints.clear(); + } } else { - suspended.clear(); - suspended.addAll(checkpoints); - checkpoints.clear(); + throwAlreadyShutdownException(shutdownStatus.get()); } } @@ -104,4 +111,9 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor public boolean requiresExternalizedCheckpoints() { return false; } + + @VisibleForTesting + public Optional<JobStatus> getShutdownStatus() { + return Optional.ofNullable(shutdownStatus.get()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java index 3d178b3..5b91bb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java @@ -21,46 +21,63 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; +import java.util.function.IntFunction; import java.util.function.Supplier; /** * Simple {@link CheckpointRecoveryFactory} which creates and keeps separate {@link * CompletedCheckpointStore} and {@link CheckpointIDCounter} for each {@link JobID}. 
*/ -public class PerJobCheckpointRecoveryFactory implements CheckpointRecoveryFactory { - private final Function<Integer, CompletedCheckpointStore> completedCheckpointStorePerJobFactory; +public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore> + implements CheckpointRecoveryFactory { + + @VisibleForTesting + public static <T extends CompletedCheckpointStore> + CheckpointRecoveryFactory withoutCheckpointStoreRecovery(IntFunction<T> storeFn) { + return new PerJobCheckpointRecoveryFactory<>( + (maxCheckpoints, previous) -> { + if (previous != null) { + throw new UnsupportedOperationException( + "Checkpoint store recovery is not supported."); + } + return storeFn.apply(maxCheckpoints); + }); + } + + private final BiFunction<Integer, T, T> completedCheckpointStorePerJobFactory; private final Supplier<CheckpointIDCounter> checkpointIDCounterPerJobFactory; - private final Map<JobID, CompletedCheckpointStore> store; - private final Map<JobID, CheckpointIDCounter> counter; + private final ConcurrentMap<JobID, T> store; + private final ConcurrentMap<JobID, CheckpointIDCounter> counter; + + public PerJobCheckpointRecoveryFactory( + BiFunction<Integer, T, T> completedCheckpointStorePerJobFactory) { + this(completedCheckpointStorePerJobFactory, StandaloneCheckpointIDCounter::new); + } public PerJobCheckpointRecoveryFactory( - Function<Integer, CompletedCheckpointStore> completedCheckpointStorePerJobFactory, + BiFunction<Integer, T, T> completedCheckpointStorePerJobFactory, Supplier<CheckpointIDCounter> checkpointIDCounterPerJobFactory) { this.completedCheckpointStorePerJobFactory = completedCheckpointStorePerJobFactory; this.checkpointIDCounterPerJobFactory = checkpointIDCounterPerJobFactory; - this.store = new HashMap<>(); - this.counter = new HashMap<>(); + this.store = new ConcurrentHashMap<>(); + this.counter = new ConcurrentHashMap<>(); } @Override - public CompletedCheckpointStore createCheckpointStore( + public CompletedCheckpointStore 
createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { - return store.computeIfAbsent( + return store.compute( jobId, - jId -> completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain)); + (key, previous) -> + completedCheckpointStorePerJobFactory.apply( + maxNumberOfCheckpointsToRetain, previous)); } @Override public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) { return counter.computeIfAbsent(jobId, jId -> checkpointIDCounterPerJobFactory.get()); } - - @VisibleForTesting - public static CheckpointRecoveryFactory useSameServicesForAllJobs( - CompletedCheckpointStore store, CheckpointIDCounter counter) { - return new PerJobCheckpointRecoveryFactory(n -> store, () -> counter); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index ac08e79..c323256 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory { @Override - public CompletedCheckpointStore createCheckpointStore( + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 7723ff8..dce9531 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -57,11 +57,6 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } @Override - public void recover() throws Exception { - // Nothing to do - } - - @Override public void addCheckpoint( CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 379d6a3..6d665947 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -46,7 +46,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac } @Override - public CompletedCheckpointStore createCheckpointStore( + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java index 236594e..540fbf9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -30,14 +29,30 @@ import java.util.concurrent.Executor; /** {@link EmbeddedHaServices} extension to expose leadership granting and revoking. */ public class EmbeddedHaServicesWithLeadershipControl extends EmbeddedHaServices implements HaLeadershipControl { - private final CheckpointRecoveryFactory testingCheckpointRecoveryFactory; + + private final CheckpointRecoveryFactory checkpointRecoveryFactory; public EmbeddedHaServicesWithLeadershipControl(Executor executor) { + this( + executor, + new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>( + (maxCheckpoints, previous) -> { + if (previous != null) { + if (!previous.getShutdownStatus().isPresent()) { + throw new IllegalStateException( + "Completed checkpoint store from previous run has not yet shutdown."); + } + return new EmbeddedCompletedCheckpointStore( + maxCheckpoints, previous.getAllCheckpoints()); + } + return new EmbeddedCompletedCheckpointStore(maxCheckpoints); + })); + } + + public EmbeddedHaServicesWithLeadershipControl( + Executor executor, CheckpointRecoveryFactory checkpointRecoveryFactory) { super(executor); - this.testingCheckpointRecoveryFactory = - new PerJobCheckpointRecoveryFactory( - n -> new EmbeddedCompletedCheckpointStore(), - StandaloneCheckpointIDCounter::new); + this.checkpointRecoveryFactory = checkpointRecoveryFactory; } @Override @@ -82,7 +97,7 @@ public class EmbeddedHaServicesWithLeadershipControl extends EmbeddedHaServices public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { synchronized (lock) { checkNotShutdown(); - return testingCheckpointRecoveryFactory; + return 
checkpointRecoveryFactory; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 51b6bc2..6041a02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -503,7 +503,7 @@ public class MiniCluster implements AutoCloseableAsync { return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( configuration, executor); default: - throw new IllegalConfigurationException("Unkown HA Services " + haServices); + throw new IllegalConfigurationException("Unknown HA Services " + haServices); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java index ea31bb2..481baa3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -88,7 +88,7 @@ public final class SchedulerUtils { CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); } - return recoveryFactory.createCheckpointStore( + return recoveryFactory.createRecoveredCompletedCheckpointStore( jobId, maxNumberOfCheckpointsToRetain, classLoader); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index d9d5b18..01e9774 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.checkpoint.DefaultLastStateConnectionStateListener; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil; @@ -439,6 +440,9 @@ public class ZooKeeperUtils { maxNumberOfCheckpointsToRetain, completedCheckpointStateHandleStore, ZooKeeperCheckpointStoreUtil.INSTANCE, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + completedCheckpointStateHandleStore, + ZooKeeperCheckpointStoreUtil.INSTANCE), executor); LOG.info( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 6445fe9..2424100 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -255,11 +255,6 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { } @Override - public void recover() throws Exception { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override public void addCheckpoint( CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index 8adb1c8..c67b4d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -65,6 +65,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -94,6 +95,7 @@ import static org.mockito.Mockito.verify; /** Tests for restoring checkpoint. */ @SuppressWarnings("checkstyle:EmptyLineSeparator") public class CheckpointCoordinatorRestoringTest extends TestLogger { + private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location"; private enum TestScaleType { @@ -102,6 +104,70 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { SAME_PARALLELISM; } + private static void acknowledgeCheckpoint( + CheckpointCoordinator coordinator, + ExecutionGraph executionGraph, + ExecutionJobVertex jobVertex, + long checkpointId) + throws Exception { + final List<KeyGroupRange> partitions = + StateAssignmentOperation.createKeyGroupPartitions( + jobVertex.getMaxParallelism(), jobVertex.getParallelism()); + for (int partitionIdx = 0; partitionIdx < partitions.size(); partitionIdx++) { + TaskStateSnapshot subtaskState = + mockSubtaskState( + jobVertex.getJobVertexId(), partitionIdx, partitions.get(partitionIdx)); + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new AcknowledgeCheckpoint( + executionGraph.getJobID(), + jobVertex + .getTaskVertices()[partitionIdx] + .getCurrentExecutionAttempt() + .getAttemptId(), + checkpointId, + new CheckpointMetrics(), + subtaskState); + coordinator.receiveAcknowledgeMessage( + acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); + } + } + + private static ExecutionGraph createExecutionGraph(List<TestingVertex> vertices) + throws Exception { + final CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder builder = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder(); + for (TestingVertex 
vertex : vertices) { + builder.addJobVertex( + vertex.getId(), vertex.getParallelism(), vertex.getMaxParallelism()); + } + return builder.build(); + } + + private static class TestingVertex { + + private final JobVertexID id; + private final int parallelism; + private final int maxParallelism; + + private TestingVertex(JobVertexID id, int parallelism, int maxParallelism) { + this.id = id; + this.parallelism = parallelism; + this.maxParallelism = maxParallelism; + } + + public JobVertexID getId() { + return id; + } + + public int getParallelism() { + return parallelism; + } + + public int getMaxParallelism() { + return maxParallelism; + } + } + private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor; @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -116,99 +182,80 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { /** * Tests that the checkpointed partitioned and non-partitioned state is assigned properly to the * {@link Execution} upon recovery. 
- * - * @throws Exception */ @Test public void testRestoreLatestCheckpointedState() throws Exception { - final JobVertexID jobVertexID1 = new JobVertexID(); - final JobVertexID jobVertexID2 = new JobVertexID(); - int parallelism1 = 3; - int parallelism2 = 2; - int maxParallelism1 = 42; - int maxParallelism2 = 13; - - final ExecutionGraph graph = - new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() - .addJobVertex(jobVertexID1, parallelism1, maxParallelism1) - .addJobVertex(jobVertexID2, parallelism2, maxParallelism2) - .build(); - - final ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1); - final ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2); + final List<TestingVertex> vertices = + Arrays.asList( + new TestingVertex(new JobVertexID(), 3, 42), + new TestingVertex(new JobVertexID(), 2, 13)); + testRestoreLatestCheckpointedState( + vertices, + testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(vertices)); + } - CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(); + private Collection<CompletedCheckpoint> + testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore( + List<TestingVertex> vertices) throws Exception { + final ExecutionGraph executionGraph = createExecutionGraph(vertices); + final EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = + final CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder() - .setExecutionGraph(graph) - .setCompletedCheckpointStore(store) + .setExecutionGraph(executionGraph) .setTimer(manuallyTriggeredScheduledExecutor) + .setCompletedCheckpointStore(store) .build(); // trigger the checkpoint - coord.triggerCheckpoint(false); + coordinator.triggerCheckpoint(false); manuallyTriggeredScheduledExecutor.triggerAll(); - assertEquals(1, coord.getPendingCheckpoints().size()); - long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - - List<KeyGroupRange> keyGroupPartitions1 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); - List<KeyGroupRange> keyGroupPartitions2 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); - - for (int index = 0; index < jobVertex1.getParallelism(); index++) { - TaskStateSnapshot subtaskState = - mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index)); + // we should have a single pending checkpoint + assertEquals(1, coordinator.getPendingCheckpoints().size()); + final long checkpointId = + Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet()); - AcknowledgeCheckpoint acknowledgeCheckpoint = - new AcknowledgeCheckpoint( - graph.getJobID(), - jobVertex1 - .getTaskVertices()[index] - .getCurrentExecutionAttempt() - .getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskState); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); + // acknowledge checkpoints from all vertex partitions + for (TestingVertex vertex : vertices) { + final ExecutionJobVertex executionVertex = + Objects.requireNonNull(executionGraph.getJobVertex(vertex.getId())); + acknowledgeCheckpoint(coordinator, executionGraph, executionVertex, checkpointId); } - for (int index = 0; index < jobVertex2.getParallelism(); index++) { - TaskStateSnapshot subtaskState = - mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index)); - - AcknowledgeCheckpoint acknowledgeCheckpoint = - new AcknowledgeCheckpoint( - graph.getJobID(), - jobVertex2 - .getTaskVertices()[index] - .getCurrentExecutionAttempt() - .getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskState); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); - } - - List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); - + final List<CompletedCheckpoint> 
completedCheckpoints = + coordinator.getSuccessfulCheckpoints(); assertEquals(1, completedCheckpoints.size()); // shutdown the store store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner()); - // restore the store - Set<ExecutionJobVertex> tasks = new HashSet<>(); + return store.getAllCheckpoints(); + } - tasks.add(jobVertex1); - tasks.add(jobVertex2); + private void testRestoreLatestCheckpointedState( + List<TestingVertex> vertices, Collection<CompletedCheckpoint> completedCheckpoints) + throws Exception { + final ExecutionGraph executionGraph = createExecutionGraph(vertices); + final EmbeddedCompletedCheckpointStore store = + new EmbeddedCompletedCheckpointStore( + completedCheckpoints.size(), completedCheckpoints); - assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false)); + // set up the coordinator and validate the initial state + final CheckpointCoordinator coordinator = + new CheckpointCoordinatorBuilder() + .setExecutionGraph(executionGraph) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCompletedCheckpointStore(store) + .build(); + + final Set<ExecutionJobVertex> executionVertices = + vertices.stream() + .map(TestingVertex::getId) + .map(executionGraph::getJobVertex) + .collect(Collectors.toSet()); + assertTrue(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false)); // validate that all shared states are registered again after the recovery. 
for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { @@ -221,8 +268,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { } // verify the restored state - verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1); - verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2); + for (ExecutionJobVertex executionVertex : executionVertices) { + verifyStateRestore(executionVertex); + } } @Test @@ -393,8 +441,6 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { /** * Tests the checkpoint restoration with changing parallelism of job vertex with partitioned * state. - * - * @throws Exception */ private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index def269f..933069a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2265,7 +2265,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } @Test - public void testMaxConcurrentAttempsWithSubsumption() throws Exception { + public void testMaxConcurrentAttemptsWithSubsumption() throws Exception { final int maxConcurrentAttempts = 2; JobVertexID jobVertexID1 = new JobVertexID(); @@ -2801,15 +2801,13 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1); - EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(10); - + final EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(10); final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2); // set up the coordinator and validate the initial 
state - CheckpointCoordinator checkpointCoordinator = + final CheckpointCoordinatorBuilder coordinatorBuilder = new CheckpointCoordinatorBuilder() .setExecutionGraph(graph) - .setCompletedCheckpointStore(store) .setTimer(manuallyTriggeredScheduledExecutor) .setSharedStateRegistryFactory( deleteExecutor -> { @@ -2817,8 +2815,9 @@ public class CheckpointCoordinatorTest extends TestLogger { new SharedStateRegistry(deleteExecutor); createdSharedStateRegistries.add(instance); return instance; - }) - .build(); + }); + final CheckpointCoordinator coordinator = + coordinatorBuilder.setCompletedCheckpointStore(store).build(); final int numCheckpoints = 3; @@ -2827,11 +2826,10 @@ public class CheckpointCoordinatorTest extends TestLogger { for (int i = 0; i < numCheckpoints; ++i) { performIncrementalCheckpoint( - graph.getJobID(), checkpointCoordinator, jobVertex1, keyGroupPartitions1, i); + graph.getJobID(), coordinator, jobVertex1, keyGroupPartitions1, i); } - List<CompletedCheckpoint> completedCheckpoints = - checkpointCoordinator.getSuccessfulCheckpoints(); + List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints(); assertEquals(numCheckpoints, completedCheckpoints.size()); int sharedHandleCount = 0; @@ -2901,7 +2899,13 @@ public class CheckpointCoordinatorTest extends TestLogger { // restore the store Set<ExecutionJobVertex> tasks = new HashSet<>(); tasks.add(jobVertex1); - assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)); + + assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null)); + final EmbeddedCompletedCheckpointStore secondStore = + new EmbeddedCompletedCheckpointStore(10, store.getAllCheckpoints()); + final CheckpointCoordinator secondCoordinator = + coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(); + assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)); // validate that all shared states are registered again after the recovery. 
cp = 0; @@ -2919,7 +2923,8 @@ public class CheckpointCoordinatorTest extends TestLogger { // check that all are registered with the new registry verify(keyedStateHandle, verificationMode) - .registerSharedStates(createdSharedStateRegistries.get(1)); + .registerSharedStates( + Iterables.getLast(createdSharedStateRegistries)); } } } @@ -2927,7 +2932,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // discard CP1 - store.removeOldestCheckpoint(); + secondStore.removeOldestCheckpoint(); // we expect that all shared state from CP0 is no longer referenced and discarded. CP2 is // still live and also @@ -2945,7 +2950,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // discard CP2 - store.removeOldestCheckpoint(); + secondStore.removeOldestCheckpoint(); // we expect all shared state was discarded now, because all CPs are for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 38f9d2c..b990147 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -218,6 +218,15 @@ public class CheckpointCoordinatorTestingUtils { return new Tuple2<>(allSerializedValuesConcatenated, offsets); } + public static void verifyStateRestore(ExecutionJobVertex executionJobVertex) throws Exception { + verifyStateRestore( + executionJobVertex.getJobVertexId(), + executionJobVertex, + StateAssignmentOperation.createKeyGroupPartitions( + executionJobVertex.getMaxParallelism(), + executionJobVertex.getParallelism())); + } + public static void verifyStateRestore( JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index e68c889..cab096e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -45,12 +45,12 @@ import static org.junit.Assert.assertTrue; public abstract class CompletedCheckpointStoreTest extends TestLogger { /** Creates the {@link CompletedCheckpointStore} implementation to be tested. */ - protected abstract CompletedCheckpointStore createCompletedCheckpoints( + protected abstract CompletedCheckpointStore createRecoveredCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception; - protected CompletedCheckpointStore createCompletedCheckpoints( + protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain) throws Exception { - return createCompletedCheckpoints( + return createRecoveredCompletedCheckpointStore( maxNumberOfCheckpointsToRetain, Executors.directExecutor()); } @@ -59,14 +59,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { /** Tests that at least one checkpoint needs to be retained. */ @Test(expected = Exception.class) public void testExceptionOnNoRetainedCheckpoints() throws Exception { - createCompletedCheckpoints(0); + createRecoveredCompletedCheckpointStore(0); } /** Tests adding and getting a checkpoint. 
*/ @Test public void testAddAndGetLatestCheckpoint() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(4); // Empty state assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints()); @@ -95,7 +95,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { @Test public void testAddCheckpointMoreThanMaxRetained() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(1); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { @@ -131,7 +131,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { */ @Test public void testEmptyState() throws Exception { - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(1); assertNull(checkpoints.getLatestCheckpoint(false)); assertEquals(0, checkpoints.getAllCheckpoints().size()); @@ -142,7 +142,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { @Test public void testGetAllCheckpoints() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(4); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { @@ -169,7 +169,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { @Test public void testDiscardAllCheckpoints() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); + 
CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(4); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { @@ -201,7 +201,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { @Test public void testAcquireLatestCompletedCheckpointId() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(1); assertEquals(0, checkpoints.getLatestCheckpointId()); checkpoints.addCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java index bd0333c..8782aa4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.TestingStateHandleStore; import org.apache.flink.runtime.state.RetrievableStateHandle; @@ -54,6 +55,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; /** Tests for {@link DefaultCompletedCheckpointStore}. 
*/ @@ -69,7 +71,7 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { @Before public void setup() { - builder = TestingStateHandleStore.builder(); + builder = TestingStateHandleStore.newBuilder(); checkpointStorageHelper = new TestingRetrievableStateStorageHelper<>(); executorService = Executors.newFixedThreadPool(2, new ExecutorThreadFactory("IO-Executor")); } @@ -132,7 +134,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { /** * We have three completed checkpoints(1, 2, 3) in the state handle store. We expect that {@link - * DefaultCompletedCheckpointStore#recover()} should recover the sorted checkpoints by name. + * DefaultCompletedCheckpointStoreUtils#retrieveCompletedCheckpoints(StateHandleStore, + * CheckpointStoreUtil)} should recover the sorted checkpoints by name. */ @Test public void testRecoverSortedCheckpoints() throws Exception { @@ -140,9 +143,6 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { builder.setGetAllSupplier(() -> createStateHandles(3)).build(); final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - - completedCheckpointStore.recover(); - final List<CompletedCheckpoint> recoveredCompletedCheckpoint = completedCheckpointStore.getAllCheckpoints(); assertThat(recoveredCompletedCheckpoint.size(), is(3)); @@ -155,7 +155,7 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { /** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. 
*/ @Test - public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception { + public void testCorruptDataInStateHandleStoreShouldNotBeSkipped() throws Exception { final long corruptCkpId = 2L; checkpointStorageHelper.setRetrieveStateFunction( state -> { @@ -167,11 +167,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore = builder.setGetAllSupplier(() -> createStateHandles(3)).build(); - final CompletedCheckpointStore completedCheckpointStore = - createCompletedCheckpointStore(stateHandleStore); - try { - completedCheckpointStore.recover(); + createCompletedCheckpointStore(stateHandleStore); } catch (Exception e) { if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) { return; @@ -196,7 +193,6 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - completedCheckpointStore.recover(); assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(num)); assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(), is(1L)); @@ -229,7 +225,6 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - completedCheckpointStore.recover(); assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(num)); assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(), is(1L)); @@ -266,7 +261,6 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - completedCheckpointStore.recover(); assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(num)); completedCheckpointStore.shutdown(JobStatus.CANCELED, new 
CheckpointsCleaner()); @@ -294,7 +288,6 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - completedCheckpointStore.recover(); assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(3)); completedCheckpointStore.shutdown(JobStatus.CANCELLING, new CheckpointsCleaner()); @@ -310,6 +303,26 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(0)); } + @Test + public void testShutdownFailsAnyFutureCallsToAddCheckpoint() throws Exception { + final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + for (JobStatus status : JobStatus.values()) { + final CompletedCheckpointStore completedCheckpointStore = + createCompletedCheckpointStore(builder.build()); + completedCheckpointStore.shutdown(status, checkpointsCleaner); + assertThrows( + IllegalStateException.class, + () -> + completedCheckpointStore.addCheckpoint( + CompletedCheckpointStoreTest.createCheckpoint( + 0L, new SharedStateRegistry()), + checkpointsCleaner, + () -> { + // No-op. 
+ })); + } + } + private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles( int num) { final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandles = @@ -325,16 +338,16 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { } private CompletedCheckpointStore createCompletedCheckpointStore( - TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) { + TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) throws Exception { return createCompletedCheckpointStore(stateHandleStore, 1); } private CompletedCheckpointStore createCompletedCheckpointStore( - TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) { - return new DefaultCompletedCheckpointStore<>( - toRetain, - stateHandleStore, + TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) + throws Exception { + final CheckpointStoreUtil checkpointStoreUtil = new CheckpointStoreUtil() { + @Override public String checkpointIDToName(long checkpointId) { return String.valueOf(checkpointId); @@ -342,9 +355,15 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { @Override public long nameToCheckpointID(String name) { - return Long.valueOf(name); + return Long.parseLong(name); } - }, + }; + return new DefaultCompletedCheckpointStore<>( + toRetain, + stateHandleStore, + checkpointStoreUtil, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + stateHandleStore, checkpointStoreUtil), executorService); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java new file mode 100644 index 0000000..984f105 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.persistence.TestingStateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +/** Tests related to {@link DefaultCompletedCheckpointStoreUtils}. 
*/ +public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger { + + private static CompletedCheckpoint createCompletedCheckpoint(long checkpointId) { + return new CompletedCheckpoint( + new JobID(), + checkpointId, + 0, + 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + new TestCompletedCheckpointStorageLocation()); + } + + private static class FailingRetrievableStateHandle<T extends Serializable> + implements RetrievableStateHandle<T> { + + private static final int serialVersionUID = 1; + + @Override + public T retrieveState() throws IOException, ClassNotFoundException { + throw new IOException("Test exception."); + } + + @Override + public void discardState() throws Exception { + // No-op. + } + + @Override + public long getStateSize() { + return 0; + } + } + + private static class SimpleCheckpointStoreUtil implements CheckpointStoreUtil { + + @Override + public String checkpointIDToName(long checkpointId) { + return "checkpoint-" + checkpointId; + } + + @Override + public long nameToCheckpointID(String name) { + return Long.parseLong(name.split("-")[1]); + } + } + + @Test + public void testRetrievedCheckpointsAreOrderedChronologically() throws Exception { + final TestingRetrievableStateStorageHelper<CompletedCheckpoint> storageHelper = + new TestingRetrievableStateStorageHelper<>(); + final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> handles = + new ArrayList<>(); + handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(0L)), "checkpoint-0")); + handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(1L)), "checkpoint-1")); + handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(2L)), "checkpoint-2")); + Collections.shuffle(handles); + final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore = + TestingStateHandleStore.<CompletedCheckpoint>newBuilder() + .setGetAllSupplier(() -> handles) + 
.build(); + final Collection<CompletedCheckpoint> completedCheckpoints = + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + stateHandleStore, new SimpleCheckpointStoreUtil()); + // Make sure checkpoints are ordered from earliest to latest. + assertEquals( + Arrays.asList(0L, 1L, 2L), + completedCheckpoints.stream() + .map(CompletedCheckpoint::getCheckpointID) + .collect(Collectors.toList())); + } + + @Test + public void testRetrievingCheckpointsFailsIfRetrievalOfAnyCheckpointFails() throws Exception { + final TestingRetrievableStateStorageHelper<CompletedCheckpoint> storageHelper = + new TestingRetrievableStateStorageHelper<>(); + final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> handles = + new ArrayList<>(); + handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(0L)), "checkpoint-0")); + handles.add(Tuple2.of(new FailingRetrievableStateHandle<>(), "checkpoint-1")); + handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(2L)), "checkpoint-2")); + Collections.shuffle(handles); + final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore = + TestingStateHandleStore.<CompletedCheckpoint>newBuilder() + .setGetAllSupplier(() -> handles) + .build(); + assertThrows( + FlinkException.class, + () -> + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + stateHandleStore, new SimpleCheckpointStoreUtil())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java new file mode 100644 index 0000000..6b65d00 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +/** Tests related to {@link PerJobCheckpointRecoveryFactory}. 
*/ +public class PerJobCheckpointRecoveryTest extends TestLogger { + + @Test + public void testFactoryWithoutCheckpointStoreRecovery() throws Exception { + final TestingCompletedCheckpointStore store = + new TestingCompletedCheckpointStore(new CompletableFuture<>()); + final CheckpointRecoveryFactory factory = + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> store); + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + final JobID firstJobId = new JobID(); + assertSame( + store, factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + assertThrows( + UnsupportedOperationException.class, + () -> factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + + final JobID secondJobId = new JobID(); + assertSame( + store, + factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + assertThrows( + UnsupportedOperationException.class, + () -> factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 4f4e2a7..218b023 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -41,9 +41,8 @@ import static org.junit.Assert.assertTrue; public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest { @Override - protected CompletedCheckpointStore createCompletedCheckpoints( + protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception { - return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); } @@ -51,7 +50,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS @Test public void testShutdownDiscardsCheckpoints() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore store = createCompletedCheckpoints(1); + CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); Collection<OperatorState> operatorStates = checkpoint.getOperatorStates().values(); @@ -72,7 +71,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS @Test public void testSuspendDiscardsCheckpoints() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore store = createCompletedCheckpoints(1); + CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); Collection<OperatorState> taskStates = checkpoint.getOperatorStates().values(); @@ -95,7 +94,8 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS final int numCheckpointsToRetain = 1; CompletedCheckpointStore store = - createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor()); + createRecoveredCompletedCheckpointStore( + numCheckpointsToRetain, Executors.directExecutor()); CountDownLatch discardAttempted = new CountDownLatch(1); for (long i = 0; i < numCheckpointsToRetain + 1; ++i) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java index cda32d6..f4e9256 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java @@ -32,7 +32,7 @@ public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFacto } @Override - public CompletedCheckpointStore createCheckpointStore( + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { return store; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java index 9e95141..629dad1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java @@ -33,9 +33,6 @@ public final class TestingCompletedCheckpointStore implements CompletedCheckpoin } @Override - public void recover() {} - - @Override public void addCheckpoint( CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index ddb8ce4..d909174 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -66,9 +66,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint @AfterClass public static void tearDown() throws Exception { - if (ZOOKEEPER != null) { - ZOOKEEPER.shutdown(); - } + ZOOKEEPER.shutdown(); } @Before @@ -77,18 +75,19 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint } 
@Override - protected CompletedCheckpointStore createCompletedCheckpoints( + protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception { final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore( ZOOKEEPER.getClient(), CHECKPOINT_PATH, new TestingRetrievableStateStorageHelper<>()); - return new DefaultCompletedCheckpointStore<>( maxNumberOfCheckpointsToRetain, checkpointsInZooKeeper, checkpointStoreUtil, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + checkpointsInZooKeeper, checkpointStoreUtil), executor); } @@ -103,7 +102,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint public void testRecover() throws Exception { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3); + CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(3); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { @@ -128,7 +127,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // Recover sharedStateRegistry.close(); sharedStateRegistry = new SharedStateRegistry(); - checkpoints.recover(); assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); @@ -157,7 +155,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint CuratorFramework client = ZOOKEEPER.getClient(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore store = createCompletedCheckpoints(1); + CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); store.addCheckpoint(checkpoint, new 
CheckpointsCleaner(), () -> {}); @@ -179,9 +177,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint checkpoint.getCheckpointID()))); sharedStateRegistry.close(); - store.recover(); - assertEquals(0, store.getNumberOfRetainedCheckpoints()); + assertEquals( + 0, createRecoveredCompletedCheckpointStore(1).getNumberOfRetainedCheckpoints()); } /** @@ -193,7 +191,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint CuratorFramework client = ZOOKEEPER.getClient(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore store = createCompletedCheckpoints(1); + CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); store.addCheckpoint(checkpoint, new CheckpointsCleaner(), () -> {}); @@ -219,7 +217,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // Recover again sharedStateRegistry.close(); - store.recover(); + store = createRecoveredCompletedCheckpointStore(1); CompletedCheckpoint recovered = store.getLatestCheckpoint(false); assertEquals(checkpoint, recovered); @@ -234,7 +232,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint public void testLatestCheckpointRecovery() throws Exception { final int numCheckpoints = 3; SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - CompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints); + CompletedCheckpointStore checkpointStore = + createRecoveredCompletedCheckpointStore(numCheckpoints); List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints); checkpoints.add(createCheckpoint(9, sharedStateRegistry)); @@ -246,10 +245,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint } sharedStateRegistry.close(); - checkpointStore.recover(); - - CompletedCheckpoint 
latestCheckpoint = checkpointStore.getLatestCheckpoint(false); + final CompletedCheckpoint latestCheckpoint = + createRecoveredCompletedCheckpointStore(numCheckpoints).getLatestCheckpoint(false); assertEquals(checkpoints.get(checkpoints.size() - 1), latestCheckpoint); } @@ -264,10 +262,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint final int numberOfCheckpoints = 1; final long waitingTimeout = 50L; - CompletedCheckpointStore zkCheckpointStore1 = - createCompletedCheckpoints(numberOfCheckpoints); - CompletedCheckpointStore zkCheckpointStore2 = - createCompletedCheckpoints(numberOfCheckpoints); + final CompletedCheckpointStore zkCheckpointStore1 = + createRecoveredCompletedCheckpointStore(numberOfCheckpoints); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); @@ -279,7 +275,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // recover the checkpoint by a different checkpoint store sharedStateRegistry.close(); sharedStateRegistry = new SharedStateRegistry(); - zkCheckpointStore2.recover(); + final CompletedCheckpointStore zkCheckpointStore2 = + createRecoveredCompletedCheckpointStore(numberOfCheckpoints); CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(false); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); @@ -339,7 +336,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint final int maxCheckpointsToRetain = 1; ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor(); CompletedCheckpointStore checkpointStore = - createCompletedCheckpoints(maxCheckpointsToRetain, executor); + createRecoveredCompletedCheckpointStore(maxCheckpointsToRetain, executor); int nbCheckpointsToInject = 3; for (int i = 1; i <= nbCheckpointsToInject; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index 2196473..222f82b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -170,10 +170,10 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { numCheckpointsToRetain, zooKeeperStateHandleStoreMock, zooKeeperCheckpointStoreUtil, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + zooKeeperStateHandleStoreMock, zooKeeperCheckpointStoreUtil), Executors.directExecutor()); - zooKeeperCompletedCheckpointStore.recover(); - CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false); @@ -306,10 +306,10 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { numCheckpointsToRetain, zooKeeperStateHandleStoreMock, zooKeeperCheckpointStoreUtil, + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + zooKeeperStateHandleStoreMock, zooKeeperCheckpointStoreUtil), Executors.directExecutor()); - zooKeeperCompletedCheckpointStore.recover(); - CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index ea0a3fc..1001b52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -30,10 +31,8 @@ import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLo import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; -import org.apache.flink.util.function.TriConsumer; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; @@ -48,14 +47,11 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.function.Function; -import java.util.stream.IntStream; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; -import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint; -import static org.apache.flink.util.ExceptionUtils.findThrowable; -import static org.apache.flink.util.ExceptionUtils.rethrow; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; /** Tests for {@link DefaultCompletedCheckpointStore} with {@link ZooKeeperStateHandleStore}. 
*/ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { @@ -74,102 +70,14 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { assertEquals(checkpointId, zooKeeperCheckpointStoreUtil.nameToCheckpointID(path)); } - @Test(expected = ExpectedTestException.class) - public void testRecoverFailsIfDownloadFails() throws Exception { - testDownloadInternal( - (store, checkpointsInZk, sharedStateRegistry) -> { - try { - checkpointsInZk.add( - createHandle( - 1, - id -> { - throw new ExpectedTestException(); - })); - store.recover(); - } catch (Exception exception) { - findThrowable(exception, ExpectedTestException.class) - .ifPresent(ExceptionUtils::rethrow); - rethrow(exception); - } - }); - } - @Test - public void testNoDownloadIfCheckpointsNotChanged() throws Exception { - testDownloadInternal( - (store, checkpointsInZk, sharedStateRegistry) -> { - try { - checkpointsInZk.add( - createHandle( - 1, - id -> { - throw new AssertionError( - "retrieveState was attempted for checkpoint " - + id); - })); - store.addCheckpoint( - createCheckpoint(1, sharedStateRegistry), - new CheckpointsCleaner(), - () -> { - /*no op*/ - }); - store.recover(); // will fail in case of attempt to retrieve state - } catch (Exception exception) { - throw new RuntimeException(exception); - } - }); - } - - @Test - public void testDownloadIfCheckpointsChanged() throws Exception { - testDownloadInternal( - (store, checkpointsInZk, sharedStateRegistry) -> { - try { - int lastInZk = 10; - IntStream.range(0, lastInZk + 1) - .forEach( - i -> - checkpointsInZk.add( - createHandle( - i, - id -> - createCheckpoint( - id, - sharedStateRegistry)))); - store.addCheckpoint( - createCheckpoint(1, sharedStateRegistry), - new CheckpointsCleaner(), - () -> { - /*no op*/ - }); - store.addCheckpoint( - createCheckpoint(5, sharedStateRegistry), - new CheckpointsCleaner(), - () -> { - /*no op*/ - }); - store.recover(); - assertEquals(lastInZk, 
store.getLatestCheckpoint(false).getCheckpointID()); - } catch (Exception exception) { - throw new RuntimeException(exception); - } - }); - } - - private void testDownloadInternal( - TriConsumer< - CompletedCheckpointStore, - List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, - SharedStateRegistry> - test) - throws Exception { - SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - Configuration configuration = new Configuration(); + public void testRecoverFailsIfDownloadFails() { + final Configuration configuration = new Configuration(); configuration.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); - List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZk = + final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZk = new ArrayList<>(); - ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = + final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = new ZooKeeperStateHandleStore<CompletedCheckpoint>( ZooKeeperUtils.startCuratorFramework(configuration), new TestingRetrievableStateStorageHelper<>()) { @@ -180,18 +88,19 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { } }; - CompletedCheckpointStore store = - new DefaultCompletedCheckpointStore<>( - 10, - checkpointsInZooKeeper, - zooKeeperCheckpointStoreUtil, - Executors.directExecutor()); - try { - test.accept(store, checkpointsInZk, sharedStateRegistry); - } finally { - store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner()); - sharedStateRegistry.close(); - } + checkpointsInZk.add( + createHandle( + 1, + id -> { + throw new ExpectedTestException(); + })); + final Exception exception = + assertThrows( + Exception.class, + () -> + DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( + checkpointsInZooKeeper, zooKeeperCheckpointStoreUtil)); + assertThat(exception, 
FlinkMatchers.containsCause(ExpectedTestException.class)); } private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle( @@ -269,11 +178,11 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore( client, "/checkpoints", new TestingRetrievableStateStorageHelper<>()); - return new DefaultCompletedCheckpointStore<>( 1, checkpointsInZooKeeper, zooKeeperCheckpointStoreUtil, + Collections.emptyList(), Executors.directExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java index 1b92af0..f4377a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java @@ -19,11 +19,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry; @@ -58,6 +56,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertTrue; /** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */ @@ -68,11 +67,18 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { @Before public void setUp() throws Exception { super.setUp(); - final CompletedCheckpointStore completedCheckpointStore = - new EmbeddedCompletedCheckpointStore(); haServices.setCheckpointRecoveryFactory( - PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( - completedCheckpointStore, new StandaloneCheckpointIDCounter())); + new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>( + (maxCheckpoints, previous) -> { + if (previous != null) { + // First job attempt failed before cleaning up the checkpoint store. + assertFalse(previous.getShutdownStatus().isPresent()); + assertFalse(previous.getAllCheckpoints().isEmpty()); + return new EmbeddedCompletedCheckpointStore( + maxCheckpoints, previous.getAllCheckpoints()); + } + return new EmbeddedCompletedCheckpointStore(maxCheckpoints); + })); } @After diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java index 8438312..568e937 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java @@ -68,7 +68,7 @@ public class DefaultJobGraphStoreTest extends TestLogger { @Before public void setup() { - builder = TestingStateHandleStore.builder(); + builder = TestingStateHandleStore.newBuilder(); testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher(); testingJobGraphListener = new TestingJobGraphListener(); jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java index 6586fae..469f290 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java @@ -51,7 +51,7 @@ public class JobExecutionITCase extends TestLogger { final JobGraph jobGraph = createJobGraph(parallelism); final TestingMiniClusterConfiguration miniClusterConfiguration = - new TestingMiniClusterConfiguration.Builder() + TestingMiniClusterConfiguration.newBuilder() .setNumSlotsPerTaskManager(numSlotsPerTaskExecutor) .setNumTaskManagers(numTaskExecutors) .setLocalCommunication(true) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 3f30d51..455e338 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -159,7 +159,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs; 
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -798,8 +797,8 @@ public class JobMasterTest extends TestLogger { final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); final CheckpointRecoveryFactory testingCheckpointRecoveryFactory = - useSameServicesForAllJobs( - completedCheckpointStore, new StandaloneCheckpointIDCounter()); + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> completedCheckpointStore); haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); final JobMaster jobMaster = @@ -873,8 +872,8 @@ public class JobMasterTest extends TestLogger { completedCheckpointStore.addCheckpoint( completedCheckpoint, new CheckpointsCleaner(), () -> {}); final CheckpointRecoveryFactory testingCheckpointRecoveryFactory = - useSameServicesForAllJobs( - completedCheckpointStore, new StandaloneCheckpointIDCounter()); + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> completedCheckpointStore); haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java index e8673e4..e8903b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java @@ -81,7 +81,7 @@ public class LeaderChangeClusterComponentsTest extends TestLogger { miniCluster = new TestingMiniCluster( - new TestingMiniClusterConfiguration.Builder() + 
TestingMiniClusterConfiguration.newBuilder() .setNumTaskManagers(NUM_TMS) .setNumSlotsPerTaskManager(SLOTS_PER_TM) .build(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java index ac7c46b..a9b4569 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java @@ -30,6 +30,15 @@ import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED; /** Configuration for the {@link TestingMiniCluster}. */ public class TestingMiniClusterConfiguration extends MiniClusterConfiguration { + /** + * Create a new {@link Builder builder} for {@link TestingMiniClusterConfiguration}. + * + * @return New builder instance. + */ + public static Builder newBuilder() { + return new Builder(); + } + private final int numberDispatcherResourceManagerComponents; private final boolean localCommunication; @@ -70,8 +79,12 @@ public class TestingMiniClusterConfiguration extends MiniClusterConfiguration { @Nullable private String commonBindAddress = null; - public Builder setConfiguration(Configuration configuration1) { - this.configuration = Preconditions.checkNotNull(configuration1); + private Builder() { + // No-op. 
+ } + + public Builder setConfiguration(Configuration configuration) { + this.configuration = Preconditions.checkNotNull(configuration); return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java index 10c055e..7ce92b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java @@ -141,7 +141,7 @@ public class TestingStateHandleStore<T extends Serializable> releaseAllHandlesRunnable.run(); } - public static <T extends Serializable> Builder<T> builder() { + public static <T extends Serializable> Builder<T> newBuilder() { return new Builder<>(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java index 7a93108..be7b45a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java @@ -19,18 +19,15 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import 
org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -38,22 +35,18 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.SerializableSupplier; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; -import static org.apache.flink.configuration.JobManagerOptions.SchedulerType.Adaptive; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assume.assumeFalse; +import static org.junit.Assert.assertEquals; /** * Test that failure on recovery leads to job restart if configured, so that transient recovery @@ -63,7 +56,9 @@ public class CheckpointStoreITCase extends TestLogger { 
private static final Configuration CONFIGURATION = new Configuration() - .set(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName()); + .set( + HighAvailabilityOptions.HA_MODE, + BlockingHighAvailabilityServiceFactory.class.getName()); @ClassRule public static final MiniClusterWithClientResource CLUSTER = @@ -73,33 +68,41 @@ public class CheckpointStoreITCase extends TestLogger { .build()); @Before - public void init() { - FailingStore.reset(); + public void setUp() { + BlockingHighAvailabilityServiceFactory.reset(); FailingMapper.reset(); } @Test - public void testRestartOnRecoveryFailure() throws Exception { - assumeFalse( - // TODO: remove after FLINK-22483 - "Adaptive scheduler doesn't retry after failures on recovery", - ClusterOptions.getSchedulerType(CONFIGURATION) == Adaptive); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + public void testJobClientRemainsResponsiveDuringCompletedCheckpointStoreRecovery() + throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10); env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0)); - env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed)) + env.addSource(emitUntil(() -> FailingMapper.failedAndProcessed)) .map(new FailingMapper()) .addSink(new DiscardingSink<>()); - env.execute(); + final JobClient jobClient = env.executeAsync(); - checkState(FailingStore.recovered && FailingMapper.failedAndProcessed); + BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsStart.await(); + for (int i = 0; i < 10; i++) { + final JobStatus jobStatus = jobClient.getJobStatus().get(); + assertEquals(JobStatus.INITIALIZING, jobStatus); + } + BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsFinished.countDown(); + + // Await for job to finish. 
+ jobClient.getJobExecutionResult().get(); + + checkState(FailingMapper.failedAndProcessed); } private static class FailingMapper implements MapFunction<Integer, Integer> { + private static volatile boolean failed = false; private static volatile boolean failedAndProcessed = false; - public static void reset() { + static void reset() { failed = false; failedAndProcessed = false; } @@ -116,78 +119,43 @@ public class CheckpointStoreITCase extends TestLogger { } } - /** TestingHAFactory. */ - public static class TestingHAFactory implements HighAvailabilityServicesFactory { - - @Override - public HighAvailabilityServices createHAServices( - Configuration configuration, Executor executor) { - return new EmbeddedHaServices(Executors.directExecutor()) { - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { - return new TestingCheckpointRecoveryFactory( - new FailingStore(), - new TestingCheckpointIDCounter(new CompletableFuture<>())); - } - }; - } - } + /** + * Testing implementation of {@link HighAvailabilityServicesFactory} that lets us inject custom + * {@link HighAvailabilityServices}. 
+ */ + public static class BlockingHighAvailabilityServiceFactory + implements HighAvailabilityServicesFactory { - private static class FailingStore implements CompletedCheckpointStore { - private static volatile boolean started = false; - private static volatile boolean failed = false; - private static volatile boolean recovered = false; + private static volatile CountDownLatch fetchRemoteCheckpointsStart = new CountDownLatch(1); + private static volatile CountDownLatch fetchRemoteCheckpointsFinished = + new CountDownLatch(1); - public static void reset() { - started = failed = recovered = false; + static void reset() { + fetchRemoteCheckpointsStart = new CountDownLatch(1); + fetchRemoteCheckpointsFinished = new CountDownLatch(1); } @Override - public void recover() throws Exception { - if (!started) { - started = true; - } else if (!failed) { - failed = true; - throw new RuntimeException(); - } else if (!recovered) { - recovered = true; - } - } - - @Override - public void addCheckpoint( - CompletedCheckpoint checkpoint, - CheckpointsCleaner checkpointsCleaner, - Runnable postCleanup) {} - - @Override - public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) - throws Exception {} - - @Override - public List<CompletedCheckpoint> getAllCheckpoints() { - return Collections.emptyList(); - } - - @Override - public int getNumberOfRetainedCheckpoints() { - return 0; - } - - @Override - public int getMaxNumberOfRetainedCheckpoints() { - return 1; - } - - @Override - public boolean requiresExternalizedCheckpoints() { - return false; + public HighAvailabilityServices createHAServices( + Configuration configuration, Executor executor) { + final CheckpointRecoveryFactory checkpointRecoveryFactory = + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> { + fetchRemoteCheckpointsStart.countDown(); + try { + fetchRemoteCheckpointsFinished.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + 
} + return new StandaloneCompletedCheckpointStore(maxCheckpoints); + }); + return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory); } } private SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> until) { return new SourceFunction<Integer>() { + private volatile boolean running = true; @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java index b367042..f32ee15 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java @@ -39,12 +39,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.BackendBuildingException; @@ -417,20 +416,6 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { } } - private static class TestingHaServices extends EmbeddedHaServices { - private 
final CheckpointRecoveryFactory checkpointRecoveryFactory; - - TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) { - super(executor); - this.checkpointRecoveryFactory = checkpointRecoveryFactory; - } - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { - return checkpointRecoveryFactory; - } - } - /** An extension of {@link StandaloneCompletedCheckpointStore}. */ private static class TestingCompletedCheckpointStore extends StandaloneCompletedCheckpointStore { @@ -471,11 +456,10 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { @Override public HighAvailabilityServices createHAServices( Configuration configuration, Executor executor) { - return new TestingHaServices( - PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( - new TestingCompletedCheckpointStore(), - new StandaloneCheckpointIDCounter()), - executor); + final CheckpointRecoveryFactory checkpointRecoveryFactory = + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> new TestingCompletedCheckpointStore()); + return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 6cd7ada..cce91ed 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -35,11 +35,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -440,20 +439,6 @@ public class RegionFailoverITCase extends TestLogger { private static final long serialVersionUID = 1L; } - private static class TestingHaServices extends EmbeddedHaServices { - private final CheckpointRecoveryFactory checkpointRecoveryFactory; - - TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) { - super(executor); - this.checkpointRecoveryFactory = checkpointRecoveryFactory; - } - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { - return checkpointRecoveryFactory; - } - } - /** * An extension of {@link StandaloneCompletedCheckpointStore} which would record information of * last completed checkpoint id and the number of completed checkpoints. 
@@ -486,11 +471,10 @@ public class RegionFailoverITCase extends TestLogger { @Override public HighAvailabilityServices createHAServices( Configuration configuration, Executor executor) { - return new TestingHaServices( - PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( - new TestingCompletedCheckpointStore(), - new StandaloneCheckpointIDCounter()), - executor); + final CheckpointRecoveryFactory checkpointRecoveryFactory = + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> new TestingCompletedCheckpointStore()); + return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index fa491dc..85c3f21 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -45,13 +45,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -763,20 +762,6 @@ public class SavepointITCase extends TestLogger { } } - private static class TestingHaServices extends EmbeddedHaServices { - private final CheckpointRecoveryFactory checkpointRecoveryFactory; - - TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) { - super(executor); - this.checkpointRecoveryFactory = checkpointRecoveryFactory; - } - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { - return checkpointRecoveryFactory; - } - } - /** * A factory for HA services used to inject {@link * FailingSyncSavepointCompletedCheckpointStore}. @@ -784,12 +769,11 @@ public class SavepointITCase extends TestLogger { public static class FailingSyncSavepointHAFactory implements HighAvailabilityServicesFactory { @Override public HighAvailabilityServices createHAServices( - Configuration configuration, Executor executor) throws Exception { - return new TestingHaServices( - PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( - new FailingSyncSavepointCompletedCheckpointStore(), - new StandaloneCheckpointIDCounter()), - executor); + Configuration configuration, Executor executor) { + final CheckpointRecoveryFactory checkpointRecoveryFactory = + PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( + maxCheckpoints -> new FailingSyncSavepointCompletedCheckpointStore()); + return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 5e842bc..a412498 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -98,7 +98,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L); final TestingMiniClusterConfiguration miniClusterConfiguration = - new TestingMiniClusterConfiguration.Builder() + TestingMiniClusterConfiguration.newBuilder() .setConfiguration(configuration) .setNumberDispatcherResourceManagerComponents(numDispatchers) .setNumTaskManagers(numTMs)