[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff Branch: refs/heads/release-1.3 Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c Parents: 0225db2 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue Jul 25 12:04:16 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jul 28 15:42:28 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 13 +- .../checkpoint/CheckpointCoordinator.java | 32 +- .../runtime/checkpoint/CompletedCheckpoint.java | 3 +- .../checkpoint/CompletedCheckpointStore.java | 5 +- .../StandaloneCompletedCheckpointStore.java | 4 +- .../ZooKeeperCompletedCheckpointStore.java | 12 +- .../runtime/executiongraph/ExecutionGraph.java | 6 +- .../state/IncrementalKeyedStateHandle.java | 68 ++- .../runtime/state/KeyGroupsStateHandle.java | 2 +- .../runtime/state/MultiStreamStateHandle.java | 10 +- .../runtime/state/SharedStateRegistry.java | 52 ++- .../state/SharedStateRegistryFactory.java | 35 ++ .../state/memory/ByteStreamStateHandle.java | 1 + ...tCoordinatorExternalizedCheckpointsTest.java | 22 +- .../CheckpointCoordinatorFailureTest.java | 7 +- .../CheckpointCoordinatorMasterHooksTest.java | 7 +- .../checkpoint/CheckpointCoordinatorTest.java | 437 ++++++++++--------- .../checkpoint/CheckpointStateRestoreTest.java | 10 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 25 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 7 +- .../state/IncrementalKeyedStateHandleTest.java | 75 +++- .../RecoverableCompletedCheckpointStore.java | 33 +- .../streaming/runtime/tasks/StreamTask.java | 1 - ...tractEventTimeWindowCheckpointingITCase.java | 85 +++- ...ckendEventTimeWindowCheckpointingITCase.java | 4 +- 
...ckendEventTimeWindowCheckpointingITCase.java | 4 +- ...ckendEventTimeWindowCheckpointingITCase.java | 4 +- ...ckendEventTimeWindowCheckpointingITCase.java | 51 +++ ...ckendEventTimeWindowCheckpointingITCase.java | 4 +- ...ckendEventTimeWindowCheckpointingITCase.java | 4 +- ...ckendEventTimeWindowCheckpointingITCase.java | 4 +- 31 files changed, 688 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index a6b53ec..7e0910e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); - LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); + + LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.", + this.operatorIdentifier, + this.backendUID); } /** @@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { void takeSnapshot() throws Exception { assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); + final long 
lastCompletedCheckpoint; + // use the last completed checkpoint as the comparison base. synchronized (stateBackend.materializedSstFiles) { - baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; + baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); } + LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + + "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 82933ac..fe94d25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.util.Preconditions; @@ -174,8 +175,11 @@ public 
class CheckpointCoordinator { @Nullable private CheckpointStatsTracker statsTracker; + /** A factory for SharedStateRegistry objects */ + private final SharedStateRegistryFactory sharedStateRegistryFactory; + /** Registry that tracks state which is shared across (incremental) checkpoints */ - private final SharedStateRegistry sharedStateRegistry; + private SharedStateRegistry sharedStateRegistry; // -------------------------------------------------------------------------------------------- @@ -192,7 +196,8 @@ public class CheckpointCoordinator { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, @Nullable String checkpointDirectory, - Executor executor) { + Executor executor, + SharedStateRegistryFactory sharedStateRegistryFactory) { // sanity checks checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); @@ -230,7 +235,8 @@ public class CheckpointCoordinator { this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.checkpointDirectory = checkpointDirectory; this.executor = checkNotNull(executor); - this.sharedStateRegistry = new SharedStateRegistry(executor); + this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); + this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -1044,10 +1050,23 @@ public class CheckpointCoordinator { throw new IllegalStateException("CheckpointCoordinator is shut down"); } - // Recover the checkpoints - completedCheckpointStore.recover(sharedStateRegistry); + // We create a new shared state registry object, so that all pending async disposal requests from previous + // runs will go against the old object (where they can do no harm). + // This must happen under the checkpoint lock.
+ sharedStateRegistry.close(); + sharedStateRegistry = sharedStateRegistryFactory.create(executor); + + // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery + completedCheckpointStore.recover(); + + // Now, we re-register all (shared) states from the checkpoint store with the new registry + for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) { + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + } + + LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry); - // restore from the latest checkpoint + // Restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); if (latest == null) { @@ -1121,7 +1140,6 @@ public class CheckpointCoordinator { CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( job, tasks, savepointPath, userClassLoader, allowNonRestored); - savepoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpointStore.addCheckpoint(savepoint); // Reset the checkpoint ID counter http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 56aa19d..76d1580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable { private void doDiscard() throws Exception { + LOG.trace("Executing discard procedure for {}.", this); + try { // collect exceptions and 
continue cleanup Exception exception = null; @@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable { // discard private state objects try { Collection<OperatorState> values = operatorStates.values(); - LOG.trace("About to discard operator states {}.", values); StateUtil.bestEffortDiscardAllStateObjects(values); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 45d407e..82193b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.state.SharedStateRegistry; import java.util.List; @@ -33,10 +32,8 @@ public interface CompletedCheckpointStore { * * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest * available checkpoint. - * - * @param sharedStateRegistry the shared state registry to register recovered states. */ - void recover(SharedStateRegistry sharedStateRegistry) throws Exception; + void recover() throws Exception; /** * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. 
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index fbb0198..63e7468 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.state.SharedStateRegistry; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { // Nothing to do } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index c4cb6bc..88dd0d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -18,20 +18,21 @@ package 
org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.FlinkException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * that the history of checkpoints is consistent. */ @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); // Clear local handles in order to prevent duplicates on // recovery. The local handles should reflect the state // of ZooKeeper. completedCheckpoints.clear(); - sharedStateRegistry.clear(); // Get all there is first List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; @@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto try { completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); if (completedCheckpoint != null) { - // Re-register all shared states in the checkpoint. 
- completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpoints.add(completedCheckpoint); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f9d2d69..c105d2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -66,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; @@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; - import org.apache.flink.util.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -459,7 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive checkpointIDCounter, checkpointStore, checkpointDir, - ioExecutor); + ioExecutor, + SharedStateRegistry.DEFAULT_FACTORY); // register the master hooks on the checkpoint coordinator for (MasterTriggerRestoreHook<?> hook : masterHooks) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 0085890..0268b10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private final UUID backendIdentifier; /** - * The key-group range covered by this state handle + * The key-group range covered by this state handle. */ private final KeyGroupRange keyGroupRange; /** - * The checkpoint Id + * The checkpoint Id. */ private final long checkpointId; /** - * Shared state in the incremental checkpoint. This i + * Shared state in the incremental checkpoint. */ private final Map<StateHandleID, StreamStateHandle> sharedState; /** - * Private state in the incremental checkpoint + * Private state in the incremental checkpoint. */ private final Map<StateHandleID, StreamStateHandle> privateState; /** - * Primary meta data state of the incremental checkpoint + * Primary meta data state of the incremental checkpoint. */ private final StreamStateHandle metaStateHandle; @@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { - return this; - } else { - return null; - } + return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ? 
+ null : this; } @Override public void discardState() throws Exception { + SharedStateRegistry registry = this.sharedStateRegistry; + final boolean isRegistered = (registry != null); + + LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.", + isRegistered, + checkpointId, + backendIdentifier); + try { metaStateHandle.discardState(); } catch (Exception e) { @@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { // If this was not registered, we can delete the shared state. We can simply apply this // to all handles, because all handles that have not been created for the first time for this // are only placeholders at this point (disposing them is a NOP). - if (sharedStateRegistry == null) { - try { - StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); - } - } else { + if (isRegistered) { // If this was registered, we only unregister all our referenced shared states // from the registry. for (StateHandleID stateHandleID : sharedState.keySet()) { - sharedStateRegistry.unregisterReference( + registry.unregisterReference( createSharedStateRegistryKeyFromFileName(stateHandleID)); } + } else { + // Otherwise, we assume to own those handles and dispose them directly. + try { + StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } } } @@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { - Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states."); + // This is a quick check to avoid that we register twice with the same registry. 
However, the code allows to + // register again with a different registry. The implication is that ownership is transferred to this new + // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new + // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that + // an old registry object from a previous run is due to be GCed and will never be used for registration again. + Preconditions.checkState( + sharedStateRegistry != stateRegistry, + "The state handle has already registered its shared states to the given registry."); sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); + LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.", + checkpointId, + backendIdentifier); + for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) { SharedStateRegistryKey registryKey = createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); @@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { result = 31 * result + getMetaStateHandle().hashCode(); return result; } + + @Override + public String toString() { + return "IncrementalKeyedStateHandle{" + + "backendIdentifier=" + backendIdentifier + + ", keyGroupRange=" + keyGroupRange + + ", checkpointId=" + checkpointId + + ", sharedState=" + sharedState + + ", privateState=" + privateState + + ", metaStateHandle=" + metaStateHandle + + ", registered=" + (sharedStateRegistry != null) + + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 8e38ad4..8092f6c 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle public String toString() { return "KeyGroupsStateHandle{" + "groupRangeOffsets=" + groupRangeOffsets + - ", data=" + stateHandle + + ", stateHandle=" + stateHandle + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java index b95dace..1960c1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java @@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle { private final List<StreamStateHandle> stateHandles; private final long stateSize; - public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException { + public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) { this.stateHandles = Preconditions.checkNotNull(stateHandles); long calculateSize = 0L; for(StreamStateHandle stateHandle : stateHandles) { @@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle { return stateSize; } + @Override + public String toString() { + return "MultiStreamStateHandle{" + + "stateHandles=" + stateHandles + + ", stateSize=" + stateSize + + '}'; + } + static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream { private final TreeMap<Long, StreamStateHandle> stateHandleMap; 
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index e0ca873..347f30c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -38,13 +38,24 @@ import java.util.concurrent.Executor; * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies * them. */ -public class SharedStateRegistry { +public class SharedStateRegistry implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class); + /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */ + public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() { + @Override + public SharedStateRegistry create(Executor deleteExecutor) { + return new SharedStateRegistry(deleteExecutor); + } + }; + /** All registered state objects by an artificial key */ private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates; + /** This flag indicates whether or not the registry is open or if close() was called */ + private boolean open; + /** Executor for async state deletion */ private final Executor asyncDisposalExecutor; @@ -56,6 +67,7 @@ public class SharedStateRegistry { public SharedStateRegistry(Executor asyncDisposalExecutor) { this.registeredStates = new HashMap<>(); this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor); + this.open = true; } /** @@ -82,6 +94,9 @@ public class SharedStateRegistry { 
SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { + + Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry."); + entry = registeredStates.get(registrationKey); if (entry == null) { @@ -96,6 +111,11 @@ public class SharedStateRegistry { // delete if this is a real duplicate if (!Objects.equals(state, entry.stateHandle)) { scheduledStateDeletion = state; + LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " + + "be an unnecessary copy of existing state {} and will be dropped.", + registrationKey, + state, + entry.stateHandle); } entry.increaseReferenceCount(); } @@ -112,7 +132,8 @@ public class SharedStateRegistry { * * @param registrationKey the shared state for which we release a reference. * @return the result of the request, consisting of the reference count after this operation - * and the state handle, or null if the state handle was deleted through this request. + * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry + * was previously closed. */ public Result unregisterReference(SharedStateRegistryKey registrationKey) { @@ -123,6 +144,7 @@ public class SharedStateRegistry { SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { + entry = registeredStates.get(registrationKey); Preconditions.checkState(entry != null, @@ -164,10 +186,18 @@ public class SharedStateRegistry { } } + @Override + public String toString() { + synchronized (registeredStates) { + return "SharedStateRegistry{" + + "registeredStates=" + registeredStates + + '}'; + } + } + private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { // We do the small optimization to not issue discards for placeholders, which are NOPs. 
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { - LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); asyncDisposalExecutor.execute( new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); @@ -178,6 +208,13 @@ public class SharedStateRegistry { return stateHandle instanceof PlaceholderStreamStateHandle; } + @Override + public void close() { + synchronized (registeredStates) { + open = false; + } + } + /** * An entry in the registry, tracking the handle and the corresponding reference count. */ @@ -279,13 +316,4 @@ public class SharedStateRegistry { } } } - - /** - * Clears the registry. - */ - public void clear() { - synchronized (registeredStates) { - registeredStates.clear(); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java new file mode 100644 index 0000000..05c9825 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.util.concurrent.Executor; + +/** + * Simple factory to produce {@link SharedStateRegistry} objects. + */ +public interface SharedStateRegistryFactory { + + /** + * Factory method for {@link SharedStateRegistry}. + * + * @param deleteExecutor executor used to run (async) deletes. + * @return a SharedStateRegistry object + */ + SharedStateRegistry create(Executor deleteExecutor); +} http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 9ba9d35..3a43d4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle { public String toString() { return "ByteStreamStateHandle{" + "handleName='" + handleName + '\'' + + ", dataBytes=" + data.length + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java index d293eea..edc29fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; import org.apache.flink.runtime.concurrent.Executors; @@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + /** * CheckpointCoordinator tests for externalized checkpoints. 
* @@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), checkpointDir.getAbsolutePath(), - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 344b340..5cca94f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { new StandaloneCheckpointIDCounter(), new FailingCompletedCheckpointStore(), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.triggerCheckpoint(triggerTimestamp, false); @@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle); AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState); - + try { coord.receiveAcknowledgeMessage(acknowledgeMessage); fail("Expected a checkpoint exception because the completed checkpoint store could not " + @@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { private static 
final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { throw new UnsupportedOperationException("Not implemented."); } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index d6daa4e..94063a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -47,14 +47,12 @@ import java.util.List; import java.util.concurrent.Executor; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static 
org.mockito.Mockito.any; @@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); } private static <T> T mockGeneric(Class<?> clazz) { http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 186a819..16a89ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; @@ -56,6 +54,9 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, 
coord.getNumberOfPendingCheckpoints()); @@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger { 
new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger { Thread.sleep(250); } while (!checkpoint.isDiscarded() && - coord.getNumberOfPendingCheckpoints() > 0 && - System.currentTimeMillis() < deadline); + coord.getNumberOfPendingCheckpoints() > 0 && + System.currentTimeMillis() < deadline); assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new 
StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger { int numCallsSoFar = numCalls.get(); Thread.sleep(400); assertTrue(numCallsSoFar == numCalls.get() || - numCallsSoFar+1 == numCalls.get()); + numCallsSoFar+1 == numCalls.get()); // start another sequence of periodic scheduling numCalls.set(0); @@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger { numCallsSoFar = numCalls.get(); Thread.sleep(400); assertTrue(numCallsSoFar == numCalls.get() || - numCallsSoFar + 1 == numCalls.get()); + numCallsSoFar + 1 == numCalls.get()); coord.shutdown(JobStatus.FINISHED); } @@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends TestLogger { final long delay = 50; final CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 2, // periodic interval is 2 ms - 200_000, // timeout is very long (200 s) - delay, // 50 ms delay between checkpoints - 1, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex }, - new ExecutionVertex[] { vertex }, - new ExecutionVertex[] { vertex }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2), - "dummy-path", - Executors.directExecutor()); + jid, + 2, // periodic interval is 2 ms + 200_000, // timeout is very long (200 s) + delay, // 50 ms delay between checkpoints + 1, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex }, + new ExecutionVertex[] { vertex }, + new ExecutionVertex[] { vertex }, + new 
StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2), + "dummy-path", + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); try { coord.startCheckpointScheduler(); @@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger { counter, new StandaloneCompletedCheckpointStore(10), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends TestLogger { Thread.sleep(20); } while ((now = System.currentTimeMillis()) < minDuration || - (numCalls.get() < maxConcurrentAttempts && now < timeout)); + (numCalls.get() < maxConcurrentAttempts && now < timeout)); assertEquals(maxConcurrentAttempts, numCalls.get()); verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts)) - .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); + .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); // now, once we acknowledge one checkpoint, it should trigger the next one coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L)); @@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends 
TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger { Thread.sleep(20); } while ((now = System.currentTimeMillis()) < minDuration || - (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout)); + (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout)); // validate that the pending checkpoints are there assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints()); @@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger { Thread.sleep(20); } while (coord.getPendingCheckpoints().get(4L) == null && - System.currentTimeMillis() < newTimeout); + System.currentTimeMillis() < newTimeout); // do the final check assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints()); @@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends TestLogger { final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED); when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer( - new Answer<ExecutionState>() { - @Override - public ExecutionState answer(InvocationOnMock invocation){ - return currentState.get(); - } - }); + new Answer<ExecutionState>() { + @Override + public ExecutionState answer(InvocationOnMock invocation){ + return currentState.get(); + } + }); CheckpointCoordinator coord = new CheckpointCoordinator( jid, @@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.startCheckpointScheduler(); @@ -1874,7 +1893,7 @@ public class 
CheckpointCoordinatorTest extends TestLogger { Thread.sleep(20); } while (System.currentTimeMillis() < timeout && - coord.getNumberOfPendingCheckpoints() == 0); + coord.getNumberOfPendingCheckpoints() == 0); assertTrue(coord.getNumberOfPendingCheckpoints() > 0); } @@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger { checkpointIDCounter, new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); @@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger { allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); ExecutionVertex[] arrayExecutionVertices = - allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(); @@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), store, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends TestLogger { SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index)); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - 
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskState); + jid, + jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + subtaskState); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends TestLogger { SubtaskState subtaskState = mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index)); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskState); + jid, + jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + subtaskState); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends TestLogger { KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false); SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); 
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends TestLogger { KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false); SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger { allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); ExecutionVertex[] arrayExecutionVertices = - allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); List<KeyGroupRange> keyGroupPartitions1 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1); List<KeyGroupRange> keyGroupPartitions2 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index); KeyGroupsStateHandle keyGroupState = generateKeyGroupState( - jobVertexID1, keyGroupPartitions1.get(index), false); + jobVertexID1, keyGroupPartitions1.get(index), false); SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends TestLogger { ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID2, index); KeyGroupsStateHandle keyGroupState = generateKeyGroupState( - jobVertexID2, keyGroupPartitions2.get(index), false); + jobVertexID2, keyGroupPartitions2.get(index), false); SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); 
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends TestLogger { int newParallelism2 = scaleOut ? 13 : 2; final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex( - jobVertexID1, - parallelism1, - maxParallelism1); + jobVertexID1, + parallelism1, + maxParallelism1); final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex( - jobVertexID2, - parallelism2, - maxParallelism2); + jobVertexID2, + parallelism2, + maxParallelism2); List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2); @@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger { allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); ExecutionVertex[] arrayExecutionVertices = - allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); List<KeyGroupRange> keyGroupPartitions1 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); List<KeyGroupRange> keyGroupPartitions2 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2); //vertex 1 for (int index = 0; index < jobVertex1.getParallelism(); index++) { @@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends TestLogger { SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends TestLogger { expectedOpStatesBackend.add(opStateBackend); expectedOpStatesRaw.add(opStateRaw); SubtaskState checkpointStateHandles = - new SubtaskState(new ChainedStateHandle<>( - Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw); + new SubtaskState(new ChainedStateHandle<>( + Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - checkpointStateHandles); + jid, + jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } @@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends TestLogger { Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); List<KeyGroupRange> newKeyGroupPartitions2 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
newParallelism2); + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2); final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex( - jobVertexID1, - parallelism1, - maxParallelism1); + jobVertexID1, + parallelism1, + maxParallelism1); // rescale vertex 2 final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex( - jobVertexID2, - newParallelism2, - maxParallelism2); + jobVertexID2, + newParallelism2, + maxParallelism2); tasks.put(jobVertexID1, newJobVertex1); tasks.put(jobVertexID2, newJobVertex2); @@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID); return new Tuple2<>(jobVertexID, operatorID); } - + /** * old topology * [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2 @@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1); operatorStates.put(id.f1, taskState); for (int index = 0; index < taskState.getParallelism(); index++) { - StreamStateHandle subNonPartitionedState = + StreamStateHandle subNonPartitionedState = generateStateForVertex(id.f0, index) .get(0); OperatorStateHandle subManagedOperatorState = @@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends TestLogger { spy(new StandaloneCompletedCheckpointStore(1)); CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint( - jobID, - 2, - System.currentTimeMillis(), - System.currentTimeMillis() + 3000, - operatorStates, - Collections.<MasterState>emptyList(), - CheckpointProperties.forStandardCheckpoint(), - null, - null); + jobID, + 2, + System.currentTimeMillis(), + System.currentTimeMillis() + 3000, + operatorStates, + Collections.<MasterState>emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); 
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint); @@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), standaloneCompletedCheckpointStore, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); coord.restoreLatestCheckpointedState(tasks, false, true); @@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), "fake-directory", - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; List<Collection<OperatorStateHandle>> repartitionedStates = - repartitioner.repartitionState(Collections.singletonList(osh), 3); + repartitioner.repartitionState(Collections.singletonList(osh), 3); Map<String, Integer> checkCounts = new HashMap<>(3); for (Collection<OperatorStateHandle> operatorStateHandles : repartitionedStates) { for (OperatorStateHandle operatorStateHandle : operatorStateHandles) { for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo : - operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) { + operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) { String stateName = stateNameToMetaInfo.getKey(); Integer count = checkCounts.get(stateName); @@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger { // ------------------------------------------------------------------------ public static KeyGroupsStateHandle generateKeyGroupState( - JobVertexID jobVertexID, - KeyGroupRange keyGroupPartition, boolean rawState) throws IOException { + JobVertexID jobVertexID, + 
KeyGroupRange keyGroupPartition, boolean rawState) throws IOException { List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups()); @@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends TestLogger { } public static KeyGroupsStateHandle generateKeyGroupState( - KeyGroupRange keyGroupRange, - List<? extends Serializable> states) throws IOException { + KeyGroupRange keyGroupRange, + List<? extends Serializable> states) throws IOException { Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == states.size()); Tuple2<byte[], List<long[]>> serializedDataWithOffsets = - serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states)); + serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states)); KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0)); ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare( - String.valueOf(UUID.randomUUID()), - serializedDataWithOffsets.f0); + String.valueOf(UUID.randomUUID()), + serializedDataWithOffsets.f0); KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle( - keyGroupRangeOffsets, - allSerializedStatesHandle); + keyGroupRangeOffsets, + allSerializedStatesHandle); return keyGroupsStateHandle; } public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets( - List<List<? extends Serializable>> serializables) throws IOException { + List<List<? 
extends Serializable>> serializables) throws IOException { List<long[]> offsets = new ArrayList<>(serializables.size()); List<byte[]> serializedGroupValues = new ArrayList<>(); @@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends TestLogger { runningGroupsOffset = 0; for (byte[] serializedGroupValue : serializedGroupValues) { System.arraycopy( - serializedGroupValue, - 0, - allSerializedValuesConcatenated, - runningGroupsOffset, - serializedGroupValue.length); + serializedGroupValue, + 0, + allSerializedValuesConcatenated, + runningGroupsOffset, + serializedGroupValue.length); runningGroupsOffset += serializedGroupValue.length; } return new Tuple2<>(allSerializedValuesConcatenated, offsets); } public static ChainedStateHandle<StreamStateHandle> generateStateForVertex( - JobVertexID jobVertexID, - int index) throws IOException { + JobVertexID jobVertexID, + int index) throws IOException { Random random = new Random(jobVertexID.hashCode() + index); int value = random.nextInt(); @@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends TestLogger { } public static ChainedStateHandle<StreamStateHandle> generateChainedStateHandle( - Serializable value) throws IOException { + Serializable value) throws IOException { return ChainedStateHandle.wrapSingleHandle( - TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value)); + TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value)); } public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle( - JobVertexID jobVertexID, - int index, - int namedStates, - int partitionsPerState, - boolean rawState) throws IOException { + JobVertexID jobVertexID, + int index, + int namedStates, + int partitionsPerState, + boolean rawState) throws IOException { Map<String, List<? 
extends Serializable>> statesListsMap = new HashMap<>(namedStates); @@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle( - Map<String, List<? extends Serializable>> states) throws IOException { + Map<String, List<? extends Serializable>> states) throws IOException { List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size()); @@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends TestLogger { int idx = 0; for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) { offsetsMap.put( - entry.getKey(), - new OperatorStateHandle.StateMetaInfo( - serializationWithOffsets.f1.get(idx), - OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); + entry.getKey(), + new OperatorStateHandle.StateMetaInfo( + serializationWithOffsets.f1.get(idx), + OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); ++idx; } ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare( - String.valueOf(UUID.randomUUID()), - serializationWithOffsets.f0); + String.valueOf(UUID.randomUUID()), + serializationWithOffsets.f0); OperatorStateHandle operatorStateHandle = - new OperatorStateHandle(offsetsMap, streamStateHandle); + new OperatorStateHandle(offsetsMap, streamStateHandle); return ChainedStateHandle.wrapSingleHandle(operatorStateHandle); } static ExecutionJobVertex mockExecutionJobVertex( - JobVertexID jobVertexID, - int parallelism, - int maxParallelism) { + JobVertexID jobVertexID, + int parallelism, + int maxParallelism) { return mockExecutionJobVertex( jobVertexID, @@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs); - + when(vertex.getJobVertex()).thenReturn(jobVertex); return vertex; @@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest 
extends TestLogger { } public static void verifyStateRestore( - JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, - List<KeyGroupRange> keyGroupPartitions) throws Exception { + JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, + List<KeyGroupRange> keyGroupPartitions) throws Exception { for (int i = 0; i < executionJobVertex.getParallelism(); i++) { @@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends TestLogger { ChainedStateHandle<StreamStateHandle> expectNonPartitionedState = generateStateForVertex(jobVertexID, i); ChainedStateHandle<StreamStateHandle> actualNonPartitionedState = taskStateHandles.getLegacyOperatorState(); assertTrue(CommonTestUtils.isSteamContentEqual( - expectNonPartitionedState.get(0).openInputStream(), - actualNonPartitionedState.get(0).openInputStream())); + expectNonPartitionedState.get(0).openInputStream(), + actualNonPartitionedState.get(0).openInputStream())); ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend = - generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false); + generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false); List<Collection<OperatorStateHandle>> actualPartitionableState = taskStateHandles.getManagedOperatorState(); assertTrue(CommonTestUtils.isSteamContentEqual( - expectedOpStateBackend.get(0).openInputStream(), - actualPartitionableState.get(0).iterator().next().openInputStream())); + expectedOpStateBackend.get(0).openInputStream(), + actualPartitionableState.get(0).iterator().next().openInputStream())); KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState( - jobVertexID, keyGroupPartitions.get(i), false); + jobVertexID, keyGroupPartitions.get(i), false); Collection<KeyedStateHandle> actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState(); compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState); } } public static void compareKeyedState( - 
Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState, - Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception { + Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState, + Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception { KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next(); int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups(); @@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger { long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId); inputStream.seek(offset); int expectedKeyGroupState = - InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader()); + InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader()); for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) { assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle); @@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger { try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) { actualInputStream.seek(actualOffset); int actualGroupState = InstantiationUtil. 
- deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader()); + deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader()); assertEquals(expectedKeyGroupState, actualGroupState); } } @@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger { } public static void comparePartitionableState( - List<ChainedStateHandle<OperatorStateHandle>> expected, - List<List<Collection<OperatorStateHandle>>> actual) throws Exception { + List<ChainedStateHandle<OperatorStateHandle>> expected, + List<List<Collection<OperatorStateHandle>>> actual) throws Exception { List<String> expectedResult = new ArrayList<>(); for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) { @@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger { for (long offset : entry.getValue().getOffsets()) { in.seek(offset); Integer state = InstantiationUtil. - deserializeObject(in, Thread.currentThread().getContextClassLoader()); + deserializeObject(in, Thread.currentThread().getContextClassLoader()); resultCollector.add(opIdx + " : " + entry.getKey() + " : " + state); } } @@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // Periodic CheckpointTriggerResult triggerResult = coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forStandardCheckpoint(), - null, - true); + System.currentTimeMillis(), + CheckpointProperties.forStandardCheckpoint(), + null, + true); assertTrue(triggerResult.isFailure()); assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason()); // Not periodic triggerResult = coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forStandardCheckpoint(), - null, - false); + 
System.currentTimeMillis(), + CheckpointProperties.forStandardCheckpoint(), + null, + false); assertFalse(triggerResult.isFailure()); } @@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends TestLogger { int maxPartitionsPerState = 1 + r.nextInt(9); doTestPartitionableStateRepartitioning( - r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState); + r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState); } } private void doTestPartitionableStateRepartitioning( - Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) { + Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) { List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism); @@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends TestLogger { } OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? - OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE; + OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE; namedStatesToOffsets.put( - "State-" + s, - new OperatorStateHandle.StateMetaInfo(offs, mode)); + "State-" + s, + new OperatorStateHandle.StateMetaInfo(offs, mode)); } previousParallelOpInstanceStates.add( - new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1))); + new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1))); } Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>(); @@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger { long[] offs = e.getValue().getOffsets(); int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ? 
- newParallelism : 1; + newParallelism : 1; expectedTotalPartitions += replication * offs.length; List<Long> offsList = new ArrayList<>(offs.length); @@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; List<Collection<OperatorStateHandle>> pshs = - repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism); + repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism); Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>(); @@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); coord.setCheckpointStatsTracker(tracker); @@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger { new StandaloneCheckpointIDCounter(), store, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); store.addCheckpoint(new CompletedCheckpoint( new JobID(), @@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger { checkpointIDCounter, completedCheckpointStore, null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // trigger a first checkpoint assertTrue( http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 
7d24568..0888cff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.util.SerializableObject; @@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); try { coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false); @@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, - Executors.directExecutor()); + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY); StreamStateHandle serializedState = CheckpointCoordinatorTest .generateChainedStateHandle(new SerializableObject())