Repository: flink Updated Branches: refs/heads/master 94d3166b4 -> 8cff17fcc
[FLINK-6964] [checkpoint] Fix externalized incremental checkpoints for StandaloneCompletedCheckpointStore Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cff17fc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cff17fc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cff17fc Branch: refs/heads/master Commit: 8cff17fcc9b4bca6499c26fc2a6318c759cbf251 Parents: 94d3166 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed Jun 21 11:22:28 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jul 14 15:52:09 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 12 +- .../checkpoint/CheckpointCoordinator.java | 34 ++- .../savepoint/SavepointV2Serializer.java | 7 +- .../state/IncrementalKeyedStateHandle.java | 21 +- .../runtime/state/SharedStateRegistry.java | 45 +++- .../ZooKeeperCompletedCheckpointStoreTest.java | 19 +- .../savepoint/CheckpointTestUtils.java | 2 +- .../state/IncrementalKeyedStateHandleTest.java | 3 +- .../runtime/testingUtils/TestingCluster.scala | 49 +++- .../testingUtils/TestingJobManagerLike.scala | 59 ++++- .../TestingJobManagerMessages.scala | 39 +++- .../ExternalizedCheckpointITCase.java | 228 +++++++++++++++++++ 12 files changed, 482 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 
291973c..9d289b4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -185,6 +185,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** The identifier of the last completed checkpoint. */ private long lastCompletedCheckpointId = -1; + /** Unique ID of this backend. */ + private UUID backendUID; + private static final String SST_FILE_SUFFIX = ".sst"; public RocksDBKeyedStateBackend( @@ -233,6 +236,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.kvStateInformation = new HashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); + this.backendUID = UUID.randomUUID(); + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } /** @@ -926,7 +931,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } return new IncrementalKeyedStateHandle( - stateBackend.operatorIdentifier, + stateBackend.backendUID, stateBackend.keyGroupRange, checkpointId, sstFiles, @@ -1438,6 +1443,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } else { + // pick up again the old backend id, so that we can reference existing state + stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + + LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", + stateBackend.operatorIdentifier, stateBackend.backendUID); // create hard links in the instance directory if (!stateBackend.instanceRocksDBPath.mkdirs()) { http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 905d132..3e36158 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; @@ -417,6 +418,36 @@ public class CheckpointCoordinator { return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess(); } + /** + * Test method to trigger a checkpoint/savepoint. + * + * @param timestamp The timestamp for the checkpoint. + * @param options The checkpoint options. 
+ * @return A future to the completed checkpoint + */ + @VisibleForTesting + @Internal + public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception { + switch (options.getCheckpointType()) { + case SAVEPOINT: + return triggerSavepoint(timestamp, options.getTargetLocation()); + + case FULL_CHECKPOINT: + CheckpointTriggerResult triggerResult = + triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false); + + if (triggerResult.isSuccess()) { + return triggerResult.getPendingCheckpoint().getCompletionFuture(); + } else { + Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message()); + return FlinkCompletableFuture.completedExceptionally(cause); + } + + default: + throw new IllegalArgumentException("Unknown checkpoint type: " + options.getCheckpointType()); + } + } + @VisibleForTesting CheckpointTriggerResult triggerCheckpoint( long timestamp, @@ -1092,6 +1123,7 @@ public class CheckpointCoordinator { CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( job, tasks, savepointPath, userClassLoader, allowNonRestored); + savepoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpointStore.addCheckpoint(savepoint); // Reset the checkpoint ID counter @@ -1099,7 +1131,7 @@ public class CheckpointCoordinator { checkpointIdCounter.setCount(nextCheckpointId); LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId); - + return restoreLatestCheckpointedState(tasks, true, allowNonRestored); } http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index da0022c..c8d695f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * (De)serializer for checkpoint metadata format version 2. @@ -320,7 +321,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE); dos.writeLong(incrementalKeyedStateHandle.getCheckpointId()); - dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier()); + dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier())); dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup()); dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); @@ -380,7 +381,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) { long checkpointId = dis.readLong(); - String operatorId = dis.readUTF(); + String backendId = dis.readUTF(); int startKeyGroup = dis.readInt(); int numKeyGroups = dis.readInt(); KeyGroupRange keyGroupRange = @@ -391,7 +392,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis); return new IncrementalKeyedStateHandle( - operatorId, + UUID.fromString(backendId), keyGroupRange, checkpointId, sharedStates, http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 770b5a9..0085890 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -20,10 +20,12 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.UUID; /** * The handle to states of an incremental snapshot. @@ -57,9 +59,10 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private static final long serialVersionUID = -8328808513197388231L; /** - * The operator instance identifier for this handle + * UUID to identify the backend which created this state handle. This is used in creating the key for the + * {@link SharedStateRegistry}.
*/ - private final String operatorIdentifier; + private final UUID backendIdentifier; /** * The key-group range covered by this state handle @@ -97,14 +100,14 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private transient SharedStateRegistry sharedStateRegistry; public IncrementalKeyedStateHandle( - String operatorIdentifier, + UUID backendIdentifier, KeyGroupRange keyGroupRange, long checkpointId, Map<StateHandleID, StreamStateHandle> sharedState, Map<StateHandleID, StreamStateHandle> privateState, StreamStateHandle metaStateHandle) { - this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); + this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier); this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.checkpointId = checkpointId; this.sharedState = Preconditions.checkNotNull(sharedState); @@ -134,8 +137,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { return metaStateHandle; } - public String getOperatorIdentifier() { - return operatorIdentifier; + public UUID getBackendIdentifier() { + return backendIdentifier; } @Override @@ -231,7 +234,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { */ @VisibleForTesting public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { - return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId); + return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); } /** @@ -252,7 +255,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { if (getCheckpointId() != that.getCheckpointId()) { return false; } - if (!getOperatorIdentifier().equals(that.getOperatorIdentifier())) { + if (!getBackendIdentifier().equals(that.getBackendIdentifier())) { return false; } if (!getKeyGroupRange().equals(that.getKeyGroupRange())) { @@ -273,7 +276,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle { @VisibleForTesting @Override public int hashCode() { - int result = getOperatorIdentifier().hashCode(); + int result = getBackendIdentifier().hashCode(); result = 31 * result + getKeyGroupRange().hashCode(); result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); result = 31 * result + getSharedState().hashCode(); http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index af9ac9d..e0ca873 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class SharedStateRegistry { /** Executor for async state deletion */ private final Executor asyncDisposalExecutor; + /** Default uses direct executor to delete unreferenced state */ public SharedStateRegistry() { this(Executors.directExecutor()); } @@ -83,11 +85,16 @@ public class SharedStateRegistry { entry = registeredStates.get(registrationKey); if (entry == null) { + + // Additional check that should never fail, because only state handles that are not placeholders should + // ever be inserted to the registry. 
+ Preconditions.checkState(!isPlaceholder(state), "Attempt to reference unknown state: " + registrationKey); + entry = new SharedStateRegistry.SharedStateEntry(state); registeredStates.put(registrationKey, entry); } else { // delete if this is a real duplicate - if (!Objects.equals(state, entry.state)) { + if (!Objects.equals(state, entry.stateHandle)) { scheduledStateDeletion = state; } entry.increaseReferenceCount(); @@ -95,6 +102,7 @@ public class SharedStateRegistry { } scheduleAsyncDelete(scheduledStateDeletion); + LOG.trace("Registered shared state {} under key {}.", entry, registrationKey); return new Result(entry); } @@ -112,9 +120,10 @@ public class SharedStateRegistry { final Result result; final StreamStateHandle scheduledStateDeletion; + SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { - SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey); + entry = registeredStates.get(registrationKey); Preconditions.checkState(entry != null, "Cannot unregister a state that is not registered."); @@ -124,7 +133,7 @@ public class SharedStateRegistry { // Remove the state from the registry when it's not referenced any more. if (entry.getReferenceCount() <= 0) { registeredStates.remove(registrationKey); - scheduledStateDeletion = entry.getState(); + scheduledStateDeletion = entry.getStateHandle(); result = new Result(null, 0); } else { scheduledStateDeletion = null; @@ -132,6 +141,7 @@ public class SharedStateRegistry { } } + LOG.trace("Unregistered shared state {} under key {}.", entry, registrationKey); scheduleAsyncDelete(scheduledStateDeletion); return result; } @@ -142,6 +152,7 @@ public class SharedStateRegistry { * @param stateHandles The shared states to register. */ public void registerAll(Iterable<? 
extends CompositeStateHandle> stateHandles) { + if (stateHandles == null) { return; } @@ -156,6 +167,8 @@ public class SharedStateRegistry { private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { // We do the small optimization to not issue discards for placeholders, which are NOPs. if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { + + LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); asyncDisposalExecutor.execute( new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); } @@ -171,18 +184,18 @@ public class SharedStateRegistry { private static class SharedStateEntry { /** The shared state handle */ - private final StreamStateHandle state; + private final StreamStateHandle stateHandle; /** The current reference count of the state handle */ private int referenceCount; SharedStateEntry(StreamStateHandle value) { - this.state = value; + this.stateHandle = value; this.referenceCount = 1; } - StreamStateHandle getState() { - return state; + StreamStateHandle getStateHandle() { + return stateHandle; } int getReferenceCount() { @@ -196,6 +209,14 @@ public class SharedStateRegistry { void decreaseReferenceCount() { --referenceCount; } + + @Override + public String toString() { + return "SharedStateEntry{" + + "stateHandle=" + stateHandle + + ", referenceCount=" + referenceCount + + '}'; + } } /** @@ -210,7 +231,7 @@ public class SharedStateRegistry { private final int referenceCount; private Result(SharedStateEntry sharedStateEntry) { - this.reference = sharedStateEntry.getState(); + this.reference = sharedStateEntry.getStateHandle(); this.referenceCount = sharedStateEntry.getReferenceCount(); } @@ -228,6 +249,14 @@ public class SharedStateRegistry { public int getReferenceCount() { return referenceCount; } + + @Override + public String toString() { + return "Result{" + + "reference=" + reference + + ", referenceCount=" + referenceCount + + '}'; + } } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index b5854dd..91bab85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -18,12 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.ErrorListenerPathable; -import org.apache.curator.utils.EnsurePath; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.state.RetrievableStateHandle; @@ -31,6 +25,13 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.ErrorListenerPathable; +import org.apache.curator.utils.EnsurePath; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -51,6 +52,7 @@ import 
static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -160,9 +162,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { stateStorage, Executors.directExecutor()); - SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry); + verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); + verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); + CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); // check that we return the latest retrievable checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index f985573..de1f599 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -274,7 +274,7 @@ public class CheckpointTestUtils { public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { return new IncrementalKeyedStateHandle( - createRandomUUID(rnd).toString(), + 
createRandomUUID(rnd), new KeyGroupRange(1, 1), 42L, createRandomStateHandleMap(rnd), http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java index 2a6975a..c1b3ccd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import java.util.Map; import java.util.Random; +import java.util.UUID; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -187,7 +188,7 @@ public class IncrementalKeyedStateHandleTest { private static IncrementalKeyedStateHandle create(Random rnd) { return new IncrementalKeyedStateHandle( - "test", + UUID.nameUUIDFromBytes("test".getBytes()), KeyGroupRange.of(0, 0), 1L, placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 9e0a6e1..0e3eae5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -26,10 +26,10 @@ import akka.pattern.Patterns._ import akka.pattern.ask import 
akka.testkit.CallingThreadDispatcher import org.apache.flink.api.common.JobID -import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions} +import org.apache.flink.configuration.{Configuration, JobManagerOptions} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.checkpoint.savepoint.Savepoint +import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory} import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -39,11 +39,12 @@ import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager} import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{CheckpointRequest, CheckpointRequestFailure, CheckpointRequestSuccess, ResponseSavepoint} import org.apache.flink.runtime.testingUtils.TestingMessages.Alive import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.testutils.TestingResourceManager @@ -372,6 +373,48 @@ class TestingCluster( case _ => throw new IOException("Dispose savepoint failed") } } + + @throws(classOf[IOException]) + def 
requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = { + val jobManagerGateway = getLeaderGateway(timeout) + + // wait until the cluster is ready to take a checkpoint. + val allRunning = jobManagerGateway.ask( + TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout) + + Await.ready(allRunning, timeout) + + // trigger checkpoint + val result = Await.result( + jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout) + + result match { + case success: CheckpointRequestSuccess => success.path + case fail: CheckpointRequestFailure => { + if (fail.cause.getMessage.contains("tasks not ready")) { + // retry if the tasks are not ready yet. + Thread.sleep(50) + requestCheckpoint(jobId, options) + } else { + throw new IOException(fail.cause) + } + } + case _ => throw new IllegalStateException("Trigger checkpoint failed") + } + } + + @throws[Exception] + def cancelJob(jobId: JobID): Unit = { + if (getCurrentlyRunningJobsJava.contains(jobId)) { + val jobManagerGateway = getLeaderGateway(timeout) + val cancelFuture = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout) + val result = Await.result(cancelFuture, timeout) + if (!result.isInstanceOf[JobManagerMessages.CancellationSuccess]) { + throw new Exception("Cancellation failed") + } + } + else throw new IllegalStateException("Job is not running") + } } object TestingCluster { http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index a3d31f5..3d3af95 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -22,14 +22,17 @@ import akka.actor.{ActorRef, Cancellable, Terminated} import akka.pattern.{ask, pipe} import org.apache.flink.api.common.JobID import org.apache.flink.runtime.FlinkActor +import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.concurrent.BiFunction import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway import org.apache.flink.runtime.messages.Acknowledge import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps} +import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat @@ -336,6 +339,60 @@ trait TestingJobManagerLike extends FlinkActor { } } + case CheckpointRequest(jobId, checkpointOptions) => + currentJobs.get(jobId) match { + case Some((graph, _)) => + val checkpointCoordinator = graph.getCheckpointCoordinator() + + if (checkpointCoordinator != null) { + // Immutable copy for the future + val senderRef = sender() + try { + // Do this async, because checkpoint coordinator operations can + // contain blocking calls to the state backend or ZooKeeper. 
+ val cpFuture = checkpointCoordinator.triggerCheckpoint( + System.currentTimeMillis(), + checkpointOptions) + + cpFuture.handleAsync[Void]( + new BiFunction[CompletedCheckpoint, Throwable, Void] { + override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { + if (success != null) { + if (success.getExternalPointer == null && + CheckpointType.SAVEPOINT.equals(checkpointOptions.getCheckpointType)) { + senderRef ! CheckpointRequestFailure( + jobId, new Exception("Savepoint has not been persisted.") + ) + } else { + senderRef ! CheckpointRequestSuccess( + jobId, + success.getCheckpointID, + success.getExternalPointer, + success.getTimestamp) + } + } else { + senderRef ! CheckpointRequestFailure( + jobId, new Exception("Failed to complete checkpoint", cause)) + } + null + } + }, + context.dispatcher) + } catch { + case e: Exception => + senderRef ! CheckpointRequestFailure(jobId, new Exception( + "Failed to trigger checkpoint", e)) + } + } else { + sender() ! CheckpointRequestFailure(jobId, new IllegalStateException( + "Checkpointing disabled. You can enable it via the execution environment of " + + "your job.")) + } + + case None => + sender() ! CheckpointRequestFailure(jobId, new IllegalArgumentException("Unknown job.")) + } + case NotifyWhenLeader => if (leaderElectionService.hasLeadership) { sender() ! 
true http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index 48c2bfb..f79c124 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -23,10 +23,13 @@ import java.util.Map import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.api.common.accumulators.Accumulator +import org.apache.flink.runtime.checkpoint.CheckpointOptions import org.apache.flink.runtime.checkpoint.savepoint.Savepoint -import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph} +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph import org.apache.flink.runtime.instance.ActorGateway import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.messages.RequiresLeaderSessionID +import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage object TestingJobManagerMessages { @@ -59,6 +62,40 @@ object TestingJobManagerMessages { case class TaskManagerTerminated(taskManager: ActorRef) /** + * Triggers a checkpoint for the specified job. + * + * This is not a subtype of [[AbstractCheckpointMessage]], because it is a + * control-flow message, which is *not* part of the checkpointing mechanism + * of triggering and acknowledging checkpoints. + * + * @param jobId The JobID of the job to trigger the checkpoint for.
+ * @param options properties of the checkpoint + */ + case class CheckpointRequest( + jobId: JobID, + options: CheckpointOptions) extends RequiresLeaderSessionID + + /** + * Response after a successful checkpoint trigger containing the checkpoint's external path. + * + * @param jobId The job ID for which the checkpoint was triggered. + * @param path The external path of the completed checkpoint. + */ + case class CheckpointRequestSuccess( + jobId: JobID, + checkpointId: Long, + path: String, + triggerTime: Long) + + /** + * Response after a failed checkpoint trigger containing the failure cause. + * + * @param jobId The job ID for which the checkpoint was triggered. + * @param cause The cause of the failure. + */ + case class CheckpointRequestFailure(jobId: JobID, cause: Throwable) + + /** * Registers a listener to receive a message when accumulators changed. * The change must be explicitly triggered by the TestingTaskManager which can receive an * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]] http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java new file mode 100644 index 0000000..e341741 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.test.state.ManualWindowSpeedITCase; + 
+import org.apache.curator.test.TestingServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * IT case for externalized checkpoints with {@link org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore}
+ * and {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ *
+ * <p>This test covers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
+ */
+public class ExternalizedCheckpointITCase {
+
+	private static final int PARALLELISM = 2;
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+	}
+
+	@Test
+	public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+	}
+
+	@Test
+	public void testExternalizedFSCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new FsStateBackend(checkpointDir.toURI().toString(), true));
+	}
+
+	@Test
+	public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
+		// the no-arg constructor starts the server; close() (unlike stop()) also deletes its temp data directory
+		try (TestingServer zkServer = new TestingServer()) {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+		}
+	}
+
+	@Test
+	public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
+		// the no-arg constructor starts the server; close() (unlike stop()) also deletes its temp data directory
+		try (TestingServer zkServer = new TestingServer()) {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+		}
+	}
+
+	@Test
+	public void testExternalizedFSCheckpointsZookeeper() throws Exception {
+		// the no-arg constructor starts the server; close() (unlike stop()) also deletes its temp data directory
+		try (TestingServer zkServer = new TestingServer()) {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new FsStateBackend(checkpointDir.toURI().toString(), true));
+		}
+	}
+
+	/**
+	 * Starts a single {@link TestingCluster} and runs the same windowed job on it three times. Each run triggers a
+	 * checkpoint, records the returned externalized checkpoint path and cancels the job. Every run after the first
+	 * restores from the path recorded by the previous run.
+	 *
+	 * @param checkpointDir directory configured as checkpoint directory and as target for externalized checkpoints
+	 * @param zooKeeperQuorum ZooKeeper connect string that enables ZooKeeper high availability,
+	 *                        or {@code null} to run without high availability
+	 * @param backend the state backend used by the job
+	 */
+	private void testExternalizedCheckpoints(
+		File checkpointDir,
+		String zooKeeperQuorum,
+		AbstractStateBackend backend) throws Exception {
+
+		final Configuration config = new Configuration();
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+
+		final File savepointDir = temporaryFolder.newFolder();
+
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+		config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+
+		// ZooKeeper recovery mode?
+		if (zooKeeperQuorum != null) {
+			final File haDir = temporaryFolder.newFolder();
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+
+		TestingCluster cluster = new TestingCluster(config);
+		cluster.start();
+
+		String externalCheckpoint = null;
+
+		try {
+
+			// main test sequence: start job -> eCP -> restore job -> eCP -> restore job -> eCP
+			for (int i = 0; i < 3; ++i) {
+				final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+				env.setStateBackend(backend);
+				env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+				env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+				env.setParallelism(PARALLELISM);
+
+				env.addSource(new ManualWindowSpeedITCase.InfiniteTupleSource(10_000))
+					.keyBy(0)
+					.timeWindow(Time.seconds(3))
+					.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public Tuple2<String, Integer> reduce(
+							Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+							return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+						}
+					})
+					.filter(new FilterFunction<Tuple2<String, Integer>>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public boolean filter(Tuple2<String, Integer> value) throws Exception {
+							return value.f0.startsWith("Tuple 0");
+						}
+					});
+
+				StreamGraph streamGraph = env.getStreamGraph();
+				streamGraph.setJobName("Test");
+
+				JobGraph jobGraph = streamGraph.getJobGraph();
+
+				// recover from previous iteration?
+				if (externalCheckpoint != null) {
+					jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+				}
+
+				// NOTE(review): the cluster was already started with this config, so this merge may have no
+				// effect on the running cluster - confirm whether it is still needed.
+				config.addAll(jobGraph.getJobConfiguration());
+				JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+
+				// let the job make some progress
+				Thread.sleep(200);
+
+				externalCheckpoint =
+					cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forFullCheckpoint());
+
+				// the next iteration restores from this path, so the checkpoint must have been externalized
+				assertNotNull("Checkpoint was not externalized.", externalCheckpoint);
+
+				cluster.cancelJob(submissionResult.getJobID());
+			}
+		} finally {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+	}
+}