This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c0936deaf99 [FLINK-26985][runtime] Don't discard shared state of restored checkpoints c0936deaf99 is described below commit c0936deaf99390fc727acc8633e3be22e62f4bf5 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Fri Apr 1 16:08:32 2022 +0200 [FLINK-26985][runtime] Don't discard shared state of restored checkpoints Currently, in LEGACY restore mode, shared state of incremental checkpoints can be discarded regardless of whether they were created by this job or not. This invalidates the checkpoint from which the job was restored. The bug was introduced in FLINK-24611. Before that, a reference count was maintained for each shared state entry; "initial" checkpoints did not decrement this count, preventing their shared state from being discarded. This change makes SharedStateRegistry: 1. Remember the max checkpoint ID encountered during recovery (unless restoring in CLAIM mode) 2. Associate each shared state entry with the ID of the checkpoint that created it 3. 
Only discard the entry if its createdByCheckpointID > highestNotClaimedCheckpointID. Step (1) is called from: - CheckpointCoordinator.restoreSavepoint - to cover the initial restore from a checkpoint - SharedStateRegistryFactory, when building the checkpoint store - to cover the failover case (see DefaultExecutionGraphFactory.createAndRestoreExecutionGraph) Adjusting only the CheckpointCoordinator path isn't sufficient: - the job recovers from an existing checkpoint and adds it to the store - a new checkpoint is created - with the default restore settings - a failure happens and the job recovers from the newer checkpoint - when the newer checkpoint is subsumed, its (inherited) shared state might be deleted --- .../KubernetesCheckpointRecoveryFactory.java | 7 +- .../flink/kubernetes/utils/KubernetesUtils.java | 7 +- .../runtime/checkpoint/CheckpointCoordinator.java | 3 +- .../checkpoint/CheckpointRecoveryFactory.java | 5 +- .../runtime/checkpoint/CompletedCheckpoint.java | 7 +- .../EmbeddedCompletedCheckpointStore.java | 13 +- .../PerJobCheckpointRecoveryFactory.java | 12 +- .../StandaloneCheckpointRecoveryFactory.java | 9 +- .../StandaloneCompletedCheckpointStore.java | 17 +- .../ZooKeeperCheckpointRecoveryFactory.java | 7 +- .../cleanup/CheckpointResourcesCleanupRunner.java | 7 +- .../EmbeddedHaServicesWithLeadershipControl.java | 9 +- .../apache/flink/runtime/jobgraph/RestoreMode.java | 2 + .../runtime/jobgraph/SavepointConfigOptions.java | 2 +- .../flink/runtime/scheduler/SchedulerUtils.java | 14 +- .../flink/runtime/state/SharedStateRegistry.java | 20 +- .../runtime/state/SharedStateRegistryFactory.java | 8 +- .../runtime/state/SharedStateRegistryImpl.java | 28 ++- .../apache/flink/runtime/util/ZooKeeperUtils.java | 8 +- .../CheckpointCoordinatorFailureTest.java | 3 +- .../CheckpointCoordinatorRestoringTest.java | 6 +- .../checkpoint/CheckpointCoordinatorTest.java | 262 +++++++++++---------- .../checkpoint/CompletedCheckpointTest.java | 5 +- .../DefaultCompletedCheckpointStoreTest.java | 5 +- 
.../checkpoint/PerJobCheckpointRecoveryTest.java | 13 +- .../TestingCheckpointRecoveryFactory.java | 4 +- .../ZooKeeperCompletedCheckpointStoreITCase.java | 4 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 4 +- .../dispatcher/DispatcherCleanupITCase.java | 15 +- .../CheckpointResourcesCleanupRunnerTest.java | 4 +- .../runtime/scheduler/SchedulerUtilsTest.java | 13 +- .../flink/runtime/testutils/CommonTestUtils.java | 33 ++- .../ResumeCheckpointManuallyITCase.java | 119 +++++++--- .../test/state/ChangelogCompatibilityITCase.java | 2 +- .../flink/test/state/ChangelogRescalingITCase.java | 2 +- 35 files changed, 452 insertions(+), 227 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java index ea78ecbbd28..7150034bbb8 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import javax.annotation.Nullable; @@ -81,7 +82,8 @@ public class KubernetesCheckpointRecoveryFactory implements CheckpointRecoveryFa JobID jobID, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception { final String configMapName = getConfigMapNameFunction.apply(jobID); 
KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, configMapName, clusterId); @@ -94,7 +96,8 @@ public class KubernetesCheckpointRecoveryFactory implements CheckpointRecoveryFa lockIdentity, maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, - ioExecutor); + ioExecutor, + restoreMode); } @Override diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index e02d3a824e2..f8afbafde0a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore; import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.jobmanager.NoOpJobGraphStoreWatcher; @@ -296,6 +297,7 @@ public class KubernetesUtils { * @param lockIdentity lock identity to check the leadership * @param maxNumberOfCheckpointsToRetain max number of checkpoints to retain on state store * handle + * @param restoreMode the mode in which the job is restoring * @return a {@link DefaultCompletedCheckpointStore} with {@link KubernetesStateHandleStore}. 
* @throws Exception when create the storage helper failed */ @@ -307,7 +309,8 @@ public class KubernetesUtils { @Nullable String lockIdentity, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception { final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = @@ -331,7 +334,7 @@ public class KubernetesUtils { stateHandleStore, KubernetesCheckpointStoreUtil.INSTANCE, checkpoints, - sharedStateRegistryFactory.create(ioExecutor, checkpoints), + sharedStateRegistryFactory.create(ioExecutor, checkpoints, restoreMode), executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6fd6ad19fea..2d47d79f063 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1789,7 +1789,8 @@ public class CheckpointCoordinator { // register shared state - even before adding the checkpoint to the store // because the latter might trigger subsumption so the ref counts must be up-to-date savepoint.registerSharedStatesAfterRestored( - completedCheckpointStore.getSharedStateRegistry()); + completedCheckpointStore.getSharedStateRegistry(), + restoreSettings.getRestoreMode()); completedCheckpointStore.addCheckpointAndSubsumeOldestOne( savepoint, checkpointsCleaner, this::scheduleTriggerRequest); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java index ab0b5ef8506..64c68caa8a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; @@ -37,13 +38,15 @@ public interface CheckpointRecoveryFactory { * @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry} * objects. * @param ioExecutor Executor used to run (async) deletes. + * @param restoreMode the restore mode with which the job is restoring. * @return {@link CompletedCheckpointStore} instance for the job */ CompletedCheckpointStore createRecoveredCompletedCheckpointStore( JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 41c577452bb..f0270ab661a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateUtil; @@ -207,11 +208,13 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { * checkpoint is added into the 
store. * * @param sharedStateRegistry The registry where shared states are registered + * @param restoreMode the mode in which this checkpoint was restored from */ - public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) { + public void registerSharedStatesAfterRestored( + SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) { // in claim mode we should not register any shared handles if (!props.isUnclaimed()) { - sharedStateRegistry.registerAll(operatorStates.values(), checkpointID); + sharedStateRegistry.registerAllAfterRestored(this, restoreMode); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java index 1e5e47c8855..744083ca6af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.Executors; @@ -53,16 +54,22 @@ public class EmbeddedCompletedCheckpointStore extends AbstractCompleteCheckpoint @VisibleForTesting public EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) { - this(maxRetainedCheckpoints, Collections.emptyList()); + this( + maxRetainedCheckpoints, + Collections.emptyList(), + /* Using the default restore mode in tests to detect any breaking changes early. 
*/ + RestoreMode.DEFAULT); } public EmbeddedCompletedCheckpointStore( - int maxRetainedCheckpoints, Collection<CompletedCheckpoint> initialCheckpoints) { + int maxRetainedCheckpoints, + Collection<CompletedCheckpoint> initialCheckpoints, + RestoreMode restoreMode) { this( maxRetainedCheckpoints, initialCheckpoints, SharedStateRegistry.DEFAULT_FACTORY.create( - Executors.directExecutor(), initialCheckpoints)); + Executors.directExecutor(), initialCheckpoints, restoreMode)); } public EmbeddedCompletedCheckpointStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java index bc18c453969..7a70f5624a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import javax.annotation.Nullable; @@ -42,7 +43,7 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore> public static <T extends CompletedCheckpointStore> CheckpointRecoveryFactory withoutCheckpointStoreRecovery(IntFunction<T> storeFn) { return new PerJobCheckpointRecoveryFactory<>( - (maxCheckpoints, previous, sharedStateRegistry, ioExecutor) -> { + (maxCheckpoints, previous, sharedStateRegistry, ioExecutor, restoreMode) -> { if (previous != null) { throw new UnsupportedOperationException( "Checkpoint store recovery is not supported."); @@ -75,7 +76,8 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore> JobID jobId, 
int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) { + Executor ioExecutor, + RestoreMode restoreMode) { return store.compute( jobId, (key, previous) -> @@ -83,7 +85,8 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore> maxNumberOfCheckpointsToRetain, previous, sharedStateRegistryFactory, - ioExecutor)); + ioExecutor, + restoreMode)); } @Override @@ -98,6 +101,7 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore> int maxNumberOfCheckpointsToRetain, @Nullable StoreType previousStore, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor); + Executor ioExecutor, + RestoreMode restoreMode); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index 95f9da72406..abcb704ad7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.SharedStateRegistryFactory; @@ -32,11 +33,15 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception { return new StandaloneCompletedCheckpointStore( - maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, ioExecutor); + maxNumberOfCheckpointsToRetain, + 
sharedStateRegistryFactory, + ioExecutor, + restoreMode); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 87a6486a911..6c89dcc7127 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; @@ -56,32 +57,38 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompleteCheckpoi this( maxNumberOfCheckpointsToRetain, SharedStateRegistry.DEFAULT_FACTORY, - Executors.directExecutor()); + Executors.directExecutor(), + /* Using the default restore mode in tests to detect any breaking changes early. */ + RestoreMode.DEFAULT); } /** * Creates {@link StandaloneCompletedCheckpointStore}. * + * @param restoreMode * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at least * 1). Adding more checkpoints than this results in older checkpoints being discarded. 
*/ public StandaloneCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) { + Executor ioExecutor, + RestoreMode restoreMode) { this( maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1), - ioExecutor); + ioExecutor, + restoreMode); } private StandaloneCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, ArrayDeque<CompletedCheckpoint> checkpoints, - Executor ioExecutor) { - super(sharedStateRegistryFactory.create(ioExecutor, checkpoints)); + Executor ioExecutor, + RestoreMode restoreMode) { + super(sharedStateRegistryFactory.create(ioExecutor, checkpoints, restoreMode)); checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; this.checkpoints = checkpoints; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 052b3405235..c522296cf89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -51,7 +52,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac JobID jobId, int 
maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception { return ZooKeeperUtils.createCompletedCheckpoints( @@ -61,7 +63,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, ioExecutor, - executor); + executor, + restoreMode); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java index c97f8644a1f..e657c8d7400 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.dispatcher.JobCancellationFailedException; import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -145,7 +146,11 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner { DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints( jobManagerConfiguration, LOG), sharedStateRegistryFactory, - cleanupExecutor); + cleanupExecutor, + // Using RestoreMode.CLAIM to be able to discard shared state, if any. 
+ // Note that it also means that the original shared state might be discarded as well + // because the initial checkpoint might be subsumed. + RestoreMode.CLAIM); } private CheckpointIDCounter createCheckpointIDCounter() throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java index ce59d0d99e9..b9859ba0f27 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java @@ -40,13 +40,18 @@ public class EmbeddedHaServicesWithLeadershipControl extends EmbeddedHaServices this( executor, new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>( - (maxCheckpoints, previous, stateRegistryFactory, ioExecutor) -> { + (maxCheckpoints, + previous, + stateRegistryFactory, + ioExecutor, + restoreMode) -> { List<CompletedCheckpoint> checkpoints = previous != null ? 
previous.getAllCheckpoints() : Collections.emptyList(); SharedStateRegistry stateRegistry = - stateRegistryFactory.create(ioExecutor, checkpoints); + stateRegistryFactory.create( + ioExecutor, checkpoints, restoreMode); if (previous != null) { if (!previous.getShutdownStatus().isPresent()) { throw new IllegalStateException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java index 10a4f2ac60d..da6c325265a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java @@ -53,4 +53,6 @@ public enum RestoreMode implements DescribedEnum { public InlineElement getDescription() { return text(description); } + + public static final RestoreMode DEFAULT = NO_CLAIM; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java index e8a9dd86d1c..a38e05c5f3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java @@ -52,7 +52,7 @@ public class SavepointConfigOptions { public static final ConfigOption<RestoreMode> RESTORE_MODE = key("execution.savepoint-restore-mode") .enumType(RestoreMode.class) - .defaultValue(RestoreMode.NO_CLAIM) + .defaultValue(RestoreMode.DEFAULT) .withDescription( "Describes the mode how Flink should restore from the given" + " savepoint or retained checkpoint."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java index 6b14801d8eb..87f2e56c6ba 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.slf4j.Logger; @@ -55,7 +56,12 @@ public final class SchedulerUtils { if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { try { return createCompletedCheckpointStore( - configuration, checkpointRecoveryFactory, ioExecutor, log, jobId); + configuration, + checkpointRecoveryFactory, + ioExecutor, + log, + jobId, + jobGraph.getSavepointRestoreSettings().getRestoreMode()); } catch (Exception e) { throw new JobExecutionException( jobId, @@ -73,14 +79,16 @@ public final class SchedulerUtils { CheckpointRecoveryFactory recoveryFactory, Executor ioExecutor, Logger log, - JobID jobId) + JobID jobId, + RestoreMode restoreMode) throws Exception { return recoveryFactory.createRecoveredCompletedCheckpointStore( jobId, DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints( jobManagerConfig, log), SharedStateRegistry.DEFAULT_FACTORY, - ioExecutor); + ioExecutor, + restoreMode); } public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnabled( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 7172bec4c24..b816f09e767 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -19,6 
+19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.jobgraph.RestoreMode; /** * This registry manages state that is shared across (incremental) checkpoints, and is responsible @@ -32,11 +33,11 @@ public interface SharedStateRegistry extends AutoCloseable { /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */ SharedStateRegistryFactory DEFAULT_FACTORY = - (deleteExecutor, checkpoints) -> { + (deleteExecutor, checkpoints, restoreMode) -> { SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(deleteExecutor); for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode); } return sharedStateRegistry; }; @@ -66,10 +67,25 @@ public interface SharedStateRegistry extends AutoCloseable { /** * Register given shared states in the registry. * + * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after recovery), please use + * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)} + * * @param stateHandles The shared states to register. * @param checkpointID which uses the states. */ void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID); + /** + * Set the lowest checkpoint ID below which no state is discarded, inclusive. + * + * <p>After recovery from an incremental checkpoint, its state should NOT be discarded, even if + * {@link #unregisterUnusedState(long) not used} anymore (unless recovering in {@link + * RestoreMode#CLAIM CLAIM} mode). + * + * <p>This should hold for both cases: when recovering from that initial checkpoint; and from + * any subsequent checkpoint derived from it. 
+ */ + void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode); + void checkpointCompleted(long checkpointId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java index bbdd2fd0959..bc8118cce42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.jobgraph.RestoreMode; import java.util.Collection; import java.util.concurrent.Executor; @@ -29,10 +30,13 @@ public interface SharedStateRegistryFactory { /** * Factory method for {@link SharedStateRegistry}. * - * @param checkpoints whose shared state will be registered. * @param deleteExecutor executor used to run (async) deletes. + * @param checkpoints whose shared state will be registered. 
+ * @param restoreMode the mode in which the given checkpoints were restored * @return a SharedStateRegistry object */ SharedStateRegistry create( - Executor deleteExecutor, Collection<CompletedCheckpoint> checkpoints); + Executor deleteExecutor, + Collection<CompletedCheckpoint> checkpoints, + RestoreMode restoreMode); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java index b87d8646086..4dce6455277 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.util.concurrent.Executors; import org.slf4j.Logger; @@ -51,6 +53,9 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { /** Executor for async state deletion */ private final Executor asyncDisposalExecutor; + /** Checkpoint ID below which no state is discarded, inclusive. 
*/ + private long highestNotClaimedCheckpointID = -1L; + /** Default uses direct executor to delete unreferenced state */ public SharedStateRegistryImpl() { this(Executors.directExecutor()); @@ -147,7 +152,9 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { while (it.hasNext()) { SharedStateEntry entry = it.next(); if (entry.lastUsedCheckpointID < lowestCheckpointID) { - subsumed.add(entry.stateHandle); + if (entry.createdByCheckpointID > highestNotClaimedCheckpointID) { + subsumed.add(entry.stateHandle); + } it.remove(); } } @@ -174,6 +181,20 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { } } + @Override + public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) { + registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID()); + // In NO_CLAIM and LEGACY restore modes, shared state of the initial checkpoints must be + // preserved. This is achieved by advancing highestRetainCheckpointID here, and then + // checking entry.createdByCheckpointID against it on checkpoint subsumption. + // In CLAIM restore mode, the shared state of the initial checkpoints must be + // discarded as soon as it becomes unused - so highestRetainCheckpointID is not updated. + if (mode != RestoreMode.CLAIM) { + highestNotClaimedCheckpointID = + Math.max(highestNotClaimedCheckpointID, checkpoint.getCheckpointID()); + } + } + @Override public void checkpointCompleted(long checkpointId) { for (SharedStateEntry entry : registeredStates.values()) { @@ -251,6 +272,8 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { /** The shared state handle */ StreamStateHandle stateHandle; + private final long createdByCheckpointID; + private long lastUsedCheckpointID; /** Whether this entry is included into a confirmed checkpoint. 
*/ @@ -258,6 +281,7 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { SharedStateEntry(StreamStateHandle value, long checkpointID) { this.stateHandle = value; + this.createdByCheckpointID = checkpointID; this.lastUsedCheckpointID = checkpointID; } @@ -266,6 +290,8 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { return "SharedStateEntry{" + "stateHandle=" + stateHandle + + ", createdByCheckpointID=" + + createdByCheckpointID + ", lastUsedCheckpointID=" + lastUsedCheckpointID + '}'; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index c62b0c3bde0..2de67d006d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobGraphStore; @@ -569,6 +570,7 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain * @param executor to run ZooKeeper callbacks + * @param restoreMode the mode in which the job is being restored * @return {@link DefaultCompletedCheckpointStore} instance * @throws Exception if the completed checkpoint store cannot be created */ @@ -578,7 +580,8 @@ public class ZooKeeperUtils { int maxNumberOfCheckpointsToRetain, 
SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor, - Executor executor) + Executor executor, + RestoreMode restoreMode) throws Exception { checkNotNull(configuration, "Configuration"); @@ -597,7 +600,8 @@ public class ZooKeeperUtils { completedCheckpointStateHandleStore, ZooKeeperCheckpointStoreUtil.INSTANCE, completedCheckpoints, - sharedStateRegistryFactory.create(ioExecutor, completedCheckpoints), + sharedStateRegistryFactory.create( + ioExecutor, completedCheckpoints, restoreMode), executor); LOG.info( "Initialized {} in '{}' with {}.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index d892319cbd0..edc0a6f7e34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.state.InputChannelStateHandle; @@ -286,7 +287,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { public FailingCompletedCheckpointStore(Exception addCheckpointFailure) { super( SharedStateRegistry.DEFAULT_FACTORY.create( - Executors.directExecutor(), emptyList())); + Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT)); this.addCheckpointFailure = addCheckpointFailure; } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index 2d80c065a94..a26e7ea5059 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; @@ -241,7 +242,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { final ExecutionGraph executionGraph = createExecutionGraph(vertices); final EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore( - completedCheckpoints.size(), completedCheckpoints); + completedCheckpoints.size(), completedCheckpoints, RestoreMode.DEFAULT); // set up the coordinator and validate the initial state final CheckpointCoordinator coordinator = @@ -780,7 +781,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { // set up the coordinator and validate the initial state SharedStateRegistry sharedStateRegistry = - SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), emptyList()); + SharedStateRegistry.DEFAULT_FACTORY.create( + Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT); CheckpointCoordinator coord = new CheckpointCoordinatorBuilder() .setCompletedCheckpointStore( 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index ccc0555ec44..da1d49bf00a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; @@ -2860,161 +2861,176 @@ public class CheckpointCoordinatorTest extends TestLogger { @Test public void testSharedStateRegistrationOnRestore() throws Exception { - JobVertexID jobVertexID1 = new JobVertexID(); + for (RestoreMode restoreMode : RestoreMode.values()) { + JobVertexID jobVertexID1 = new JobVertexID(); - int parallelism1 = 2; - int maxParallelism1 = 4; + int parallelism1 = 2; + int maxParallelism1 = 4; - ExecutionGraph graph = - new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() - .addJobVertex(jobVertexID1, parallelism1, maxParallelism1) - .build(EXECUTOR_RESOURCE.getExecutor()); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID1, parallelism1, maxParallelism1) + .build(EXECUTOR_RESOURCE.getExecutor()); - ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1); + ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1); - List<CompletedCheckpoint> checkpoints = 
Collections.emptyList(); - SharedStateRegistry firstInstance = - SharedStateRegistry.DEFAULT_FACTORY.create( - org.apache.flink.util.concurrent.Executors.directExecutor(), checkpoints); - final EmbeddedCompletedCheckpointStore store = - new EmbeddedCompletedCheckpointStore(10, checkpoints, firstInstance); + List<CompletedCheckpoint> checkpoints = Collections.emptyList(); + SharedStateRegistry firstInstance = + SharedStateRegistry.DEFAULT_FACTORY.create( + org.apache.flink.util.concurrent.Executors.directExecutor(), + checkpoints, + restoreMode); + final EmbeddedCompletedCheckpointStore store = + new EmbeddedCompletedCheckpointStore(10, checkpoints, firstInstance); - // set up the coordinator and validate the initial state - final CheckpointCoordinatorBuilder coordinatorBuilder = - new CheckpointCoordinatorBuilder().setTimer(manuallyTriggeredScheduledExecutor); - final CheckpointCoordinator coordinator = - coordinatorBuilder.setCompletedCheckpointStore(store).build(graph); + // set up the coordinator and validate the initial state + final CheckpointCoordinatorBuilder coordinatorBuilder = + new CheckpointCoordinatorBuilder().setTimer(manuallyTriggeredScheduledExecutor); + final CheckpointCoordinator coordinator = + coordinatorBuilder.setCompletedCheckpointStore(store).build(graph); - final int numCheckpoints = 3; + final int numCheckpoints = 3; - List<KeyGroupRange> keyGroupPartitions1 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); + List<KeyGroupRange> keyGroupPartitions1 = + StateAssignmentOperation.createKeyGroupPartitions( + maxParallelism1, parallelism1); - for (int i = 0; i < numCheckpoints; ++i) { - performIncrementalCheckpoint( - graph.getJobID(), coordinator, jobVertex1, keyGroupPartitions1, i); - } + for (int i = 0; i < numCheckpoints; ++i) { + performIncrementalCheckpoint( + graph.getJobID(), coordinator, jobVertex1, keyGroupPartitions1, i); + } - List<CompletedCheckpoint> completedCheckpoints = 
coordinator.getSuccessfulCheckpoints(); - assertEquals(numCheckpoints, completedCheckpoints.size()); + List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints(); + assertEquals(numCheckpoints, completedCheckpoints.size()); - int sharedHandleCount = 0; + int sharedHandleCount = 0; - List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = - new ArrayList<>(numCheckpoints); + List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = + new ArrayList<>(numCheckpoints); - for (int i = 0; i < numCheckpoints; ++i) { - sharedHandlesByCheckpoint.add(new HashMap<>(2)); - } + for (int i = 0; i < numCheckpoints; ++i) { + sharedHandlesByCheckpoint.add(new HashMap<>(2)); + } - int cp = 0; - for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { - for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { - for (OperatorSubtaskState subtaskState : taskState.getStates()) { - for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) { - // test we are once registered with the current registry - verify(keyedStateHandle, times(1)) - .registerSharedStates( - firstInstance, completedCheckpoint.getCheckpointID()); - IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = - (IncrementalRemoteKeyedStateHandle) keyedStateHandle; - - sharedHandlesByCheckpoint - .get(cp) - .putAll(incrementalKeyedStateHandle.getSharedState()); - - for (StreamStateHandle streamStateHandle : - incrementalKeyedStateHandle.getSharedState().values()) { - assertTrue( - !(streamStateHandle instanceof PlaceholderStreamStateHandle)); - verify(streamStateHandle, never()).discardState(); - ++sharedHandleCount; - } + int cp = 0; + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { + for (OperatorSubtaskState subtaskState : taskState.getStates()) { + for (KeyedStateHandle keyedStateHandle : + 
subtaskState.getManagedKeyedState()) { + // test we are once registered with the current registry + verify(keyedStateHandle, times(1)) + .registerSharedStates( + firstInstance, completedCheckpoint.getCheckpointID()); + IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = + (IncrementalRemoteKeyedStateHandle) keyedStateHandle; + + sharedHandlesByCheckpoint + .get(cp) + .putAll(incrementalKeyedStateHandle.getSharedState()); + + for (StreamStateHandle streamStateHandle : + incrementalKeyedStateHandle.getSharedState().values()) { + assertTrue( + !(streamStateHandle + instanceof PlaceholderStreamStateHandle)); + verify(streamStateHandle, never()).discardState(); + ++sharedHandleCount; + } - for (StreamStateHandle streamStateHandle : - incrementalKeyedStateHandle.getPrivateState().values()) { - verify(streamStateHandle, never()).discardState(); + for (StreamStateHandle streamStateHandle : + incrementalKeyedStateHandle.getPrivateState().values()) { + verify(streamStateHandle, never()).discardState(); + } + + verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) + .discardState(); } - verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) - .discardState(); + verify(subtaskState, never()).discardState(); } - - verify(subtaskState, never()).discardState(); } + ++cp; } - ++cp; - } - // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10 - assertEquals(10, sharedHandleCount); + // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10 + assertEquals(10, sharedHandleCount); - // discard CP0 - store.removeOldestCheckpoint(); + // discard CP0 + store.removeOldestCheckpoint(); - // we expect no shared state was discarded because the state of CP0 is still referenced by - // CP1 - for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { - for (StreamStateHandle streamStateHandle : cpList.values()) { - verify(streamStateHandle, never()).discardState(); + // we expect no shared state was discarded because the state of CP0 is 
still referenced + // by + // CP1 + for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { + for (StreamStateHandle streamStateHandle : cpList.values()) { + verify(streamStateHandle, never()).discardState(); + } } - } - // shutdown the store - store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner()); - - // restore the store - Set<ExecutionJobVertex> tasks = new HashSet<>(); - tasks.add(jobVertex1); - - assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null)); - SharedStateRegistry secondInstance = - SharedStateRegistry.DEFAULT_FACTORY.create( - org.apache.flink.util.concurrent.Executors.directExecutor(), - store.getAllCheckpoints()); - final EmbeddedCompletedCheckpointStore secondStore = - new EmbeddedCompletedCheckpointStore(10, store.getAllCheckpoints(), secondInstance); - final CheckpointCoordinator secondCoordinator = - coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph); - assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)); - - // validate that all shared states are registered again after the recovery. 
- cp = 0; - for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { - for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { - for (OperatorSubtaskState subtaskState : taskState.getStates()) { - for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) { - VerificationMode verificationMode; - // test we are once registered with the new registry - if (cp > 0) { - verificationMode = times(1); - } else { - verificationMode = never(); - } + // shutdown the store + store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner()); + + // restore the store + Set<ExecutionJobVertex> tasks = new HashSet<>(); + tasks.add(jobVertex1); + + assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null)); + SharedStateRegistry secondInstance = + SharedStateRegistry.DEFAULT_FACTORY.create( + org.apache.flink.util.concurrent.Executors.directExecutor(), + store.getAllCheckpoints(), + restoreMode); + final EmbeddedCompletedCheckpointStore secondStore = + new EmbeddedCompletedCheckpointStore( + 10, store.getAllCheckpoints(), secondInstance); + final CheckpointCoordinator secondCoordinator = + coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph); + assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)); + + // validate that all shared states are registered again after the recovery. 
+ cp = 0; + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { + for (OperatorSubtaskState subtaskState : taskState.getStates()) { + for (KeyedStateHandle keyedStateHandle : + subtaskState.getManagedKeyedState()) { + VerificationMode verificationMode; + // test we are once registered with the new registry + if (cp > 0) { + verificationMode = times(1); + } else { + verificationMode = never(); + } - // check that all are registered with the new registry - verify(keyedStateHandle, verificationMode) - .registerSharedStates( - secondInstance, completedCheckpoint.getCheckpointID()); + // check that all are registered with the new registry + verify(keyedStateHandle, verificationMode) + .registerSharedStates( + secondInstance, completedCheckpoint.getCheckpointID()); + } } } + ++cp; } - ++cp; - } - // discard CP1 - secondStore.removeOldestCheckpoint(); + // discard CP1 + secondStore.removeOldestCheckpoint(); - // we expect that all shared state from CP0 is no longer referenced and discarded. CP2 is - // still live and also - // references the state from CP1, so we expect they are not discarded. - verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 0 ? times(1) : never()); + // we expect that all shared state from CP0 is no longer referenced and discarded. CP2 + // is + // still live and also + // references the state from CP1, so we expect they are not discarded. + verifyDiscard( + sharedHandlesByCheckpoint, + cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? times(1) : never()); - // discard CP2 - secondStore.removeOldestCheckpoint(); + // discard CP2 + secondStore.removeOldestCheckpoint(); - // still expect shared state not to be discarded because it may be used in later checkpoints - verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? 
never() : atLeast(0)); + // still expect shared state not to be discarded because it may be used in later + // checkpoints + verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? never() : atLeast(0)); + } } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index b6461a9c984..a0b67cd0dfa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; @@ -235,7 +236,7 @@ public class CompletedCheckpointTest { null); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); - checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, RestoreMode.DEFAULT); verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L); } @@ -267,7 +268,7 @@ public class CompletedCheckpointTest { null); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); - checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, RestoreMode.DEFAULT); verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L); // Subsume diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java index 064e26a9b9f..22cd5dd021c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.TestingStateHandleStore; @@ -397,7 +398,9 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( stateHandleStore, checkpointStoreUtil), SharedStateRegistry.DEFAULT_FACTORY.create( - org.apache.flink.util.concurrent.Executors.directExecutor(), emptyList()), + org.apache.flink.util.concurrent.Executors.directExecutor(), + emptyList(), + RestoreMode.DEFAULT), executorService); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java index 58a0d954318..7203edfc709 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import 
org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; @@ -49,7 +50,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger { firstJobId, 1, SharedStateRegistry.DEFAULT_FACTORY, - Executors.directExecutor())); + Executors.directExecutor(), + RestoreMode.DEFAULT)); assertThrows( UnsupportedOperationException.class, () -> @@ -57,7 +59,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger { firstJobId, 1, SharedStateRegistry.DEFAULT_FACTORY, - Executors.directExecutor())); + Executors.directExecutor(), + RestoreMode.DEFAULT)); final JobID secondJobId = new JobID(); assertSame( @@ -66,7 +69,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger { secondJobId, 1, SharedStateRegistry.DEFAULT_FACTORY, - Executors.directExecutor())); + Executors.directExecutor(), + RestoreMode.DEFAULT)); assertThrows( UnsupportedOperationException.class, () -> @@ -74,6 +78,7 @@ public class PerJobCheckpointRecoveryTest extends TestLogger { secondJobId, 1, SharedStateRegistry.DEFAULT_FACTORY, - Executors.directExecutor())); + Executors.directExecutor(), + RestoreMode.DEFAULT)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java index e164543d0ab..687196dff82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import java.util.concurrent.Executor; @@ -39,7 
+40,8 @@ public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFacto JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) { + Executor ioExecutor, + RestoreMode restoreMode) { return store; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 5cc3db53e79..d3d705d51f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; @@ -90,7 +91,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint checkpointStoreUtil, DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( checkpointsInZooKeeper, checkpointStoreUtil), - SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), emptyList()), + SharedStateRegistry.DEFAULT_FACTORY.create( + Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT), executor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 804d285ae28..cd8561157cb 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; import org.apache.flink.runtime.state.RetrievableStateHandle; @@ -196,7 +197,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { checkpointsInZooKeeper, zooKeeperCheckpointStoreUtil, Collections.emptyList(), - SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), emptyList()), + SharedStateRegistry.DEFAULT_FACTORY.create( + Executors.directExecutor(), emptyList(), RestoreMode.DEFAULT), Executors.directExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java index 7baad041258..d305011a7e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResul import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.RestoreMode; import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.JobGraphStore; @@ -84,7 +85,11 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest { super.setUp(); haServices.setCheckpointRecoveryFactory( new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>( - (maxCheckpoints, previous, sharedStateRegistryFactory, ioExecutor) -> { + (maxCheckpoints, + previous, + sharedStateRegistryFactory, + ioExecutor, + restoreMode) -> { if (previous != null) { // First job cleanup still succeeded for the // CompletedCheckpointStore because the JobGraph cleanup happens @@ -95,13 +100,17 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest { maxCheckpoints, previous.getAllCheckpoints(), sharedStateRegistryFactory.create( - ioExecutor, previous.getAllCheckpoints())); + ioExecutor, + previous.getAllCheckpoints(), + restoreMode)); } return new EmbeddedCompletedCheckpointStore( maxCheckpoints, Collections.emptyList(), sharedStateRegistryFactory.create( - ioExecutor, Collections.emptyList())); + ioExecutor, + Collections.emptyList(), + RestoreMode.DEFAULT)); })); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index 8a226fed8aa..cedcfc3d435 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; @@ -622,7 +623,8 @@ public class CheckpointResourcesCleanupRunnerTest { JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) + Executor ioExecutor, + RestoreMode restoreMode) throws Exception { creationLatch.await(); return completedCheckpointStore; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java index dfc3b40da07..9e21dd42e08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -78,7 +79,8 @@ public class SchedulerUtilsTest extends TestLogger { new StandaloneCheckpointRecoveryFactory(), Executors.directExecutor(), log, - new JobID()); + new JobID(), + RestoreMode.CLAIM); assertEquals( maxNumberOfCheckpointsToRetain, @@ -104,7 +106,8 @@ public class SchedulerUtilsTest extends TestLogger { recoveryFactory, 
Executors.directExecutor(), log, - new JobID()); + new JobID(), + RestoreMode.CLAIM); SharedStateRegistry sharedStateRegistry = checkpointStore.getSharedStateRegistry(); @@ -122,12 +125,14 @@ public class SchedulerUtilsTest extends TestLogger { JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, - Executor ioExecutor) { + Executor ioExecutor, + RestoreMode restoreMode) { List<CompletedCheckpoint> checkpoints = singletonList(checkpoint); return new EmbeddedCompletedCheckpointStore( maxNumberOfCheckpointsToRetain, checkpoints, - sharedStateRegistryFactory.create(ioExecutor, checkpoints)); + sharedStateRegistryFactory.create( + ioExecutor, checkpoints, RestoreMode.DEFAULT)); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index 7292b54b929..d36100be22e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -54,6 +54,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkState; import static org.junit.jupiter.api.Assertions.fail; /** This class contains auxiliary methods for unit tests. */ @@ -309,18 +310,30 @@ public class CommonTestUtils { }); } - /** Wait for at least one successful checkpoint. */ - public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster) + /** Wait for (at least) the given number of successful checkpoints. 
*/ + public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints) throws Exception, FlinkJobNotFoundException { waitUntilCondition( - () -> - Optional.ofNullable( - miniCluster - .getExecutionGraph(jobID) - .get() - .getCheckpointStatsSnapshot()) - .filter(st -> st.getCounts().getNumberOfCompletedCheckpoints() > 0) - .isPresent()); + () -> { + AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get(); + if (Optional.ofNullable(graph.getCheckpointStatsSnapshot()) + .filter( + st -> + st.getCounts().getNumberOfCompletedCheckpoints() + >= numCheckpoints) + .isPresent()) { + return true; + } else if (graph.getState().isGloballyTerminalState()) { + checkState( + graph.getFailureInfo() != null, + "Job terminated before taking required %s checkpoints: %s", + numCheckpoints, + graph.getState()); + throw graph.getFailureInfo().getException(); + } else { + return false; + } + }); } /** diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index ab5d7f9b111..884c65c731d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -24,14 +24,15 @@ import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; -import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; @@ -43,13 +44,14 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.state.ManualWindowSpeedITCase; import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.TestLogger; import org.apache.curator.test.TestingServer; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import javax.annotation.Nullable; @@ -57,6 +59,9 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint; +import static org.apache.flink.test.util.TestUtils.waitUntilJobCanceled; import static org.junit.Assert.assertNotNull; /** @@ -68,26 +73,42 @@ import static org.junit.Assert.assertNotNull; * <p>This tests considers full and incremental checkpoints and was introduced to guard against * problems like FLINK-6964. 
*/ +@RunWith(Parameterized.class) public class ResumeCheckpointManuallyITCase extends TestLogger { private static final int PARALLELISM = 2; private static final int NUM_TASK_MANAGERS = 2; private static final int SLOTS_PER_TASK_MANAGER = 2; + @Parameterized.Parameter public RestoreMode restoreMode; + + @Parameterized.Parameters(name = "RestoreMode = {0}") + public static Object[] parameters() { + return RestoreMode.values(); + } + @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createRocksDBStateBackend(checkpointDir, true), false); + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, true), + false, + restoreMode); } @Test public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createRocksDBStateBackend(checkpointDir, false), false); + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, false), + false, + restoreMode); } @Test @@ -95,7 +116,11 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createRocksDBStateBackend(checkpointDir, true), true); + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, true), + true, + restoreMode); } @Test @@ -103,20 +128,25 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createRocksDBStateBackend(checkpointDir, false), true); + checkpointDir, + null, + createRocksDBStateBackend(checkpointDir, false), + true, + restoreMode); } @Test public 
void testExternalizedFSCheckpointsStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createFsStateBackend(checkpointDir), false); + checkpointDir, null, createFsStateBackend(checkpointDir), false, restoreMode); } @Test public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); - testExternalizedCheckpoints(checkpointDir, null, createFsStateBackend(checkpointDir), true); + testExternalizedCheckpoints( + checkpointDir, null, createFsStateBackend(checkpointDir), true, restoreMode); } @Test @@ -129,7 +159,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, true), - false); + false, + restoreMode); } finally { zkServer.stop(); } @@ -145,7 +176,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, false), - false); + false, + restoreMode); } finally { zkServer.stop(); } @@ -162,7 +194,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, true), - true); + true, + restoreMode); } finally { zkServer.stop(); } @@ -179,7 +212,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, false), - true); + true, + restoreMode); } finally { zkServer.stop(); } @@ -195,7 +229,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), createFsStateBackend(checkpointDir), - false); + false, + restoreMode); } finally { zkServer.stop(); } @@ -211,7 +246,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { checkpointDir, zkServer.getConnectString(), 
createFsStateBackend(checkpointDir), - true); + true, + restoreMode); } finally { zkServer.stop(); } @@ -227,8 +263,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing); } - private void testExternalizedCheckpoints( - File checkpointDir, String zooKeeperQuorum, StateBackend backend, boolean localRecovery) + private static void testExternalizedCheckpoints( + File checkpointDir, + String zooKeeperQuorum, + StateBackend backend, + boolean localRecovery, + RestoreMode restoreMode) throws Exception { final Configuration config = new Configuration(); @@ -264,22 +304,28 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { cluster.before(); - ClusterClient<?> client = cluster.getClusterClient(); - try { // main test sequence: start job -> eCP -> restore job -> eCP -> restore job String firstExternalCheckpoint = - runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client); + runJobAndGetExternalizedCheckpoint(backend, null, cluster, restoreMode); assertNotNull(firstExternalCheckpoint); String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint( - backend, checkpointDir, firstExternalCheckpoint, client); + backend, firstExternalCheckpoint, cluster, restoreMode); assertNotNull(secondExternalCheckpoint); String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint( - backend, checkpointDir, secondExternalCheckpoint, client); + backend, + // in CLAIM mode, the previous run is only guaranteed to preserve the + // latest checkpoint; in NO_CLAIM/LEGACY, even the initial checkpoints + // must remain valid + restoreMode == RestoreMode.CLAIM + ? 
secondExternalCheckpoint + : firstExternalCheckpoint, + cluster, + restoreMode); assertNotNull(thirdExternalCheckpoint); } finally { cluster.after(); @@ -288,26 +334,32 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { private static String runJobAndGetExternalizedCheckpoint( StateBackend backend, - File checkpointDir, @Nullable String externalCheckpoint, - ClusterClient<?> client) + MiniClusterWithClientResource cluster, + RestoreMode restoreMode) throws Exception { - JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint); + JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, restoreMode); NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); - - client.submitJob(initialJobGraph).get(); + cluster.getClusterClient().submitJob(initialJobGraph).get(); // wait until all sources have been started NotifyingInfiniteTupleSource.countDownLatch.await(); - TestUtils.waitUntilExternalizedCheckpointCreated(checkpointDir); - client.cancel(initialJobGraph.getJobID()).get(); - TestUtils.waitUntilJobCanceled(initialJobGraph.getJobID(), client); - - return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath(); + // complete at least two checkpoints so that the initial checkpoint can be subsumed + waitForCheckpoint(initialJobGraph.getJobID(), cluster.getMiniCluster(), 2); + cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get(); + waitUntilJobCanceled(initialJobGraph.getJobID(), cluster.getClusterClient()); + + return getLatestCompletedCheckpointPath( + initialJobGraph.getJobID(), cluster.getMiniCluster()) + .<IllegalStateException>orElseThrow( + () -> { + throw new IllegalStateException("Checkpoint not generated"); + }); } - private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) { + private static JobGraph getJobGraph( + StateBackend backend, @Nullable String externalCheckpoint, RestoreMode restoreMode) { final StreamExecutionEnvironment 
env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500); @@ -316,6 +368,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { env.getCheckpointConfig() .setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.noRestart()); env.addSource(new NotifyingInfiniteTupleSource(10_000)) .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()) @@ -331,7 +384,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { // recover from previous iteration? if (externalCheckpoint != null) { jobGraph.setSavepointRestoreSettings( - SavepointRestoreSettings.forPath(externalCheckpoint)); + SavepointRestoreSettings.forPath(externalCheckpoint, false, restoreMode)); } return jobGraph; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java index 6dc4606158f..aafa0966878 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java @@ -143,7 +143,7 @@ public class ChangelogCompatibilityITCase { ClusterClient<?> client = miniClusterResource.getClusterClient(); submit(jobGraph, client); if (testCase.restoreSource == RestoreSource.CHECKPOINT) { - waitForCheckpoint(jobGraph.getJobID(), miniClusterResource.getMiniCluster()); + waitForCheckpoint(jobGraph.getJobID(), miniClusterResource.getMiniCluster(), 1); client.cancel(jobGraph.getJobID()).get(); // obtain the latest checkpoint *after* cancellation - that one won't be subsumed return CommonTestUtils.getLatestCompletedCheckpointPath( diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java 
index 0ad238b5b27..ed7b3d9e289 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java @@ -326,7 +326,7 @@ public class ChangelogRescalingITCase extends TestLogger { } private String checkpointAndCancel(JobID jobID) throws Exception { - waitForCheckpoint(jobID, cluster.getMiniCluster()); + waitForCheckpoint(jobID, cluster.getMiniCluster(), 1); cluster.getClusterClient().cancel(jobID).get(); checkStatus(jobID); return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())