[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91a4b276 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91a4b276 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91a4b276 Branch: refs/heads/master Commit: 91a4b276171afb760bfff9ccf30593e648e91dfb Parents: b71154a Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue Jul 25 12:04:16 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Aug 15 14:56:54 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 13 +- .../checkpoint/CheckpointCoordinator.java | 32 +- .../runtime/checkpoint/CompletedCheckpoint.java | 2 + .../checkpoint/CompletedCheckpointStore.java | 5 +- .../StandaloneCompletedCheckpointStore.java | 4 +- .../ZooKeeperCompletedCheckpointStore.java | 12 +- .../runtime/executiongraph/ExecutionGraph.java | 6 +- .../executiongraph/ExecutionJobVertex.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 2 +- .../state/IncrementalKeyedStateHandle.java | 68 ++-- .../runtime/state/KeyGroupsStateHandle.java | 2 +- .../runtime/state/MultiStreamStateHandle.java | 10 +- .../runtime/state/SharedStateRegistry.java | 52 ++- .../state/SharedStateRegistryFactory.java | 35 ++ .../state/memory/ByteStreamStateHandle.java | 1 + ...tCoordinatorExternalizedCheckpointsTest.java | 22 +- .../CheckpointCoordinatorFailureTest.java | 7 +- .../CheckpointCoordinatorMasterHooksTest.java | 7 +- .../checkpoint/CheckpointCoordinatorTest.java | 341 +++++++++++++++++-- .../checkpoint/CheckpointStateRestoreTest.java | 10 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 25 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 7 +- .../state/IncrementalKeyedStateHandleTest.java | 75 +++- .../RecoverableCompletedCheckpointStore.java | 33 +- .../streaming/runtime/tasks/StreamTask.java | 21 +- ...tractEventTimeWindowCheckpointingITCase.java | 76 +++-- ...ckendEventTimeWindowCheckpointingITCase.java | 49 +++ 27 files changed, 743 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index bba5b55..756cfdd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -253,7 +253,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); - LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); + + LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.", + this.operatorIdentifier, + this.backendUID); } /** @@ -883,11 +886,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { void takeSnapshot() throws Exception { assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); + final long lastCompletedCheckpoint; + // use the last completed checkpoint as the comparison base. synchronized (stateBackend.materializedSstFiles) { - baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; + baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); } + LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + + "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0b64a73..c98d3aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -174,8 +175,11 @@ public class CheckpointCoordinator { @Nullable private CheckpointStatsTracker statsTracker; + /** A factory for SharedStateRegistry objects */ + private final SharedStateRegistryFactory sharedStateRegistryFactory; + /** Registry that tracks state which is shared across (incremental) checkpoints */ - private final SharedStateRegistry sharedStateRegistry; + private SharedStateRegistry sharedStateRegistry; // -------------------------------------------------------------------------------------------- @@ -192,7 +196,8 @@ public class CheckpointCoordinator { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, @Nullable String checkpointDirectory, - Executor executor) { + Executor executor, + SharedStateRegistryFactory sharedStateRegistryFactory) { // sanity checks checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); @@ -230,7 +235,8 @@ public class CheckpointCoordinator { this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.checkpointDirectory = checkpointDirectory; this.executor = checkNotNull(executor); - this.sharedStateRegistry = new SharedStateRegistry(executor); + this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); + this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -1043,10 +1049,23 @@ public class CheckpointCoordinator { throw new IllegalStateException("CheckpointCoordinator is shut down"); } - // Recover the checkpoints - completedCheckpointStore.recover(sharedStateRegistry); + // We create a new shared state registry object, so that all pending async disposal requests from previous + // runs will go against the old object (were they can do no harm). + // This must happen under the checkpoint lock. + sharedStateRegistry.close(); + sharedStateRegistry = sharedStateRegistryFactory.create(executor); + + // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery + completedCheckpointStore.recover(); + + // Now, we re-register all (shared) states from the checkpoint store with the new registry + for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) { + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + } + + LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry); - // restore from the latest checkpoint + // Restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); if (latest == null) { @@ -1120,7 +1139,6 @@ public class CheckpointCoordinator { CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( job, tasks, savepointPath, userClassLoader, allowNonRestored); - savepoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpointStore.addCheckpoint(savepoint); // Reset the checkpoint ID counter http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 7c3edee..d3f61e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable { private void doDiscard() throws Exception { + LOG.trace("Executing discard procedure for {}.", this); + try { // collect exceptions and continue cleanup Exception exception = null; http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 45d407e..82193b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.state.SharedStateRegistry; import java.util.List; @@ -33,10 +32,8 @@ public interface CompletedCheckpointStore { * * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest * available checkpoint. - * - * @param sharedStateRegistry the shared state registry to register recovered states. */ - void recover(SharedStateRegistry sharedStateRegistry) throws Exception; + void recover() throws Exception; /** * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index fbb0198..63e7468 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.state.SharedStateRegistry; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { // Nothing to do } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index c4cb6bc..88dd0d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -18,20 +18,21 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.FlinkException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * that the history of checkpoints is consistent. */ @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); // Clear local handles in order to prevent duplicates on // recovery. The local handles should reflect the state // of ZooKeeper. completedCheckpoints.clear(); - sharedStateRegistry.clear(); // Get all there is first List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; @@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto try { completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); if (completedCheckpoint != null) { - // Re-register all shared states in the checkpoint. - completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpoints.add(completedCheckpoint); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 139f484..2e5f3d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.SerializedThrowable; @@ -69,8 +70,8 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; - import org.apache.flink.util.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -456,7 +457,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive checkpointIDCounter, checkpointStore, checkpointDir, - ioExecutor); + ioExecutor, + SharedStateRegistry.DEFAULT_FACTORY); // register the master hooks on the checkpoint coordinator for (MasterTriggerRestoreHook<?> hook : masterHooks) { http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 5ee7a9f..e6d49d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 25df19b..d6019db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -89,9 +89,9 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedThrowable; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 0085890..0268b10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private final UUID backendIdentifier; /** - * The key-group range covered by this state handle + * The key-group range covered by this state handle. */ private final KeyGroupRange keyGroupRange; /** - * The checkpoint Id + * The checkpoint Id. */ private final long checkpointId; /** - * Shared state in the incremental checkpoint. This i + * Shared state in the incremental checkpoint. */ private final Map<StateHandleID, StreamStateHandle> sharedState; /** - * Private state in the incremental checkpoint + * Private state in the incremental checkpoint. */ private final Map<StateHandleID, StreamStateHandle> privateState; /** - * Primary meta data state of the incremental checkpoint + * Primary meta data state of the incremental checkpoint. */ private final StreamStateHandle metaStateHandle; @@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { - return this; - } else { - return null; - } + return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ? + null : this; } @Override public void discardState() throws Exception { + SharedStateRegistry registry = this.sharedStateRegistry; + final boolean isRegistered = (registry != null); + + LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.", + isRegistered, + checkpointId, + backendIdentifier); + try { metaStateHandle.discardState(); } catch (Exception e) { @@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { // If this was not registered, we can delete the shared state. We can simply apply this // to all handles, because all handles that have not been created for the first time for this // are only placeholders at this point (disposing them is a NOP). - if (sharedStateRegistry == null) { - try { - StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); - } - } else { + if (isRegistered) { // If this was registered, we only unregister all our referenced shared states // from the registry. for (StateHandleID stateHandleID : sharedState.keySet()) { - sharedStateRegistry.unregisterReference( + registry.unregisterReference( createSharedStateRegistryKeyFromFileName(stateHandleID)); } + } else { + // Otherwise, we assume to own those handles and dispose them directly. + try { + StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } } } @@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { - Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states."); + // This is a quick check to avoid that we register twice with the same registry. However, the code allows to + // register again with a different registry. The implication is that ownership is transferred to this new + // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new + // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that + // an old registry object from a previous run is due to be GCed and will never be used for registration again. + Preconditions.checkState( + sharedStateRegistry != stateRegistry, + "The state handle has already registered its shared states to the given registry."); sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); + LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.", + checkpointId, + backendIdentifier); + for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) { SharedStateRegistryKey registryKey = createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); @@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { result = 31 * result + getMetaStateHandle().hashCode(); return result; } + + @Override + public String toString() { + return "IncrementalKeyedStateHandle{" + + "backendIdentifier=" + backendIdentifier + + ", keyGroupRange=" + keyGroupRange + + ", checkpointId=" + checkpointId + + ", sharedState=" + sharedState + + ", privateState=" + privateState + + ", metaStateHandle=" + metaStateHandle + + ", registered=" + (sharedStateRegistry != null) + + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 8e38ad4..8092f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle public String toString() { return "KeyGroupsStateHandle{" + "groupRangeOffsets=" + groupRangeOffsets + - ", data=" + stateHandle + + ", stateHandle=" + stateHandle + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java index b95dace..1960c1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java @@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle { private final List<StreamStateHandle> stateHandles; private final long stateSize; - public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException { + public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) { this.stateHandles = Preconditions.checkNotNull(stateHandles); long calculateSize = 0L; for(StreamStateHandle stateHandle : stateHandles) { @@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle { return stateSize; } + @Override + public String toString() { + return "MultiStreamStateHandle{" + + "stateHandles=" + stateHandles + + ", stateSize=" + stateSize + + '}'; + } + static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream { private final TreeMap<Long, StreamStateHandle> stateHandleMap; http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index e0ca873..347f30c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -38,13 +38,24 @@ import java.util.concurrent.Executor; * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies * them. */ -public class SharedStateRegistry { +public class SharedStateRegistry implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class); + /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */ + public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() { + @Override + public SharedStateRegistry create(Executor deleteExecutor) { + return new SharedStateRegistry(deleteExecutor); + } + }; + /** All registered state objects by an artificial key */ private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates; + /** This flag indicates whether or not the registry is open or if close() was called */ + private boolean open; + /** Executor for async state deletion */ private final Executor asyncDisposalExecutor; @@ -56,6 +67,7 @@ public class SharedStateRegistry { public SharedStateRegistry(Executor asyncDisposalExecutor) { this.registeredStates = new HashMap<>(); this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor); + this.open = true; } /** @@ -82,6 +94,9 @@ public class SharedStateRegistry { SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { + + Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry."); + entry = registeredStates.get(registrationKey); if (entry == null) { @@ -96,6 +111,11 @@ public class SharedStateRegistry { // delete if this is a real duplicate if (!Objects.equals(state, entry.stateHandle)) { scheduledStateDeletion = state; + LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " + + "be an unnecessary copy of existing state {} and will be dropped.", + registrationKey, + state, + entry.stateHandle); } entry.increaseReferenceCount(); } @@ -112,7 +132,8 @@ public class SharedStateRegistry { * * @param registrationKey the shared state for which we release a reference. * @return the result of the request, consisting of the reference count after this operation - * and the state handle, or null if the state handle was deleted through this request. + * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry + * was previously closed. */ public Result unregisterReference(SharedStateRegistryKey registrationKey) { @@ -123,6 +144,7 @@ public class SharedStateRegistry { SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { + entry = registeredStates.get(registrationKey); Preconditions.checkState(entry != null, @@ -164,10 +186,18 @@ public class SharedStateRegistry { } } + @Override + public String toString() { + synchronized (registeredStates) { + return "SharedStateRegistry{" + + "registeredStates=" + registeredStates + + '}'; + } + } + private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { // We do the small optimization to not issue discards for placeholders, which are NOPs. if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { - LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); asyncDisposalExecutor.execute( new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); @@ -178,6 +208,13 @@ public class SharedStateRegistry { return stateHandle instanceof PlaceholderStreamStateHandle; } + @Override + public void close() { + synchronized (registeredStates) { + open = false; + } + } + /** * An entry in the registry, tracking the handle and the corresponding reference count. */ @@ -279,13 +316,4 @@ public class SharedStateRegistry { } } } - - /** - * Clears the registry. - */ - public void clear() { - synchronized (registeredStates) { - registeredStates.clear(); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java new file mode 100644 index 0000000..05c9825 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.util.concurrent.Executor; + +/** + * Simple factory to produce {@link SharedStateRegistry} objects. + */ +public interface SharedStateRegistryFactory { + + /** + * Factory method for {@link SharedStateRegistry}. + * + * @param deleteExecutor executor used to run (async) deletes. + * @return a SharedStateRegistry object + */ + SharedStateRegistry create(Executor deleteExecutor); +} http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 9ba9d35..3a43d4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle { public String toString() { return "ByteStreamStateHandle{" + "handleName='" + handleName + '\'' + + ", dataBytes=" + data.length + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java index d293eea..edc29fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; import org.apache.flink.runtime.concurrent.Executors; @@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + /** * CheckpointCoordinator tests for externalized checkpoints. * @@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), checkpointDir.getAbsolutePath(), - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 88b95f5..26db772 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -79,7 +79,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { new StandaloneCheckpointIDCounter(), new FailingCompletedCheckpointStore(), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.triggerCheckpoint(triggerTimestamp, false); @@ -111,7 +112,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { when(subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertex.getJobvertexId()))).thenReturn(operatorSubtaskState); AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState); - + try { coord.receiveAcknowledgeMessage(acknowledgeMessage); fail("Expected a checkpoint exception because the completed checkpoint store could not " + @@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { throw new UnsupportedOperationException("Not implemented."); } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index e23f6a2..2f860e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -28,9 +28,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -46,14 +46,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.any; @@ -404,7 +402,8 @@ public class CheckpointCoordinatorMasterHooksTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); } private static <T> T mockGeneric(Class<?> clazz) { http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d9af879..45cbbc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -36,32 +36,36 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; import java.io.IOException; import java.io.Serializable; @@ -139,7 +143,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -199,7 +204,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -250,7 +256,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -302,7 +309,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -406,7 +414,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -525,7 +534,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -692,7 +702,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -822,7 +833,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -986,7 +998,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1063,7 +1076,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1126,7 +1140,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1258,7 +1273,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1350,7 +1366,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), "dummy-path", - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); try { coord.startCheckpointScheduler(); @@ -1423,7 +1440,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1574,7 +1592,8 @@ public class CheckpointCoordinatorTest extends TestLogger { counter, new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1680,7 +1699,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1753,7 +1773,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1835,7 +1856,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1887,7 +1909,8 @@ public class CheckpointCoordinatorTest extends TestLogger { checkpointIDCounter, new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); @@ -1940,7 +1963,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -2002,7 +2026,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), store, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2116,7 +2141,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2237,7 +2263,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2395,7 +2422,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2686,7 +2714,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), standaloneCompletedCheckpointStore, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.restoreLatestCheckpointedState(tasks, false, true); @@ -2847,7 +2876,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), "fake-directory", - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -3351,7 +3381,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // Periodic CheckpointTriggerResult triggerResult = coord.triggerCheckpoint( @@ -3529,7 +3560,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); coord.setCheckpointStatsTracker(tracker); @@ -3567,7 +3599,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), store, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); store.addCheckpoint(new CompletedCheckpoint( new JobID(), @@ -3623,7 +3656,8 @@ public class CheckpointCoordinatorTest extends TestLogger { checkpointIDCounter, completedCheckpointStore, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger a first checkpoint assertTrue( @@ -3673,4 +3707,245 @@ public class CheckpointCoordinatorTest extends TestLogger { "The latest completed (proper) checkpoint should have been added to the completed checkpoint store.", completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast()); } + + @Test + public void testSharedStateRegistrationOnRestore() throws Exception { + + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); + + final JobVertexID jobVertexID1 = new JobVertexID(); + + int parallelism1 = 2; + int maxParallelism1 = 4; + + final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex( + jobVertexID1, + parallelism1, + maxParallelism1); + + List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1); + + allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); + + ExecutionVertex[] arrayExecutionVertices = + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); + + RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(10); + + final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + store, + null, + Executors.directExecutor(), + new SharedStateRegistryFactory() { + @Override + public SharedStateRegistry create(Executor deleteExecutor) { + SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor); + createdSharedStateRegistries.add(instance); + return instance; + } + }); + + final int numCheckpoints = 3; + + List<KeyGroupRange> keyGroupPartitions1 = + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); + + for (int i = 0; i < numCheckpoints; ++i) { + performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, timestamp + i, i); + } + + List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); + assertEquals(numCheckpoints, completedCheckpoints.size()); + + int sharedHandleCount = 0; + + List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints); + + for (int i = 0; i < numCheckpoints; ++i) { + sharedHandlesByCheckpoint.add(new HashMap<StateHandleID, StreamStateHandle>(2)); + } + + int cp = 0; + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { + for (OperatorSubtaskState subtaskState : taskState.getStates()) { + for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) { + // test we are once registered with the current registry + verify(keyedStateHandle, times(1)).registerSharedStates(createdSharedStateRegistries.get(0)); + IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle; + + sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState()); + + for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getSharedState().values()) { + assertTrue(!(streamStateHandle instanceof PlaceholderStreamStateHandle)); + verify(streamStateHandle, never()).discardState(); + ++sharedHandleCount; + } + + for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getPrivateState().values()) { + verify(streamStateHandle, never()).discardState(); + } + + verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()).discardState(); + } + + verify(subtaskState, never()).discardState(); + } + } + ++cp; + } + + // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10 + assertEquals(10, sharedHandleCount); + + // discard CP0 + store.removeOldestCheckpoint(); + + // we expect no shared state was discarded because the state of CP0 is still referenced by CP1 + for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { + for (StreamStateHandle streamStateHandle : cpList.values()) { + verify(streamStateHandle, never()).discardState(); + } + } + + // shutdown the store + store.shutdown(JobStatus.SUSPENDED); + + // restore the store + Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); + tasks.put(jobVertexID1, jobVertex1); + coord.restoreLatestCheckpointedState(tasks, true, false); + + // validate that all shared states are registered again after the recovery. + cp = 0; + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { + for (OperatorSubtaskState subtaskState : taskState.getStates()) { + for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) { + VerificationMode verificationMode; + // test we are once registered with the new registry + if (cp > 0) { + verificationMode = times(1); + } else { + verificationMode = never(); + } + + //check that all are registered with the new registry + verify(keyedStateHandle, verificationMode).registerSharedStates(createdSharedStateRegistries.get(1)); + } + } + } + ++cp; + } + + // discard CP1 + store.removeOldestCheckpoint(); + + // we expect that all shared state from CP0 is no longer referenced and discarded. CP2 is still live and also + // references the state from CP1, so we expect they are not discarded. + for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { + for (Map.Entry<StateHandleID, StreamStateHandle> entry : cpList.entrySet()) { + String key = entry.getKey().getKeyString(); + int belongToCP = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1))); + if (belongToCP == 0) { + verify(entry.getValue(), times(1)).discardState(); + } else { + verify(entry.getValue(), never()).discardState(); + } + } + } + + // discard CP2 + store.removeOldestCheckpoint(); + + // we expect all shared state was discarded now, because all CPs are + for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { + for (StreamStateHandle streamStateHandle : cpList.values()) { + verify(streamStateHandle, times(1)).discardState(); + } + } + } + + private void performIncrementalCheckpoint( + JobID jid, + CheckpointCoordinator coord, + ExecutionJobVertex jobVertex1, + List<KeyGroupRange> keyGroupPartitions1, + long timestamp, + int cpSequenceNumber) throws Exception { + + // trigger the checkpoint + coord.triggerCheckpoint(timestamp, false); + + assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); + long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); + + for (int index = 0; index < jobVertex1.getParallelism(); index++) { + + KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index); + + Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>(); + privateState.put( + new StateHandleID("private-1"), + spy(new ByteStreamStateHandle("private-1", new byte[]{'p'}))); + + Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>(); + + // let all but the first CP overlap by one shared state. + if (cpSequenceNumber > 0) { + sharedState.put( + new StateHandleID("shared-" + (cpSequenceNumber - 1)), + spy(new PlaceholderStreamStateHandle())); + } + + sharedState.put( + new StateHandleID("shared-" + cpSequenceNumber), + spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'}))); + + IncrementalKeyedStateHandle managedState = + spy(new IncrementalKeyedStateHandle( + new UUID(42L, 42L), + keyGroupRange, + checkpointId, + sharedState, + privateState, + spy(new ByteStreamStateHandle("meta", new byte[]{'m'})))); + + OperatorSubtaskState operatorSubtaskState = + spy(new OperatorSubtaskState(null, + Collections.<OperatorStateHandle>emptyList(), + Collections.<OperatorStateHandle>emptyList(), + Collections.<KeyedStateHandle>singletonList(managedState), + Collections.<KeyedStateHandle>emptyList())); + + Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>(); + + opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState); + + TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates); + + AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( + jid, + jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + taskStateSnapshot); + + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 6ce071b..791bffa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.SerializableObject; @@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -186,7 +188,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); try { coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false); @@ -243,7 +246,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); StreamStateHandle serializedState = CheckpointCoordinatorTest .generateChainedStateHandle(new SerializableObject()) http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 77423c2..dc2b11e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; + +import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.Before; @@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); // Recover - sharedStateRegistry.clear(); - checkpoints.recover(sharedStateRegistry); + sharedStateRegistry.close(); + sharedStateRegistry = new SharedStateRegistry(); + checkpoints.recover(); assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); @@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); - sharedStateRegistry.clear(); - store.recover(sharedStateRegistry); + sharedStateRegistry.close(); + store.recover(); assertEquals(0, store.getNumberOfRetainedCheckpoints()); } @@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren()); // Recover again - sharedStateRegistry.clear(); - store.recover(sharedStateRegistry); + sharedStateRegistry.close(); + store.recover(); CompletedCheckpoint recovered = store.getLatestCheckpoint(); assertEquals(checkpoint, recovered); @@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint checkpointStore.addCheckpoint(checkpoint); } - sharedStateRegistry.clear(); - checkpointStore.recover(sharedStateRegistry); + sharedStateRegistry.close(); + checkpointStore.recover(); CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(); @@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint zkCheckpointStore1.addCheckpoint(completedCheckpoint); // recover the checkpoint by a different checkpoint store - sharedStateRegistry.clear(); - zkCheckpointStore2.recover(sharedStateRegistry); + sharedStateRegistry.close(); + sharedStateRegistry = new SharedStateRegistry(); + zkCheckpointStore2.recover(); CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 91bab85..3171f1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { stateStorage, Executors.directExecutor()); - SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); - zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry); - - verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); - verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); + zooKeeperCompletedCheckpointStore.recover(); CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java index c1b3ccd..9f6f88e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils; + import org.junit.Test; import java.util.Map; import java.util.Random; import java.util.UUID; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.spy; @@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest { @Test public void testSharedStateDeRegistration() throws Exception { - Random rnd = new Random(42); - SharedStateRegistry registry = spy(new SharedStateRegistry()); // Create two state handles with overlapping shared state @@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest { verify(stateHandle2.getMetaStateHandle(), times(1)).discardState(); } + /** + * This tests that re-registration of shared state with another registry works as expected. This simulates a + * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers + * all live checkpoint states. + */ + @Test + public void testSharedStateReRegistration() throws Exception { + + SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry()); + + IncrementalKeyedStateHandle stateHandleX = create(new Random(1)); + IncrementalKeyedStateHandle stateHandleY = create(new Random(2)); + IncrementalKeyedStateHandle stateHandleZ = create(new Random(3)); + + // Now we register first time ... + stateHandleX.registerSharedStates(stateRegistryA); + stateHandleY.registerSharedStates(stateRegistryA); + stateHandleZ.registerSharedStates(stateRegistryA); + + try { + // Second attempt should fail + stateHandleX.registerSharedStates(stateRegistryA); + fail("Should not be able to register twice with the same registry."); + } catch (IllegalStateException ignore) { + } + + // Everything should be discarded for this handle + stateHandleZ.discardState(); + verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + // Close the first registry + stateRegistryA.close(); + + // Attempt to register to closed registry should trigger exception + try { + create(new Random(4)).registerSharedStates(stateRegistryA); + fail("Should not be able to register new state to closed registry."); + } catch (IllegalStateException ignore) { + } + + // All state should still get discarded + stateHandleY.discardState(); + verify(stateHandleY.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + // This should still be unaffected + verify(stateHandleX.getMetaStateHandle(), never()).discardState(); + for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) { + verify(stateHandle, never()).discardState(); + } + + // We re-register the handle with a new registry + SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry()); + stateHandleX.registerSharedStates(sharedStateRegistryB); + stateHandleX.discardState(); + + // Should be completely discarded because it is tracked through the new registry + verify(stateHandleX.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + sharedStateRegistryB.close(); + } + private static IncrementalKeyedStateHandle create(Random rnd) { return new IncrementalKeyedStateHandle( UUID.nameUUIDFromBytes("test".getBytes()), http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java index a0c4412..037ecd1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java @@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2); + private final int maxRetainedCheckpoints; + + public RecoverableCompletedCheckpointStore() { + this(1); + } + + public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) { + Preconditions.checkArgument(maxRetainedCheckpoints > 0); + this.maxRetainedCheckpoints = maxRetainedCheckpoints; + } + @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { checkpoints.addAll(suspended); suspended.clear(); - - for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); - } } @Override @@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS checkpoints.addLast(checkpoint); - - if (checkpoints.size() > 1) { - CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(); + if (checkpoints.size() > maxRetainedCheckpoints) { + removeOldestCheckpoint(); } } + public void removeOldestCheckpoint() throws Exception { + CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); + checkpointToSubsume.discardOnSubsume(); + } + @Override public CompletedCheckpoint getLatestCheckpoint() throws Exception { return checkpoints.isEmpty() ? null : checkpoints.getLast(); @@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS @Override public int getMaxNumberOfRetainedCheckpoints() { - return 1; + return maxRetainedCheckpoints; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index cb8639b..1ba5fb1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; @@ -779,9 +780,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) { + + TaskInfo taskInfo = getEnvironment().getTaskInfo(); return operator.getClass().getSimpleName() + - "_" + vertexId + - "_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask(); + "_" + operator.getOperatorID() + + "_(" + taskInfo.getIndexOfThisSubtask() + "/" + taskInfo.getNumberOfParallelSubtasks() + ")"; } /** @@ -892,18 +895,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null; + // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state // to stateless tasks on restore. This enables simple job modifications that only concern // stateless without the need to assign them uids to match their (always empty) states. owner.getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), checkpointMetrics, - hasState ? taskOperatorSubtaskStates : null); + acknowledgedState); + + LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", + owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); + + LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", + owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState); - if (LOG.isDebugEnabled()) { - LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); - } } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", owner.getName(), http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 22ed847..c525a37 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.fs.Path; @@ -48,21 +49,22 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; +import org.apache.curator.test.TestingServer; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; +import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -87,6 +89,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog private static TestStreamEnvironment env; + private static TestingServer zkServer; + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -101,11 +105,27 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } enum StateBackendEnum { - MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC + MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC } - @BeforeClass - public static void startTestCluster() { + @Before + public void startTestCluster() throws Exception { + + // print a message when starting a test method to avoid Travis' <tt>"Maven produced no + // output for xxx seconds."</tt> messages + System.out.println( + "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); + + // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints + if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) { + zkServer = new TestingServer(); + zkServer.start(); + } + + TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final File haDir = temporaryFolder.newFolder(); + Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); @@ -113,28 +133,18 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB + if (zkServer != null) { + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); + } + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); env = new TestStreamEnvironment(cluster, PARALLELISM); env.getConfig().setUseSnapshotCompression(true); - } - - @AfterClass - public static void stopTestCluster() { - if (cluster != null) { - cluster.stop(); - } - } - - @Before - public void beforeTest() throws IOException { - // print a message when starting a test method to avoid Travis' <tt>"Maven produced no - // output for xxx seconds."</tt> messages - System.out.println( - "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); - // init state back-end switch (stateBackendEnum) { case MEM: this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); @@ -159,7 +169,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog this.stateBackend = rdb; break; } - case ROCKSDB_INCREMENTAL: { + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); String backups = tempFolder.newFolder().getAbsolutePath(); // we use the fs backend with small threshold here to test the behaviour with file @@ -173,16 +184,25 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog this.stateBackend = rdb; break; } - + default: + throw new IllegalStateException("No backend selected."); } } - /** - * Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output - * for xxx seconds."</tt> messages. - */ @After - public void afterTest() { + public void stopTestCluster() throws IOException { + if (cluster != null) { + cluster.stop(); + cluster = null; + } + + if (zkServer != null) { + zkServer.stop(); + zkServer = null; + } + + //Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output + // for xxx seconds."</tt> messages. System.out.println( "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); }